risingwave_frontend/optimizer/plan_node/
logical_locality_provider.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16
17use super::generic::GenericPlanRef;
18use super::utils::impl_distill_by_unit;
19use super::{
20    BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
21    PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamExchange, StreamPlanRef, ToBatch,
22    ToStream, generic,
23};
24use crate::error::Result;
25use crate::expr::{ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::{
28    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
29};
30use crate::optimizer::property::RequiredDist;
31use crate::utils::{ColIndexMapping, Condition};
32
33/// `LogicalLocalityProvider` provides locality for operators during backfilling.
34/// It buffers input data into a state table using locality columns as primary key prefix.
35///
36/// The `LocalityProvider` has 2 states:
37/// - One is used to buffer data during backfilling and provide data locality.
38/// - The other one is a progress table like normal backfill operator to track the backfilling progress of itself.
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct LogicalLocalityProvider {
41    pub base: PlanBase<Logical>,
42    core: generic::LocalityProvider<PlanRef>,
43}
44
45impl LogicalLocalityProvider {
46    pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
47        assert!(!locality_columns.is_empty());
48        let core = generic::LocalityProvider::new(input, locality_columns);
49        let base = PlanBase::new_logical_with_core(&core);
50        LogicalLocalityProvider { base, core }
51    }
52
53    pub fn create(input: PlanRef, locality_columns: Vec<usize>) -> PlanRef {
54        LogicalLocalityProvider::new(input, locality_columns).into()
55    }
56
57    pub fn locality_columns(&self) -> &[usize] {
58        &self.core.locality_columns
59    }
60}
61
62impl PlanTreeNodeUnary<Logical> for LogicalLocalityProvider {
63    fn input(&self) -> PlanRef {
64        self.core.input.clone()
65    }
66
67    fn clone_with_input(&self, input: PlanRef) -> Self {
68        Self::new(input, self.locality_columns().to_vec())
69    }
70
71    fn rewrite_with_input(
72        &self,
73        input: PlanRef,
74        input_col_change: ColIndexMapping,
75    ) -> (Self, ColIndexMapping) {
76        let locality_columns = self
77            .locality_columns()
78            .iter()
79            .map(|&i| input_col_change.map(i))
80            .collect();
81
82        (Self::new(input, locality_columns), input_col_change)
83    }
84}
85
86impl_plan_tree_node_for_unary! { Logical, LogicalLocalityProvider}
87impl_distill_by_unit!(LogicalLocalityProvider, core, "LogicalLocalityProvider");
88
89impl ColPrunable for LogicalLocalityProvider {
90    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
91        // No pruning.
92        let input_required_cols = (0..self.input().schema().len()).collect_vec();
93        LogicalProject::with_out_col_idx(
94            self.clone_with_input(self.input().prune_col(&input_required_cols, ctx))
95                .into(),
96            required_cols.iter().cloned(),
97        )
98        .into()
99    }
100}
101
102impl PredicatePushdown for LogicalLocalityProvider {
103    fn predicate_pushdown(
104        &self,
105        predicate: Condition,
106        ctx: &mut PredicatePushdownContext,
107    ) -> PlanRef {
108        let new_input = self.input().predicate_pushdown(predicate, ctx);
109        let new_provider = self.clone_with_input(new_input);
110        new_provider.into()
111    }
112}
113
114impl ToBatch for LogicalLocalityProvider {
115    fn to_batch(&self) -> Result<BatchPlanRef> {
116        // LocalityProvider is a streaming-only operator
117        Err(crate::error::ErrorCode::NotSupported(
118            "LocalityProvider in batch mode".to_owned(),
119            "LocalityProvider is only supported in streaming mode for backfilling".to_owned(),
120        )
121        .into())
122    }
123}
124
125impl ToStream for LogicalLocalityProvider {
126    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
127        use super::StreamLocalityProvider;
128
129        let input = self.input().to_stream(ctx)?;
130        let required_dist =
131            RequiredDist::shard_by_key(self.input().schema().len(), self.locality_columns());
132        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
133        let input = if input.as_stream_exchange().is_none() {
134            // Force a no shuffle exchange to ensure locality provider is in its own fragment.
135            // This is important to ensure the backfill ordering can recognize and build
136            // the dependency graph among different backfill-needed fragments.
137            StreamExchange::new_no_shuffle(input).into()
138        } else {
139            input
140        };
141        let stream_core = generic::LocalityProvider::new(input, self.locality_columns().to_vec());
142        Ok(StreamLocalityProvider::new(stream_core).into())
143    }
144
145    fn logical_rewrite_for_stream(
146        &self,
147        ctx: &mut RewriteStreamContext,
148    ) -> Result<(PlanRef, ColIndexMapping)> {
149        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
150        let (locality_provider, out_col_change) = self.rewrite_with_input(input, input_col_change);
151        Ok((locality_provider.into(), out_col_change))
152    }
153}
154
155impl ExprRewritable<Logical> for LogicalLocalityProvider {
156    fn has_rewritable_expr(&self) -> bool {
157        false
158    }
159
160    fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef {
161        self.clone().into()
162    }
163}
164
165impl ExprVisitable for LogicalLocalityProvider {
166    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {
167        // No expressions to visit
168    }
169}
170
171impl LogicalLocalityProvider {
172    pub fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
173        if columns == self.locality_columns() {
174            Some(self.clone().into())
175        } else if let Some(better_input) = self.input().try_better_locality(columns) {
176            Some(better_input)
177        } else {
178            Some(Self::new(self.input(), columns.to_owned()).into())
179        }
180    }
181}