Skip to main content

risingwave_frontend/optimizer/plan_node/
stream_eowc_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21    ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
22    generic,
23};
24use crate::TableCatalog;
25use crate::binder::BoundFillStrategy;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
27use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
30use crate::scheduler::SchedulerResult;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33/// `StreamEowcGapFill` implements [`super::Stream`] to represent a gap-filling operation on a time
34/// series in streaming mode.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamEowcGapFill {
37    pub base: PlanBase<super::Stream>,
38    core: generic::GapFill<PlanRef<Stream>>,
39}
40
41impl StreamEowcGapFill {
42    pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
43        let input = &core.input;
44
45        let dist = if core.partition_by_cols.is_empty() {
46            Distribution::Single
47        } else {
48            let partition_indices: Vec<usize> =
49                core.partition_by_cols.iter().map(|c| c.index()).collect();
50            Distribution::HashShard(partition_indices)
51        };
52
53        // A late anchor makes gap fill back-fill a partition below the already-forwarded time
54        // watermark, so the time watermark is never preserved. Only `PARTITION BY` columns are
55        // copied unchanged into generated rows, so only their watermark can be propagated.
56        let mut watermark_columns = WatermarkColumns::new();
57        for partition_col in &core.partition_by_cols {
58            let idx = partition_col.index();
59            if let Some(group) = input.watermark_columns().get_group(idx) {
60                watermark_columns.insert(idx, group);
61            }
62        }
63
64        // Without partitioning the output stays time-sorted, so the time column is monotonic.
65        // With partitioning, cross-partition interleaving breaks that; only the copied
66        // `PARTITION BY` columns keep their monotonicity.
67        let mut columns_monotonicity = MonotonicityMap::new();
68        if core.partition_by_cols.is_empty() {
69            let idx = core.time_col.index();
70            columns_monotonicity.insert(idx, input.columns_monotonicity()[idx]);
71        } else {
72            for partition_col in &core.partition_by_cols {
73                let idx = partition_col.index();
74                columns_monotonicity.insert(idx, input.columns_monotonicity()[idx]);
75            }
76        }
77
78        let base = PlanBase::new_stream_with_core(
79            &core,
80            dist,
81            input.stream_kind(),
82            true, // provides EOWC semantics
83            watermark_columns,
84            columns_monotonicity,
85        );
86        Self { base, core }
87    }
88
89    // Legacy constructor for backward compatibility
90    pub fn new_with_args(
91        input: PlanRef<Stream>,
92        time_col: InputRef,
93        interval: ExprImpl,
94        fill_strategies: Vec<BoundFillStrategy>,
95    ) -> Self {
96        let core = generic::GapFill {
97            input,
98            time_col,
99            interval,
100            fill_strategies,
101            partition_by_cols: vec![],
102        };
103        Self::new(core)
104    }
105
106    pub fn time_col(&self) -> &InputRef {
107        &self.core.time_col
108    }
109
110    pub fn interval(&self) -> &ExprImpl {
111        &self.core.interval
112    }
113
114    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
115        &self.core.fill_strategies
116    }
117
118    fn infer_prev_row_table(&self) -> TableCatalog {
119        let mut tbl_builder = TableCatalogBuilder::default();
120
121        for field in self.core.schema().fields() {
122            tbl_builder.add_column(field);
123        }
124
125        if self.core.partition_by_cols.is_empty() {
126            // No partition: single-row table, use first column as PK
127            if !self.core.schema().fields().is_empty() {
128                tbl_builder.add_order_column(0, OrderType::ascending());
129            }
130            tbl_builder.build(vec![], 0)
131        } else {
132            // With partition: PK = partition_cols (one prev_row per partition)
133            for pc in &self.core.partition_by_cols {
134                tbl_builder.add_order_column(pc.index(), OrderType::ascending());
135            }
136            let dist_key_indices: Vec<usize> = self
137                .core
138                .partition_by_cols
139                .iter()
140                .map(|c| c.index())
141                .collect();
142            tbl_builder.build(dist_key_indices, 0)
143        }
144    }
145}
146
147impl PlanTreeNodeUnary<Stream> for StreamEowcGapFill {
148    fn input(&self) -> PlanRef<Stream> {
149        self.core.input.clone()
150    }
151
152    fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
153        let mut core = self.core.clone();
154        core.input = input;
155        Self::new(core)
156    }
157}
158
159impl_plan_tree_node_for_unary! { Stream, StreamEowcGapFill }
160impl_distill_by_unit!(StreamEowcGapFill, core, "StreamEowcGapFill");
161
162impl TryToStreamPb for StreamEowcGapFill {
163    fn try_to_stream_prost_body(
164        &self,
165        state: &mut BuildFragmentGraphState,
166    ) -> SchedulerResult<NodeBody> {
167        use risingwave_pb::stream_plan::*;
168
169        let fill_strategies: Vec<String> = self
170            .fill_strategies()
171            .iter()
172            .map(|strategy| match strategy.strategy {
173                crate::binder::FillStrategy::Locf => "locf".to_owned(),
174                crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
175                crate::binder::FillStrategy::Null => "null".to_owned(),
176            })
177            .collect();
178
179        let prev_row_table = self
180            .infer_prev_row_table()
181            .with_id(state.gen_table_id_wrapped())
182            .to_internal_table_prost();
183
184        Ok(NodeBody::EowcGapFill(Box::new(EowcGapFillNode {
185            time_column_index: self.time_col().index() as u32,
186            interval: Some(self.interval().to_expr_proto_checked_pure(
187                self.stream_kind().is_retract(),
188                "gap filling interval",
189            )?),
190            fill_columns: self
191                .fill_strategies()
192                .iter()
193                .map(|strategy| strategy.target_col.index() as u32)
194                .collect(),
195            fill_strategies,
196            prev_row_table: Some(prev_row_table),
197            partition_by_indices: self
198                .core
199                .partition_by_cols
200                .iter()
201                .map(|c| c.index() as u32)
202                .collect(),
203        })))
204    }
205}
206
207impl ExprRewritable<Stream> for StreamEowcGapFill {
208    fn has_rewritable_expr(&self) -> bool {
209        true
210    }
211
212    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
213        let mut core = self.core.clone();
214        core.rewrite_exprs(r);
215        Self {
216            base: self.base.clone_with_new_plan_id(),
217            core,
218        }
219        .into()
220    }
221}
222
223impl ExprVisitable for StreamEowcGapFill {
224    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
225        self.core.visit_exprs(v)
226    }
227}