risingwave_frontend/optimizer/plan_node/
logical_locality_provider.rs1use itertools::Itertools;
16
17use super::generic::GenericPlanRef;
18use super::utils::impl_distill_by_unit;
19use super::{
20 BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
21 PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamExchange, StreamPlanRef, ToBatch,
22 ToStream, generic,
23};
24use crate::error::Result;
25use crate::expr::{ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::{
28 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
29};
30use crate::optimizer::property::RequiredDist;
31use crate::utils::{ColIndexMapping, Condition};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct LogicalLocalityProvider {
41 pub base: PlanBase<Logical>,
42 core: generic::LocalityProvider<PlanRef>,
43}
44
45impl LogicalLocalityProvider {
46 pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
47 assert!(!locality_columns.is_empty());
48 let core = generic::LocalityProvider::new(input, locality_columns);
49 let base = PlanBase::new_logical_with_core(&core);
50 LogicalLocalityProvider { base, core }
51 }
52
53 pub fn create(input: PlanRef, locality_columns: Vec<usize>) -> PlanRef {
54 LogicalLocalityProvider::new(input, locality_columns).into()
55 }
56
57 pub fn locality_columns(&self) -> &[usize] {
58 &self.core.locality_columns
59 }
60}
61
62impl PlanTreeNodeUnary<Logical> for LogicalLocalityProvider {
63 fn input(&self) -> PlanRef {
64 self.core.input.clone()
65 }
66
67 fn clone_with_input(&self, input: PlanRef) -> Self {
68 Self::new(input, self.locality_columns().to_vec())
69 }
70
71 fn rewrite_with_input(
72 &self,
73 input: PlanRef,
74 input_col_change: ColIndexMapping,
75 ) -> (Self, ColIndexMapping) {
76 let locality_columns = self
77 .locality_columns()
78 .iter()
79 .map(|&i| input_col_change.map(i))
80 .collect();
81
82 (Self::new(input, locality_columns), input_col_change)
83 }
84}
85
86impl_plan_tree_node_for_unary! { Logical, LogicalLocalityProvider}
87impl_distill_by_unit!(LogicalLocalityProvider, core, "LogicalLocalityProvider");
88
89impl ColPrunable for LogicalLocalityProvider {
90 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
91 let input_required_cols = (0..self.input().schema().len()).collect_vec();
93 LogicalProject::with_out_col_idx(
94 self.clone_with_input(self.input().prune_col(&input_required_cols, ctx))
95 .into(),
96 required_cols.iter().cloned(),
97 )
98 .into()
99 }
100}
101
102impl PredicatePushdown for LogicalLocalityProvider {
103 fn predicate_pushdown(
104 &self,
105 predicate: Condition,
106 ctx: &mut PredicatePushdownContext,
107 ) -> PlanRef {
108 let new_input = self.input().predicate_pushdown(predicate, ctx);
109 let new_provider = self.clone_with_input(new_input);
110 new_provider.into()
111 }
112}
113
114impl ToBatch for LogicalLocalityProvider {
115 fn to_batch(&self) -> Result<BatchPlanRef> {
116 Err(crate::error::ErrorCode::NotSupported(
118 "LocalityProvider in batch mode".to_owned(),
119 "LocalityProvider is only supported in streaming mode for backfilling".to_owned(),
120 )
121 .into())
122 }
123}
124
125impl ToStream for LogicalLocalityProvider {
126 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
127 use super::StreamLocalityProvider;
128
129 let input = self.input().to_stream(ctx)?;
130 let required_dist =
131 RequiredDist::shard_by_key(self.input().schema().len(), self.locality_columns());
132 let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
133 let input = if input.as_stream_exchange().is_none() {
134 StreamExchange::new_no_shuffle(input).into()
138 } else {
139 input
140 };
141 let stream_core = generic::LocalityProvider::new(input, self.locality_columns().to_vec());
142 Ok(StreamLocalityProvider::new(stream_core).into())
143 }
144
145 fn logical_rewrite_for_stream(
146 &self,
147 ctx: &mut RewriteStreamContext,
148 ) -> Result<(PlanRef, ColIndexMapping)> {
149 let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
150 let (locality_provider, out_col_change) = self.rewrite_with_input(input, input_col_change);
151 Ok((locality_provider.into(), out_col_change))
152 }
153}
154
155impl ExprRewritable<Logical> for LogicalLocalityProvider {
156 fn has_rewritable_expr(&self) -> bool {
157 false
158 }
159
160 fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef {
161 self.clone().into()
162 }
163}
164
165impl ExprVisitable for LogicalLocalityProvider {
166 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {
167 }
169}
170
171impl LogicalLocalityProvider {
172 pub fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
173 if columns == self.locality_columns() {
174 Some(self.clone().into())
175 } else if let Some(better_input) = self.input().try_better_locality(columns) {
176 Some(better_input)
177 } else {
178 Some(Self::new(self.input(), columns.to_owned()).into())
179 }
180 }
181}