risingwave_frontend/optimizer/plan_node/generic/
locality_provider.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::Pretty;
16use risingwave_common::catalog::{FieldDisplay, Schema};
17
18use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
19use crate::expr::ExprRewriter;
20use crate::optimizer::optimizer_context::OptimizerContextRef;
21use crate::optimizer::property::FunctionalDependencySet;
22
/// Generic plan node that gives downstream operators data locality while backfilling.
///
/// The input rows are buffered into a state table whose primary key is prefixed by the
/// locality columns.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct LocalityProvider<PlanRef> {
    /// The plan node whose output is buffered.
    pub input: PlanRef,
    /// Indices into the input schema of the columns that define the locality.
    pub locality_columns: Vec<usize>,
}
31
32impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
33    pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
34        Self {
35            input,
36            locality_columns,
37        }
38    }
39
40    pub fn fields_pretty<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
41        let locality_columns_display = self
42            .locality_columns
43            .iter()
44            .map(|&i| Pretty::display(&FieldDisplay(self.input.schema().fields.get(i).unwrap())))
45            .collect();
46        vec![("locality_columns", Pretty::Array(locality_columns_display))]
47    }
48}
49
50impl<PlanRef: GenericPlanRef> GenericPlanNode for LocalityProvider<PlanRef> {
51    fn schema(&self) -> Schema {
52        self.input.schema().clone()
53    }
54
55    fn stream_key(&self) -> Option<Vec<usize>> {
56        Some(self.input.stream_key()?.to_vec())
57    }
58
59    fn ctx(&self) -> OptimizerContextRef {
60        self.input.ctx()
61    }
62
63    fn functional_dependency(&self) -> FunctionalDependencySet {
64        self.input.functional_dependency().clone()
65    }
66}
67
68impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
69    pub fn rewrite_exprs(&mut self, _r: &mut dyn ExprRewriter) {
70        // LocalityProvider doesn't contain expressions to rewrite
71    }
72
73    pub fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
74        // LocalityProvider doesn't contain expressions to visit
75    }
76}
77
// NOTE(review): `impl_distill_unit_from_fields!` is imported from the parent module;
// presumably it generates the plan-explain (`Distill`) implementation from
// `fields_pretty` above — confirm against the macro definition.
impl_distill_unit_from_fields!(LocalityProvider, GenericPlanRef);