risingwave_frontend/optimizer/plan_node/
logical_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_expr::bail;
18
19use super::stream::StreamPlanNodeMetadata;
20use super::{
21    ColPrunable, ColumnPruningContext, ExprRewritable, ExprVisitable, Logical, LogicalFilter,
22    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
23    PredicatePushdownContext, ToBatch, ToStream, ToStreamContext, generic,
24};
25use crate::binder::BoundFillStrategy;
26use crate::error::Result;
27use crate::expr::{ExprImpl, InputRef};
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::utils::{ColIndexMapping, Condition};
30
/// `LogicalGapFill` implements [`super::Logical`] to represent a gap-filling operation on a time
/// series.
///
/// The node wraps a `generic::GapFill` core holding the input plan, the time column, the interval
/// expression and the fill strategies (see [`LogicalGapFill::new`]); `base` is derived from it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalGapFill {
    /// Logical-convention plan properties, computed from `core` via
    /// `PlanBase::new_logical_with_core`.
    pub base: PlanBase<super::Logical>,
    /// The shared (`generic`) gap-fill definition: input, time column, interval, strategies.
    core: generic::GapFill<PlanRef>,
}
38
39impl LogicalGapFill {
40    pub fn new(
41        input: PlanRef,
42        time_col: InputRef,
43        interval: ExprImpl,
44        fill_strategies: Vec<BoundFillStrategy>,
45    ) -> Self {
46        let core = generic::GapFill {
47            input,
48            time_col,
49            interval,
50            fill_strategies,
51        };
52        let base = PlanBase::new_logical_with_core(&core);
53        Self { base, core }
54    }
55
56    pub fn time_col(&self) -> &InputRef {
57        &self.core.time_col
58    }
59
60    pub fn interval(&self) -> &ExprImpl {
61        &self.core.interval
62    }
63
64    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
65        &self.core.fill_strategies
66    }
67}
68
69impl PlanTreeNodeUnary<Logical> for LogicalGapFill {
70    fn input(&self) -> PlanRef {
71        self.core.input.clone()
72    }
73
74    fn clone_with_input(&self, input: PlanRef) -> Self {
75        Self::new(
76            input,
77            self.time_col().clone(),
78            self.interval().clone(),
79            self.fill_strategies().to_vec(),
80        )
81    }
82}
83
// Boilerplate impls generated by macros. Judging by their names, the first presumably builds the
// generic plan-tree-node impl on top of `PlanTreeNodeUnary` above, and the second presumably
// implements plan display by delegating to `core` under the label "LogicalGapFill".
// NOTE(review): inferred from macro names — confirm against the macro definitions.
impl_plan_tree_node_for_unary! { Logical, LogicalGapFill }
impl_distill_by_unit!(LogicalGapFill, core, "LogicalGapFill");
86
87impl ColPrunable for LogicalGapFill {
88    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
89        let input_col_num = self.input().schema().len();
90        let mut input_required = FixedBitSet::from_iter(required_cols.iter().copied());
91        input_required.insert(self.time_col().index());
92        input_required.union_with(&self.interval().collect_input_refs(input_col_num));
93        self.fill_strategies()
94            .iter()
95            .for_each(|s| input_required.insert(s.target_col.index()));
96
97        let input_required_cols: Vec<_> = input_required.ones().collect();
98        let mut col_index_mapping =
99            ColIndexMapping::with_remaining_columns(&input_required_cols, input_col_num);
100
101        let mut new_core = self.core.clone();
102        new_core.input = self.input().prune_col(&input_required_cols, ctx);
103        new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
104
105        let logical_gap_fill = Self {
106            base: PlanBase::new_logical_with_core(&new_core),
107            core: new_core,
108        }
109        .into();
110
111        if input_required_cols == required_cols {
112            logical_gap_fill
113        } else {
114            let output_required_cols = required_cols
115                .iter()
116                .map(|&idx| col_index_mapping.map(idx))
117                .collect_vec();
118            let src_size = logical_gap_fill.schema().len();
119            LogicalProject::with_mapping(
120                logical_gap_fill,
121                ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
122            )
123            .into()
124        }
125    }
126}
127
128impl ExprRewritable<Logical> for LogicalGapFill {
129    fn has_rewritable_expr(&self) -> bool {
130        true
131    }
132
133    fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
134        let mut core = self.core.clone();
135        core.rewrite_exprs(r);
136        Self {
137            base: self.base.clone(),
138            core,
139        }
140        .into()
141    }
142}
143
144impl ExprVisitable for LogicalGapFill {
145    fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
146        self.core.visit_exprs(v);
147    }
148}
149
150impl PredicatePushdown for LogicalGapFill {
151    fn predicate_pushdown(
152        &self,
153        predicate: Condition,
154        _ctx: &mut PredicatePushdownContext,
155    ) -> PlanRef {
156        LogicalFilter::create(self.clone().into(), predicate)
157    }
158}
159
160impl ToBatch for LogicalGapFill {
161    fn to_batch(&self) -> Result<super::BatchPlanRef> {
162        bail!("BatchGapFill is not implemented yet")
163    }
164}
165
166impl ToStream for LogicalGapFill {
167    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<super::StreamPlanRef> {
168        use super::{StreamEowcGapFill, StreamGapFill};
169        use crate::error::ErrorCode;
170        use crate::optimizer::property::RequiredDist;
171
172        let stream_input = self.input().to_stream(ctx)?;
173
174        // GapFill (both normal and EOWC) always uses singleton distribution for correctness.
175        // It needs to see complete time series data to identify and fill gaps properly.
176        let new_input = RequiredDist::single().streaming_enforce_if_not_satisfies(stream_input)?;
177
178        // Common validation: time column must be the sole primary key for both modes
179        let time_col_idx = self.core.time_col.index();
180        let input_stream_key = new_input.expect_stream_key();
181        if !(input_stream_key.len() == 1 && input_stream_key[0] == time_col_idx) {
182            return Err(ErrorCode::NotSupported(
183                "GAP_FILL requires the time column to be the sole primary key.".to_owned(),
184                format!("The time column (index {}) must be the only primary key. Found stream key: {:?}", time_col_idx, input_stream_key),
185            )
186            .into());
187        }
188
189        let core = generic::GapFill {
190            input: new_input.clone(),
191            time_col: self.core.time_col.clone(),
192            interval: self.core.interval.clone(),
193            fill_strategies: self.core.fill_strategies.clone(),
194        };
195
196        if ctx.emit_on_window_close() {
197            // EOWC mode requires watermark on time column
198            let input_watermark_cols = new_input.watermark_columns();
199            if !input_watermark_cols.contains(time_col_idx) {
200                return Err(ErrorCode::NotSupported(
201                    "GAP_FILL with EMIT ON WINDOW CLOSE requires a watermark on the time column."
202                        .to_owned(),
203                    format!(
204                        "Please define a watermark on the time column (column index {}).",
205                        time_col_idx
206                    ),
207                )
208                .into());
209            }
210
211            Ok(StreamEowcGapFill::new(core).into())
212        } else {
213            Ok(StreamGapFill::new(core).into())
214        }
215    }
216
217    fn logical_rewrite_for_stream(
218        &self,
219        _ctx: &mut super::convert::RewriteStreamContext,
220    ) -> Result<(PlanRef, ColIndexMapping)> {
221        let (input, mut col_index_mapping) = self.input().logical_rewrite_for_stream(_ctx)?;
222        let mut new_core = self.core.clone();
223        new_core.input = input;
224
225        if col_index_mapping.is_identity() {
226            return Ok((
227                Self {
228                    base: self.base.clone_with_new_plan_id(),
229                    core: new_core,
230                }
231                .into(),
232                col_index_mapping,
233            ));
234        }
235
236        new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
237
238        Ok((
239            Self {
240                base: PlanBase::new_logical_with_core(&new_core),
241                core: new_core,
242            }
243            .into(),
244            col_index_mapping,
245        ))
246    }
247}