risingwave_frontend/optimizer/plan_node/logical_locality_provider.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;

use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
    BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
    PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamExchange, StreamPlanRef, ToBatch,
    ToStream, generic,
};
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{
    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
use crate::optimizer::property::RequiredDist;
use crate::utils::{ColIndexMapping, Condition};

33/// `LogicalLocalityProvider` provides locality for operators during backfilling.
34/// It buffers input data into a state table using locality columns as primary key prefix.
35///
36/// The `LocalityProvider` has 2 states:
37/// - One is used to buffer data during backfilling and provide data locality.
38/// - The other one is a progress table like normal backfill operator to track the backfilling progress of itself.
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct LogicalLocalityProvider {
41    pub base: PlanBase<Logical>,
42    core: generic::LocalityProvider<PlanRef>,
43}
44
45impl LogicalLocalityProvider {
46    pub fn new(input: PlanRef, locality_columns: Vec<usize>) -> Self {
47        assert!(!locality_columns.is_empty());
48        let core = generic::LocalityProvider::new(input, locality_columns);
49        let base = PlanBase::new_logical_with_core(&core);
50        LogicalLocalityProvider { base, core }
51    }
52
53    pub fn create(input: PlanRef, locality_columns: Vec<usize>) -> PlanRef {
54        LogicalLocalityProvider::new(input, locality_columns).into()
55    }
56
57    pub fn locality_columns(&self) -> &[usize] {
58        &self.core.locality_columns
59    }
60}
61
62impl PlanTreeNodeUnary<Logical> for LogicalLocalityProvider {
63    fn input(&self) -> PlanRef {
64        self.core.input.clone()
65    }
66
67    fn clone_with_input(&self, input: PlanRef) -> Self {
68        Self::new(input, self.locality_columns().to_vec())
69    }
70
71    fn rewrite_with_input(
72        &self,
73        input: PlanRef,
74        input_col_change: ColIndexMapping,
75    ) -> (Self, ColIndexMapping) {
76        let locality_columns = self
77            .locality_columns()
78            .iter()
79            .map(|&i| input_col_change.map(i))
80            .collect();
81
82        (Self::new(input, locality_columns), input_col_change)
83    }
84}
85
86impl_plan_tree_node_for_unary! { Logical, LogicalLocalityProvider}
87impl_distill_by_unit!(LogicalLocalityProvider, core, "LogicalLocalityProvider");
88
89impl ColPrunable for LogicalLocalityProvider {
90    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
91        // No pruning.
92        let input_required_cols = (0..self.input().schema().len()).collect_vec();
93        LogicalProject::with_out_col_idx(
94            self.clone_with_input(self.input().prune_col(&input_required_cols, ctx))
95                .into(),
96            required_cols.iter().cloned(),
97        )
98        .into()
99    }
100}
101
102impl PredicatePushdown for LogicalLocalityProvider {
103    fn predicate_pushdown(
104        &self,
105        predicate: Condition,
106        ctx: &mut PredicatePushdownContext,
107    ) -> PlanRef {
108        let new_input = self.input().predicate_pushdown(predicate, ctx);
109        let new_provider = self.clone_with_input(new_input);
110        new_provider.into()
111    }
112}
113
114impl ToBatch for LogicalLocalityProvider {
115    fn to_batch(&self) -> Result<BatchPlanRef> {
116        // LocalityProvider is a streaming-only operator
117        Err(crate::error::ErrorCode::NotSupported(
118            "LocalityProvider in batch mode".to_owned(),
119            "LocalityProvider is only supported in streaming mode for backfilling".to_owned(),
120        )
121        .into())
122    }
123}
124
125impl ToStream for LogicalLocalityProvider {
126    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
127        use super::StreamLocalityProvider;
128
129        let input = self.input().to_stream(ctx)?;
130        // Use `shard_by_exact_key` instead of `shard_by_key`, because locality provider will change the `stream_key` to include locality columns.
131        // If we use `shard_by_key`, it is possible that the locality columns are (`a`, `b`), but input stream key is only (`a`).
132        // In this case, `shard_by_key` will only shuffle by `a`. once `b` is changed, we will meet  U- and U+ should have same stream key error.
133        // Though we can let locality provider stream key to include its distribution columns only to fix the error,
134        // using `shard_by_exact_key` for another reason that it can provide better locality by `shard_by_key`.
135        // For example, if locality columns are (`a`, `b`), and we use `shard_by_key` with key (`b`),
136        // then all data with same `a` but different b will be shuffled to different nodes, which hurts locality.
137        let required_dist =
138            RequiredDist::shard_by_exact_key(self.input().schema().len(), self.locality_columns());
139        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
140        let input = if input.as_stream_exchange().is_none() {
141            // Force a no shuffle exchange to ensure locality provider is in its own fragment.
142            // This is important to ensure the backfill ordering can recognize and build
143            // the dependency graph among different backfill-needed fragments.
144            StreamExchange::new_no_shuffle(input).into()
145        } else {
146            input
147        };
148        let stream_core = generic::LocalityProvider::new(input, self.locality_columns().to_vec());
149        Ok(StreamLocalityProvider::new(stream_core).into())
150    }
151
152    fn logical_rewrite_for_stream(
153        &self,
154        ctx: &mut RewriteStreamContext,
155    ) -> Result<(PlanRef, ColIndexMapping)> {
156        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
157        let (locality_provider, out_col_change) = self.rewrite_with_input(input, input_col_change);
158        Ok((locality_provider.into(), out_col_change))
159    }
160}
161
162impl ExprRewritable<Logical> for LogicalLocalityProvider {
163    fn has_rewritable_expr(&self) -> bool {
164        false
165    }
166
167    fn rewrite_exprs(&self, _r: &mut dyn ExprRewriter) -> PlanRef {
168        self.clone().into()
169    }
170}
171
172impl ExprVisitable for LogicalLocalityProvider {
173    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {
174        // No expressions to visit
175    }
176}
177
178impl LogicalLocalityProvider {
179    pub fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
180        if columns == self.locality_columns() {
181            Some(self.clone().into())
182        } else if let Some(better_input) = self.input().try_better_locality(columns) {
183            Some(better_input)
184        } else {
185            Some(Self::new(self.input(), columns.to_owned()).into())
186        }
187    }
188}