Skip to main content

risingwave_frontend/optimizer/plan_node/
stream_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21    ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
22    generic,
23};
24use crate::binder::BoundFillStrategy;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::impl_distill_by_unit;
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::scheduler::SchedulerResult;
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32/// `StreamGapFill` implements [`super::Stream`] to represent a gap-filling operation on a time
33/// series in normal streaming mode (without EOWC semantics).
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct StreamGapFill {
36    pub base: PlanBase<super::Stream>,
37    core: generic::GapFill<PlanRef<Stream>>,
38}
39
40impl StreamGapFill {
41    pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
42        let input = &core.input;
43        let partition_indices = core.partition_key_indices();
44        let distinct_partition_key_count = partition_indices
45            .iter()
46            .copied()
47            .collect::<std::collections::HashSet<_>>()
48            .len();
49        assert_eq!(
50            partition_indices.len(),
51            distinct_partition_key_count,
52            "stream gap fill expects canonicalized partition_by columns",
53        );
54        assert!(
55            !core
56                .pointer_key_indices()
57                .expect("stream gap fill input should have stream key")
58                .is_empty(),
59            "stream gap fill pointer key should not be empty",
60        );
61
62        let dist = if core.partition_by_cols.is_empty() {
63            Distribution::Single
64        } else {
65            Distribution::HashShard(partition_indices)
66        };
67
68        // Gap fill back-fills below the current watermark and emits non-monotonic fill values,
69        // so it preserves neither watermark nor monotonicity on its output.
70        let base = PlanBase::new_stream_with_core(
71            &core,
72            dist,
73            input.stream_kind(),
74            false, // does NOT provide EOWC semantics
75            WatermarkColumns::new(),
76            MonotonicityMap::new(),
77        );
78        Self { base, core }
79    }
80
81    // Legacy constructor for backward compatibility
82    pub fn new_with_args(
83        input: PlanRef<Stream>,
84        time_col: InputRef,
85        interval: ExprImpl,
86        fill_strategies: Vec<BoundFillStrategy>,
87    ) -> Self {
88        let core = generic::GapFill {
89            input,
90            time_col,
91            interval,
92            fill_strategies,
93            partition_by_cols: vec![],
94        };
95        Self::new(core)
96    }
97
98    pub fn time_col(&self) -> &InputRef {
99        &self.core.time_col
100    }
101
102    pub fn interval(&self) -> &ExprImpl {
103        &self.core.interval
104    }
105
106    pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
107        &self.core.fill_strategies
108    }
109
110    fn pointer_key_indices(&self) -> Vec<usize> {
111        self.core
112            .pointer_key_indices()
113            .expect("stream gap fill input should have stream key")
114    }
115
116    fn infer_state_table(&self) -> crate::TableCatalog {
117        let mut tbl_builder = TableCatalogBuilder::default();
118
119        let out_schema = self.core.schema();
120        for field in out_schema.fields() {
121            tbl_builder.add_column(field);
122        }
123
124        let state_key_indices = self
125            .core
126            .stream_key_indices()
127            .expect("stream gap fill input should have stream key");
128
129        // PK: deduplicated (partition_cols..., time_col, upstream stream key...).
130        for key_idx in state_key_indices {
131            tbl_builder.add_order_column(key_idx, OrderType::ascending());
132        }
133
134        let partition_key_indices = self.core.partition_key_indices();
135        let read_prefix_len_hint = partition_key_indices.len();
136        tbl_builder.build(partition_key_indices, read_prefix_len_hint)
137    }
138}
139
140impl PlanTreeNodeUnary<Stream> for StreamGapFill {
141    fn input(&self) -> PlanRef<Stream> {
142        self.core.input.clone()
143    }
144
145    fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
146        let mut core = self.core.clone();
147        core.input = input;
148        Self::new(core)
149    }
150}
151
152impl_plan_tree_node_for_unary! { Stream, StreamGapFill }
153impl_distill_by_unit!(StreamGapFill, core, "StreamGapFill");
154
155impl TryToStreamPb for StreamGapFill {
156    fn try_to_stream_prost_body(
157        &self,
158        state: &mut BuildFragmentGraphState,
159    ) -> SchedulerResult<NodeBody> {
160        use risingwave_pb::stream_plan::*;
161
162        let fill_strategies: Vec<String> = self
163            .fill_strategies()
164            .iter()
165            .map(|strategy| match strategy.strategy {
166                crate::binder::FillStrategy::Locf => "locf".to_owned(),
167                crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
168                crate::binder::FillStrategy::Null => "null".to_owned(),
169            })
170            .collect();
171
172        let state_table = self
173            .infer_state_table()
174            .with_id(state.gen_table_id_wrapped())
175            .to_internal_table_prost();
176
177        Ok(NodeBody::GapFill(Box::new(GapFillNode {
178            pointer_key_indices: self
179                .pointer_key_indices()
180                .into_iter()
181                .map(|idx| idx as u32)
182                .collect(),
183            time_column_index: self.time_col().index() as u32,
184            interval: Some(self.interval().to_expr_proto_checked_pure(
185                self.stream_kind().is_retract(),
186                "gap filling interval",
187            )?),
188            fill_columns: self
189                .fill_strategies()
190                .iter()
191                .map(|strategy| strategy.target_col.index() as u32)
192                .collect(),
193            fill_strategies,
194            state_table: Some(state_table),
195            partition_by_indices: self
196                .core
197                .partition_by_cols
198                .iter()
199                .map(|c| c.index() as u32)
200                .collect(),
201        })))
202    }
203}
204
205impl ExprRewritable<Stream> for StreamGapFill {
206    fn has_rewritable_expr(&self) -> bool {
207        true
208    }
209
210    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
211        let mut core = self.core.clone();
212        core.rewrite_exprs(r);
213        Self {
214            base: self.base.clone_with_new_plan_id(),
215            core,
216        }
217        .into()
218    }
219}
220
221impl ExprVisitable for StreamGapFill {
222    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
223        self.core.visit_exprs(v)
224    }
225}