Skip to main content

risingwave_frontend/optimizer/plan_node/logical_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_expr::bail;
18
19use super::stream::StreamPlanNodeMetadata;
20use super::{
21    ColPrunable, ColumnPruningContext, ExprRewritable, ExprVisitable, Logical, LogicalFilter,
22    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
23    PredicatePushdownContext, ToBatch, ToStream, ToStreamContext, generic,
24};
25use crate::binder::BoundFillStrategy;
26use crate::error::Result;
27use crate::expr::{ExprImpl, InputRef};
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::utils::{ColIndexMapping, Condition};
30
31/// `LogicalGapFill` implements [`super::Logical`] to represent a gap-filling operation on a time
32/// series.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalGapFill {
35    pub base: PlanBase<super::Logical>,
36    core: generic::GapFill<PlanRef>,
37}
38
39impl LogicalGapFill {
40    pub fn new(
41        input: PlanRef,
42        time_col: InputRef,
43        interval: ExprImpl,
44        fill_strategies: Vec<BoundFillStrategy>,
45        partition_by_cols: Vec<InputRef>,
46    ) -> Self {
47        let core = generic::GapFill {
48            input,
49            time_col,
50            interval,
51            fill_strategies,
52            partition_by_cols,
53        };
54        let base = PlanBase::new_logical_with_core(&core);
55        Self { base, core }
56    }
57
58    pub fn time_col(&self) -> &InputRef {
59        &self.core.time_col
60    }
61
62    pub fn interval(&self) -> &ExprImpl {
63        &self.core.interval
64    }
65
66    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
67        &self.core.fill_strategies
68    }
69
70    pub fn partition_by_cols(&self) -> &[InputRef] {
71        &self.core.partition_by_cols
72    }
73}
74
75impl PlanTreeNodeUnary<Logical> for LogicalGapFill {
76    fn input(&self) -> PlanRef {
77        self.core.input.clone()
78    }
79
80    fn clone_with_input(&self, input: PlanRef) -> Self {
81        Self::new(
82            input,
83            self.time_col().clone(),
84            self.interval().clone(),
85            self.fill_strategies().to_vec(),
86            self.partition_by_cols().to_vec(),
87        )
88    }
89}
90
91impl_plan_tree_node_for_unary! { Logical, LogicalGapFill }
92impl_distill_by_unit!(LogicalGapFill, core, "LogicalGapFill");
93
94impl ColPrunable for LogicalGapFill {
95    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
96        let input_col_num = self.input().schema().len();
97        let mut input_required = FixedBitSet::with_capacity(input_col_num);
98        for &required_col in required_cols {
99            input_required.insert(required_col);
100        }
101        input_required.insert(self.time_col().index());
102        input_required.union_with(&self.interval().collect_input_refs(input_col_num));
103        self.fill_strategies()
104            .iter()
105            .for_each(|s| input_required.insert(s.target_col.index()));
106        self.partition_by_cols()
107            .iter()
108            .for_each(|c| input_required.insert(c.index()));
109
110        let input_required_cols: Vec<_> = input_required.ones().collect();
111        let mut col_index_mapping =
112            ColIndexMapping::with_remaining_columns(&input_required_cols, input_col_num);
113
114        let mut new_core = self.core.clone();
115        new_core.input = self.input().prune_col(&input_required_cols, ctx);
116        new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
117
118        let logical_gap_fill = Self {
119            base: PlanBase::new_logical_with_core(&new_core),
120            core: new_core,
121        }
122        .into();
123
124        if input_required_cols == required_cols {
125            logical_gap_fill
126        } else {
127            let output_required_cols = required_cols
128                .iter()
129                .map(|&idx| col_index_mapping.map(idx))
130                .collect_vec();
131            let src_size = logical_gap_fill.schema().len();
132            LogicalProject::with_mapping(
133                logical_gap_fill,
134                ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
135            )
136            .into()
137        }
138    }
139}
140
141impl ExprRewritable<Logical> for LogicalGapFill {
142    fn has_rewritable_expr(&self) -> bool {
143        true
144    }
145
146    fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
147        let mut core = self.core.clone();
148        core.rewrite_exprs(r);
149        Self {
150            base: self.base.clone(),
151            core,
152        }
153        .into()
154    }
155}
156
157impl ExprVisitable for LogicalGapFill {
158    fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
159        self.core.visit_exprs(v);
160    }
161}
162
163impl PredicatePushdown for LogicalGapFill {
164    fn predicate_pushdown(
165        &self,
166        predicate: Condition,
167        _ctx: &mut PredicatePushdownContext,
168    ) -> PlanRef {
169        LogicalFilter::create(self.clone().into(), predicate)
170    }
171}
172
173impl ToBatch for LogicalGapFill {
174    fn to_batch(&self) -> Result<super::BatchPlanRef> {
175        bail!("BatchGapFill is not implemented yet")
176    }
177}
178
179impl ToStream for LogicalGapFill {
180    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<super::StreamPlanRef> {
181        use super::{StreamEowcGapFill, StreamEowcSort, StreamGapFill};
182        use crate::error::ErrorCode;
183        use crate::optimizer::property::RequiredDist;
184
185        let stream_input = self.input().to_stream(ctx)?;
186        let partition_cols = &self.core.partition_by_cols;
187
188        // Choose distribution based on partition_by_cols
189        let new_input = if partition_cols.is_empty() {
190            // No partition: singleton distribution (backward compatible)
191            RequiredDist::single().streaming_enforce_if_not_satisfies(stream_input)?
192        } else {
193            // With partition: hash distribute by partition columns
194            let partition_indices: Vec<usize> = partition_cols.iter().map(|c| c.index()).collect();
195            RequiredDist::shard_by_key(stream_input.schema().len(), &partition_indices)
196                .streaming_enforce_if_not_satisfies(stream_input)?
197        };
198
199        // No stream key validation - gap fill manages its own state internally
200        // using (partition_cols, time_col, upstream_stream_key) as state table PK.
201
202        if ctx.emit_on_window_close() {
203            // EOWC mode requires watermark on time column
204            let time_col_idx = self.core.time_col.index();
205            let input_watermark_cols = new_input.watermark_columns();
206            if !input_watermark_cols.contains(time_col_idx) {
207                return Err(ErrorCode::NotSupported(
208                    "GAP_FILL with EMIT ON WINDOW CLOSE requires a watermark on the time column."
209                        .to_owned(),
210                    format!(
211                        "Please define a watermark on the time column (column index {}).",
212                        time_col_idx
213                    ),
214                )
215                .into());
216            }
217
218            let sorted_input = StreamEowcSort::new(new_input, time_col_idx).into();
219            let core = generic::GapFill {
220                input: sorted_input,
221                time_col: self.core.time_col.clone(),
222                interval: self.core.interval.clone(),
223                fill_strategies: self.core.fill_strategies.clone(),
224                partition_by_cols: self.core.partition_by_cols.clone(),
225            };
226            Ok(StreamEowcGapFill::new(core).into())
227        } else {
228            let core = generic::GapFill {
229                input: new_input,
230                time_col: self.core.time_col.clone(),
231                interval: self.core.interval.clone(),
232                fill_strategies: self.core.fill_strategies.clone(),
233                partition_by_cols: self.core.partition_by_cols.clone(),
234            };
235            Ok(StreamGapFill::new(core).into())
236        }
237    }
238
239    fn logical_rewrite_for_stream(
240        &self,
241        _ctx: &mut super::convert::RewriteStreamContext,
242    ) -> Result<(PlanRef, ColIndexMapping)> {
243        let (input, mut col_index_mapping) = self.input().logical_rewrite_for_stream(_ctx)?;
244        let mut new_core = self.core.clone();
245        new_core.input = input;
246
247        if col_index_mapping.is_identity() {
248            return Ok((
249                Self {
250                    base: PlanBase::new_logical_with_core(&new_core),
251                    core: new_core,
252                }
253                .into(),
254                col_index_mapping,
255            ));
256        }
257
258        new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
259
260        Ok((
261            Self {
262                base: PlanBase::new_logical_with_core(&new_core),
263                core: new_core,
264            }
265            .into(),
266            col_index_mapping,
267        ))
268    }
269}