risingwave_frontend/optimizer/plan_node/
stream_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::Field;
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19
20use super::generic::GenericPlanNode;
21use super::utils::TableCatalogBuilder;
22use super::{
23    ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, StreamNode,
24    generic,
25};
26use crate::binder::BoundFillStrategy;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
28use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
29use crate::optimizer::plan_node::utils::impl_distill_by_unit;
30use crate::optimizer::property::Distribution;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33/// `StreamGapFill` implements [`super::Stream`] to represent a gap-filling operation on a time
34/// series in normal streaming mode (without EOWC semantics).
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamGapFill {
37    pub base: PlanBase<super::Stream>,
38    core: generic::GapFill<PlanRef<Stream>>,
39}
40
41impl StreamGapFill {
42    pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
43        let input = &core.input;
44
45        // Use singleton distribution for normal streaming GapFill.
46        // Similar to EOWC version, gap filling requires seeing all data
47        // to correctly identify and fill gaps across time series.
48        let base = PlanBase::new_stream_with_core(
49            &core,
50            Distribution::Single,
51            input.stream_kind(),
52            false, // does NOT provide EOWC semantics
53            input.watermark_columns().clone(),
54            input.columns_monotonicity().clone(),
55        );
56        Self { base, core }
57    }
58
59    // Legacy constructor for backward compatibility
60    pub fn new_with_args(
61        input: PlanRef<Stream>,
62        time_col: InputRef,
63        interval: ExprImpl,
64        fill_strategies: Vec<BoundFillStrategy>,
65    ) -> Self {
66        let core = generic::GapFill {
67            input,
68            time_col,
69            interval,
70            fill_strategies,
71        };
72        Self::new(core)
73    }
74
75    pub fn time_col(&self) -> &InputRef {
76        &self.core.time_col
77    }
78
79    pub fn interval(&self) -> &ExprImpl {
80        &self.core.interval
81    }
82
83    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
84        &self.core.fill_strategies
85    }
86
87    fn infer_state_table(&self) -> crate::TableCatalog {
88        let mut tbl_builder = TableCatalogBuilder::default();
89
90        let out_schema = self.core.schema();
91        for field in out_schema.fields() {
92            tbl_builder.add_column(field);
93        }
94
95        // For singleton distribution, use simplified primary key design:
96        // Just use time column as the primary key for ordering
97        let time_col_idx = self.time_col().index();
98        tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
99
100        // Add is_filled flag column for gap fill state tracking
101        tbl_builder.add_column(&Field::with_name(DataType::Boolean, "is_filled"));
102        tbl_builder.build(vec![], 0)
103    }
104}
105
106impl PlanTreeNodeUnary<Stream> for StreamGapFill {
107    fn input(&self) -> PlanRef<Stream> {
108        self.core.input.clone()
109    }
110
111    fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
112        let mut core = self.core.clone();
113        core.input = input;
114        Self::new(core)
115    }
116}
117
// Remaining plan-node boilerplate is generated by project macros.
// NOTE(review): presumably the first macro derives the general plan-tree-node impl from
// the `PlanTreeNodeUnary` impl, and the second implements the plan's display ("distill")
// by delegating to the `core` field under the name "StreamGapFill" — confirm against the
// macro definitions.
impl_plan_tree_node_for_unary! { Stream, StreamGapFill }
impl_distill_by_unit!(StreamGapFill, core, "StreamGapFill");
120
121impl StreamNode for StreamGapFill {
122    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
123        use risingwave_pb::stream_plan::*;
124
125        let fill_strategies: Vec<String> = self
126            .fill_strategies()
127            .iter()
128            .map(|strategy| match strategy.strategy {
129                crate::binder::FillStrategy::Locf => "locf".to_owned(),
130                crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
131                crate::binder::FillStrategy::Null => "null".to_owned(),
132            })
133            .collect();
134
135        let state_table = self
136            .infer_state_table()
137            .with_id(state.gen_table_id_wrapped())
138            .to_internal_table_prost();
139
140        NodeBody::GapFill(Box::new(GapFillNode {
141            time_column_index: self.time_col().index() as u32,
142            interval: Some(self.interval().to_expr_proto()),
143            fill_columns: self
144                .fill_strategies()
145                .iter()
146                .map(|strategy| strategy.target_col.index() as u32)
147                .collect(),
148            fill_strategies,
149            state_table: Some(state_table),
150        }))
151    }
152}
153
154impl ExprRewritable<Stream> for StreamGapFill {
155    fn has_rewritable_expr(&self) -> bool {
156        true
157    }
158
159    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
160        let mut core = self.core.clone();
161        core.rewrite_exprs(r);
162        Self {
163            base: self.base.clone_with_new_plan_id(),
164            core,
165        }
166        .into()
167    }
168}
169
170impl ExprVisitable for StreamGapFill {
171    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
172        self.core.visit_exprs(v)
173    }
174}