risingwave_frontend/optimizer/plan_node/generic/
locality_provider.rs1use pretty_xmlish::Pretty;
16use risingwave_common::catalog::{FieldDisplay, Schema};
17
18use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
19use crate::expr::ExprRewriter;
20use crate::optimizer::optimizer_context::OptimizerContextRef;
21use crate::optimizer::property::FunctionalDependencySet;
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub struct LocalityProvider<PlanRef> {
27 pub input: PlanRef,
28 pub locality_columns: Vec<usize>,
30}
31
32impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
33 pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
34 Self {
35 input,
36 locality_columns,
37 }
38 }
39
40 pub fn fields_pretty<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
41 let locality_columns_display = self
42 .locality_columns
43 .iter()
44 .map(|&i| Pretty::display(&FieldDisplay(self.input.schema().fields.get(i).unwrap())))
45 .collect();
46 vec![("locality_columns", Pretty::Array(locality_columns_display))]
47 }
48}
49
50impl<PlanRef: GenericPlanRef> GenericPlanNode for LocalityProvider<PlanRef> {
51 fn schema(&self) -> Schema {
52 self.input.schema().clone()
53 }
54
55 fn stream_key(&self) -> Option<Vec<usize>> {
56 let mut stream_key = self.locality_columns.clone();
57 let input_stream_key = self.input.stream_key()?;
58 for col in input_stream_key {
59 if !stream_key.contains(col) {
60 stream_key.push(*col);
61 }
62 }
63 Some(stream_key)
64 }
65
66 fn ctx(&self) -> OptimizerContextRef {
67 self.input.ctx()
68 }
69
70 fn functional_dependency(&self) -> FunctionalDependencySet {
71 self.input.functional_dependency().clone()
72 }
73}
74
75impl<PlanRef: GenericPlanRef> LocalityProvider<PlanRef> {
76 pub fn rewrite_exprs(&mut self, _r: &mut dyn ExprRewriter) {
77 }
79
80 pub fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
81 }
83}
84
85impl_distill_unit_from_fields!(LocalityProvider, GenericPlanRef);