risingwave_frontend/optimizer/plan_node/
stream_eowc_gap_fill.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21    ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, StreamNode,
22    generic,
23};
24use crate::TableCatalog;
25use crate::binder::BoundFillStrategy;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
27use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::optimizer::property::Distribution;
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32/// `StreamEowcGapFill` implements [`super::Stream`] to represent a gap-filling operation on a time
33/// series in streaming mode.
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct StreamEowcGapFill {
36    pub base: PlanBase<super::Stream>,
37    core: generic::GapFill<PlanRef<Stream>>,
38}
39
40impl StreamEowcGapFill {
41    pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
42        let input = &core.input;
43
44        // Force singleton distribution for GapFill operations.
45        // GapFill requires access to all data across time ranges to correctly identify and fill gaps, so that missing intervals can be detected and filled appropriately.
46        let base = PlanBase::new_stream_with_core(
47            &core,
48            Distribution::Single,
49            input.stream_kind(),
50            true, // provides EOWC semantics
51            input.watermark_columns().clone(),
52            input.columns_monotonicity().clone(),
53        );
54        Self { base, core }
55    }
56
57    // Legacy constructor for backward compatibility
58    pub fn new_with_args(
59        input: PlanRef<Stream>,
60        time_col: InputRef,
61        interval: ExprImpl,
62        fill_strategies: Vec<BoundFillStrategy>,
63    ) -> Self {
64        let core = generic::GapFill {
65            input,
66            time_col,
67            interval,
68            fill_strategies,
69        };
70        Self::new(core)
71    }
72
73    pub fn time_col(&self) -> &InputRef {
74        &self.core.time_col
75    }
76
77    pub fn interval(&self) -> &ExprImpl {
78        &self.core.interval
79    }
80
81    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
82        &self.core.fill_strategies
83    }
84
85    fn infer_buffer_table(&self) -> TableCatalog {
86        let mut tbl_builder = TableCatalogBuilder::default();
87
88        let out_schema = self.core.schema();
89        for field in out_schema.fields() {
90            tbl_builder.add_column(field);
91        }
92
93        // Just use time column as the primary key since SortBuffer requires it for ordering
94        let time_col_idx = self.time_col().index();
95        tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
96
97        tbl_builder.build(vec![], 0)
98    }
99
100    fn infer_prev_row_table(&self) -> TableCatalog {
101        let mut tbl_builder = TableCatalogBuilder::default();
102
103        for field in self.core.schema().fields() {
104            tbl_builder.add_column(field);
105        }
106
107        if !self.core.schema().fields().is_empty() {
108            tbl_builder.add_order_column(0, OrderType::ascending());
109        }
110
111        tbl_builder.build(vec![], 0)
112    }
113}
114
115impl PlanTreeNodeUnary<Stream> for StreamEowcGapFill {
116    fn input(&self) -> PlanRef<Stream> {
117        self.core.input.clone()
118    }
119
120    fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
121        let mut core = self.core.clone();
122        core.input = input;
123        Self::new(core)
124    }
125}
126
127impl_plan_tree_node_for_unary! { Stream, StreamEowcGapFill }
128impl_distill_by_unit!(StreamEowcGapFill, core, "StreamEowcGapFill");
129
130impl StreamNode for StreamEowcGapFill {
131    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
132        use risingwave_pb::stream_plan::*;
133
134        let fill_strategies: Vec<String> = self
135            .fill_strategies()
136            .iter()
137            .map(|strategy| match strategy.strategy {
138                crate::binder::FillStrategy::Locf => "locf".to_owned(),
139                crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
140                crate::binder::FillStrategy::Null => "null".to_owned(),
141            })
142            .collect();
143
144        let buffer_table = self
145            .infer_buffer_table()
146            .with_id(state.gen_table_id_wrapped())
147            .to_internal_table_prost();
148
149        let prev_row_table = self
150            .infer_prev_row_table()
151            .with_id(state.gen_table_id_wrapped())
152            .to_internal_table_prost();
153
154        NodeBody::EowcGapFill(Box::new(EowcGapFillNode {
155            time_column_index: self.time_col().index() as u32,
156            interval: Some(self.interval().to_expr_proto()),
157            fill_columns: self
158                .fill_strategies()
159                .iter()
160                .map(|strategy| strategy.target_col.index() as u32)
161                .collect(),
162            fill_strategies,
163            buffer_table: Some(buffer_table),
164            prev_row_table: Some(prev_row_table),
165        }))
166    }
167}
168
169impl ExprRewritable<Stream> for StreamEowcGapFill {
170    fn has_rewritable_expr(&self) -> bool {
171        true
172    }
173
174    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
175        let mut core = self.core.clone();
176        core.rewrite_exprs(r);
177        Self {
178            base: self.base.clone_with_new_plan_id(),
179            core,
180        }
181        .into()
182    }
183}
184
185impl ExprVisitable for StreamEowcGapFill {
186    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
187        self.core.visit_exprs(v)
188    }
189}