risingwave_frontend/optimizer/plan_node/
logical_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Minimal imports for prototype
16
17use super::stream::StreamPlanNodeMetadata;
18use super::{
19    ColPrunable, ColumnPruningContext, ExprRewritable, ExprVisitable, Logical, LogicalFilter,
20    LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
21    PredicatePushdownContext, ToBatch, ToStream, ToStreamContext, generic,
22};
23use crate::binder::BoundFillStrategy;
24use crate::error::Result;
25use crate::expr::{ExprImpl, InputRef};
26use crate::optimizer::plan_node::utils::impl_distill_by_unit;
27use crate::utils::{ColIndexMapping, Condition};
28
29/// `LogicalGapFill` implements [`super::Logical`] to represent a gap-filling operation on a time
30/// series.
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct LogicalGapFill {
33    pub base: PlanBase<super::Logical>,
34    core: generic::GapFill<PlanRef>,
35}
36
37impl LogicalGapFill {
38    pub fn new(
39        input: PlanRef,
40        time_col: InputRef,
41        interval: ExprImpl,
42        fill_strategies: Vec<BoundFillStrategy>,
43    ) -> Self {
44        let core = generic::GapFill {
45            input,
46            time_col,
47            interval,
48            fill_strategies,
49        };
50        let base = PlanBase::new_logical_with_core(&core);
51        Self { base, core }
52    }
53
54    pub fn time_col(&self) -> &InputRef {
55        &self.core.time_col
56    }
57
58    pub fn interval(&self) -> &ExprImpl {
59        &self.core.interval
60    }
61
62    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
63        &self.core.fill_strategies
64    }
65}
66
67impl PlanTreeNodeUnary<Logical> for LogicalGapFill {
68    fn input(&self) -> PlanRef {
69        self.core.input.clone()
70    }
71
72    fn clone_with_input(&self, input: PlanRef) -> Self {
73        Self::new(
74            input,
75            self.time_col().clone(),
76            self.interval().clone(),
77            self.fill_strategies().to_vec(),
78        )
79    }
80}
81
82impl_plan_tree_node_for_unary! { Logical, LogicalGapFill }
83impl_distill_by_unit!(LogicalGapFill, core, "LogicalGapFill");
84
85impl ColPrunable for LogicalGapFill {
86    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
87        // For minimal prototype: simply pass through all columns without optimization
88        let new_input = self.input().prune_col(required_cols, ctx);
89        self.clone_with_input(new_input).into()
90    }
91}
92
93impl ExprRewritable<Logical> for LogicalGapFill {
94    fn has_rewritable_expr(&self) -> bool {
95        true
96    }
97
98    fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
99        let mut core = self.core.clone();
100        core.rewrite_exprs(r);
101        Self {
102            base: self.base.clone(),
103            core,
104        }
105        .into()
106    }
107}
108
109impl ExprVisitable for LogicalGapFill {
110    fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
111        self.core.visit_exprs(v);
112    }
113}
114
115impl PredicatePushdown for LogicalGapFill {
116    fn predicate_pushdown(
117        &self,
118        predicate: Condition,
119        _ctx: &mut PredicatePushdownContext,
120    ) -> PlanRef {
121        LogicalFilter::create(self.clone().into(), predicate)
122    }
123}
124
125impl ToBatch for LogicalGapFill {
126    fn to_batch(&self) -> Result<super::BatchPlanRef> {
127        unimplemented!("batch gap fill")
128    }
129}
130
131impl ToStream for LogicalGapFill {
132    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<super::StreamPlanRef> {
133        use super::{StreamEowcGapFill, StreamGapFill};
134        use crate::error::ErrorCode;
135        use crate::optimizer::property::RequiredDist;
136
137        let stream_input = self.input().to_stream(ctx)?;
138
139        // GapFill (both normal and EOWC) always uses singleton distribution for correctness.
140        // It needs to see complete time series data to identify and fill gaps properly.
141        let new_input = RequiredDist::single().streaming_enforce_if_not_satisfies(stream_input)?;
142
143        // Common validation: time column must be the sole primary key for both modes
144        let time_col_idx = self.core.time_col.index();
145        let input_stream_key = new_input.expect_stream_key();
146        if !(input_stream_key.len() == 1 && input_stream_key[0] == time_col_idx) {
147            return Err(ErrorCode::NotSupported(
148                "GAP_FILL requires the time column to be the sole primary key.".to_owned(),
149                format!("The time column (index {}) must be the only primary key. Found stream key: {:?}", time_col_idx, input_stream_key),
150            )
151            .into());
152        }
153
154        let core = generic::GapFill {
155            input: new_input.clone(),
156            time_col: self.core.time_col.clone(),
157            interval: self.core.interval.clone(),
158            fill_strategies: self.core.fill_strategies.clone(),
159        };
160
161        if ctx.emit_on_window_close() {
162            // EOWC mode requires watermark on time column
163            let input_watermark_cols = new_input.watermark_columns();
164            if !input_watermark_cols.contains(time_col_idx) {
165                return Err(ErrorCode::NotSupported(
166                    "GAP_FILL with EMIT ON WINDOW CLOSE requires a watermark on the time column."
167                        .to_owned(),
168                    format!(
169                        "Please define a watermark on the time column (column index {}).",
170                        time_col_idx
171                    ),
172                )
173                .into());
174            }
175
176            Ok(StreamEowcGapFill::new(core).into())
177        } else {
178            Ok(StreamGapFill::new(core).into())
179        }
180    }
181
182    fn logical_rewrite_for_stream(
183        &self,
184        _ctx: &mut super::convert::RewriteStreamContext,
185    ) -> Result<(PlanRef, ColIndexMapping)> {
186        let (input, mut col_index_mapping) = self.input().logical_rewrite_for_stream(_ctx)?;
187        let mut new_core = self.core.clone();
188        new_core.input = input;
189
190        if col_index_mapping.is_identity() {
191            return Ok((
192                Self {
193                    base: self.base.clone_with_new_plan_id(),
194                    core: new_core,
195                }
196                .into(),
197                col_index_mapping,
198            ));
199        }
200
201        new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
202
203        Ok((
204            Self {
205                base: self.base.clone_with_new_plan_id(),
206                core: new_core,
207            }
208            .into(),
209            col_index_mapping,
210        ))
211    }
212}