risingwave_frontend/optimizer/plan_node/
stream_eowc_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21    ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
22    generic,
23};
24use crate::TableCatalog;
25use crate::binder::BoundFillStrategy;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
27use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::optimizer::property::Distribution;
30use crate::scheduler::SchedulerResult;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33/// `StreamEowcGapFill` implements [`super::Stream`] to represent a gap-filling operation on a time
34/// series in streaming mode.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamEowcGapFill {
37    pub base: PlanBase<super::Stream>,
38    core: generic::GapFill<PlanRef<Stream>>,
39}
40
41impl StreamEowcGapFill {
42    pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
43        let input = &core.input;
44
45        // Force singleton distribution for GapFill operations.
46        // GapFill requires access to all data across time ranges to correctly identify and fill gaps, so that missing intervals can be detected and filled appropriately.
47        let base = PlanBase::new_stream_with_core(
48            &core,
49            Distribution::Single,
50            input.stream_kind(),
51            true, // provides EOWC semantics
52            input.watermark_columns().clone(),
53            input.columns_monotonicity().clone(),
54        );
55        Self { base, core }
56    }
57
58    // Legacy constructor for backward compatibility
59    pub fn new_with_args(
60        input: PlanRef<Stream>,
61        time_col: InputRef,
62        interval: ExprImpl,
63        fill_strategies: Vec<BoundFillStrategy>,
64    ) -> Self {
65        let core = generic::GapFill {
66            input,
67            time_col,
68            interval,
69            fill_strategies,
70        };
71        Self::new(core)
72    }
73
74    pub fn time_col(&self) -> &InputRef {
75        &self.core.time_col
76    }
77
78    pub fn interval(&self) -> &ExprImpl {
79        &self.core.interval
80    }
81
82    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
83        &self.core.fill_strategies
84    }
85
86    fn infer_buffer_table(&self) -> TableCatalog {
87        let mut tbl_builder = TableCatalogBuilder::default();
88
89        let out_schema = self.core.schema();
90        for field in out_schema.fields() {
91            tbl_builder.add_column(field);
92        }
93
94        // Just use time column as the primary key since SortBuffer requires it for ordering
95        let time_col_idx = self.time_col().index();
96        tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
97
98        tbl_builder.build(vec![], 0)
99    }
100
101    fn infer_prev_row_table(&self) -> TableCatalog {
102        let mut tbl_builder = TableCatalogBuilder::default();
103
104        for field in self.core.schema().fields() {
105            tbl_builder.add_column(field);
106        }
107
108        if !self.core.schema().fields().is_empty() {
109            tbl_builder.add_order_column(0, OrderType::ascending());
110        }
111
112        tbl_builder.build(vec![], 0)
113    }
114}
115
116impl PlanTreeNodeUnary<Stream> for StreamEowcGapFill {
117    fn input(&self) -> PlanRef<Stream> {
118        self.core.input.clone()
119    }
120
121    fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
122        let mut core = self.core.clone();
123        core.input = input;
124        Self::new(core)
125    }
126}
127
// NOTE(review): presumably expands to the general plan-tree-node impl for `StreamEowcGapFill`
// on top of its `PlanTreeNodeUnary<Stream>` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Stream, StreamEowcGapFill }
// NOTE(review): presumably generates the explain/distill output by delegating to the `core`
// field under the display name "StreamEowcGapFill" — confirm against the macro definition.
impl_distill_by_unit!(StreamEowcGapFill, core, "StreamEowcGapFill");
130
131impl TryToStreamPb for StreamEowcGapFill {
132    fn try_to_stream_prost_body(
133        &self,
134        state: &mut BuildFragmentGraphState,
135    ) -> SchedulerResult<NodeBody> {
136        use risingwave_pb::stream_plan::*;
137
138        let fill_strategies: Vec<String> = self
139            .fill_strategies()
140            .iter()
141            .map(|strategy| match strategy.strategy {
142                crate::binder::FillStrategy::Locf => "locf".to_owned(),
143                crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
144                crate::binder::FillStrategy::Null => "null".to_owned(),
145            })
146            .collect();
147
148        let buffer_table = self
149            .infer_buffer_table()
150            .with_id(state.gen_table_id_wrapped())
151            .to_internal_table_prost();
152
153        let prev_row_table = self
154            .infer_prev_row_table()
155            .with_id(state.gen_table_id_wrapped())
156            .to_internal_table_prost();
157
158        Ok(NodeBody::EowcGapFill(Box::new(EowcGapFillNode {
159            time_column_index: self.time_col().index() as u32,
160            interval: Some(self.interval().to_expr_proto_checked_pure(
161                self.stream_kind().is_retract(),
162                "gap filling interval",
163            )?),
164            fill_columns: self
165                .fill_strategies()
166                .iter()
167                .map(|strategy| strategy.target_col.index() as u32)
168                .collect(),
169            fill_strategies,
170            buffer_table: Some(buffer_table),
171            prev_row_table: Some(prev_row_table),
172        })))
173    }
174}
175
176impl ExprRewritable<Stream> for StreamEowcGapFill {
177    fn has_rewritable_expr(&self) -> bool {
178        true
179    }
180
181    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
182        let mut core = self.core.clone();
183        core.rewrite_exprs(r);
184        Self {
185            base: self.base.clone_with_new_plan_id(),
186            core,
187        }
188        .into()
189    }
190}
191
192impl ExprVisitable for StreamEowcGapFill {
193    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
194        self.core.visit_exprs(v)
195    }
196}