risingwave_frontend/optimizer/plan_node/
stream_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::Field;
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19
20use super::generic::GenericPlanNode;
21use super::utils::TableCatalogBuilder;
22use super::{
23    ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
24    generic,
25};
26use crate::binder::BoundFillStrategy;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
28use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
29use crate::optimizer::plan_node::utils::impl_distill_by_unit;
30use crate::optimizer::property::Distribution;
31use crate::scheduler::SchedulerResult;
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34/// `StreamGapFill` implements [`super::Stream`] to represent a gap-filling operation on a time
35/// series in normal streaming mode (without EOWC semantics).
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamGapFill {
38    pub base: PlanBase<super::Stream>,
39    core: generic::GapFill<PlanRef<Stream>>,
40}
41
42impl StreamGapFill {
43    pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
44        let input = &core.input;
45
46        // Use singleton distribution for normal streaming GapFill.
47        // Similar to EOWC version, gap filling requires seeing all data
48        // to correctly identify and fill gaps across time series.
49        let base = PlanBase::new_stream_with_core(
50            &core,
51            Distribution::Single,
52            input.stream_kind(),
53            false, // does NOT provide EOWC semantics
54            input.watermark_columns().clone(),
55            input.columns_monotonicity().clone(),
56        );
57        Self { base, core }
58    }
59
60    // Legacy constructor for backward compatibility
61    pub fn new_with_args(
62        input: PlanRef<Stream>,
63        time_col: InputRef,
64        interval: ExprImpl,
65        fill_strategies: Vec<BoundFillStrategy>,
66    ) -> Self {
67        let core = generic::GapFill {
68            input,
69            time_col,
70            interval,
71            fill_strategies,
72        };
73        Self::new(core)
74    }
75
76    pub fn time_col(&self) -> &InputRef {
77        &self.core.time_col
78    }
79
80    pub fn interval(&self) -> &ExprImpl {
81        &self.core.interval
82    }
83
84    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
85        &self.core.fill_strategies
86    }
87
88    fn infer_state_table(&self) -> crate::TableCatalog {
89        let mut tbl_builder = TableCatalogBuilder::default();
90
91        let out_schema = self.core.schema();
92        for field in out_schema.fields() {
93            tbl_builder.add_column(field);
94        }
95
96        // For singleton distribution, use simplified primary key design:
97        // Just use time column as the primary key for ordering
98        let time_col_idx = self.time_col().index();
99        tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
100
101        // Add is_filled flag column for gap fill state tracking
102        tbl_builder.add_column(&Field::with_name(DataType::Boolean, "is_filled"));
103        tbl_builder.build(vec![], 0)
104    }
105}
106
107impl PlanTreeNodeUnary<Stream> for StreamGapFill {
108    fn input(&self) -> PlanRef<Stream> {
109        self.core.input.clone()
110    }
111
112    fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
113        let mut core = self.core.clone();
114        core.input = input;
115        Self::new(core)
116    }
117}
118
119impl_plan_tree_node_for_unary! { Stream, StreamGapFill }
120impl_distill_by_unit!(StreamGapFill, core, "StreamGapFill");
121
122impl TryToStreamPb for StreamGapFill {
123    fn try_to_stream_prost_body(
124        &self,
125        state: &mut BuildFragmentGraphState,
126    ) -> SchedulerResult<NodeBody> {
127        use risingwave_pb::stream_plan::*;
128
129        let fill_strategies: Vec<String> = self
130            .fill_strategies()
131            .iter()
132            .map(|strategy| match strategy.strategy {
133                crate::binder::FillStrategy::Locf => "locf".to_owned(),
134                crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
135                crate::binder::FillStrategy::Null => "null".to_owned(),
136            })
137            .collect();
138
139        let state_table = self
140            .infer_state_table()
141            .with_id(state.gen_table_id_wrapped())
142            .to_internal_table_prost();
143
144        Ok(NodeBody::GapFill(Box::new(GapFillNode {
145            time_column_index: self.time_col().index() as u32,
146            interval: Some(self.interval().to_expr_proto_checked_pure(
147                self.stream_kind().is_retract(),
148                "gap filling interval",
149            )?),
150            fill_columns: self
151                .fill_strategies()
152                .iter()
153                .map(|strategy| strategy.target_col.index() as u32)
154                .collect(),
155            fill_strategies,
156            state_table: Some(state_table),
157        })))
158    }
159}
160
161impl ExprRewritable<Stream> for StreamGapFill {
162    fn has_rewritable_expr(&self) -> bool {
163        true
164    }
165
166    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
167        let mut core = self.core.clone();
168        core.rewrite_exprs(r);
169        Self {
170            base: self.base.clone_with_new_plan_id(),
171            core,
172        }
173        .into()
174    }
175}
176
177impl ExprVisitable for StreamGapFill {
178    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
179        self.core.visit_exprs(v)
180    }
181}