// risingwave_frontend/optimizer/plan_node/generic/locality_provider.rs
use pretty_xmlish::Pretty;
16use risingwave_common::catalog::{FieldDisplay, Schema};
17
18use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
19use crate::expr::ExprRewriter;
20use crate::optimizer::optimizer_context::OptimizerContextRef;
21use crate::optimizer::property::FunctionalDependencySet;
22
/// Generic plan node that wraps an input plan together with a list of
/// column indices (`locality_columns`) referring to that input's schema.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct LocalityProvider<PlanRef> {
    /// The plan this node reads from; schema, context and functional
    /// dependencies are all taken from it.
    pub input: PlanRef,
    /// Indices into the input schema's fields. They are rendered by field
    /// name when pretty-printing and lead the derived stream key.
    pub locality_columns: Vec<usize>,
}
31
32impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
33    pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
34        Self {
35            input,
36            locality_columns,
37        }
38    }
39
40    pub fn fields_pretty<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
41        let locality_columns_display = self
42            .locality_columns
43            .iter()
44            .map(|&i| Pretty::display(&FieldDisplay(self.input.schema().fields.get(i).unwrap())))
45            .collect();
46        vec![("locality_columns", Pretty::Array(locality_columns_display))]
47    }
48}
49
50impl<PlanRef: GenericPlanRef> GenericPlanNode for LocalityProvider<PlanRef> {
51    fn schema(&self) -> Schema {
52        self.input.schema().clone()
53    }
54
55    fn stream_key(&self) -> Option<Vec<usize>> {
56        let mut stream_key = self.locality_columns.clone();
57        if let Some(input_stream_key) = self.input.stream_key() {
58            for col in input_stream_key {
59                if !stream_key.contains(col) {
60                    stream_key.push(*col);
61                }
62            }
63        } else {
64            return None;
65        }
66        Some(stream_key)
67    }
68
69    fn ctx(&self) -> OptimizerContextRef {
70        self.input.ctx()
71    }
72
73    fn functional_dependency(&self) -> FunctionalDependencySet {
74        self.input.functional_dependency().clone()
75    }
76}
77
78impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
79    pub fn rewrite_exprs(&mut self, _r: &mut dyn ExprRewriter) {
80        }
82
83    pub fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
84        }
86}
87
// NOTE(review): macro imported from the parent module; presumably it derives
// the node's pretty-printing (distill) impl from `fields_pretty` — confirm
// against the macro definition.
impl_distill_unit_from_fields!(LocalityProvider, GenericPlanRef);