Skip to main content

risingwave_frontend/optimizer/plan_node/generic/
gap_fill.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::Schema;
17
18use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
19use crate::OptimizerContextRef;
20use crate::binder::BoundFillStrategy;
21use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
22use crate::optimizer::plan_node::ColIndexMapping;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::FunctionalDependencySet;
25
/// Generic (convention-independent) representation of a gap-fill plan node.
///
/// The node passes its input's schema through unchanged (see `schema`), and its
/// stream key is derived from the partition columns, the time column and the
/// input's stream key (see `stream_key_indices`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GapFill<PlanRef> {
    /// Child plan whose rows are processed.
    pub input: PlanRef,
    /// Reference to the time column of `input`. Expression rewrites require it
    /// to remain a plain `InputRef` (they panic otherwise).
    pub time_col: InputRef,
    /// Interval expression; presumably the step between generated time points —
    /// NOTE(review): confirm semantics against the executor.
    pub interval: ExprImpl,
    /// Fill strategies; each carries a `target_col` that must stay an `InputRef`
    /// into `input` across rewrites.
    pub fill_strategies: Vec<BoundFillStrategy>,
    /// `PARTITION BY` columns of `input` (may be empty). They lead the stream key.
    pub partition_by_cols: Vec<InputRef>,
}
34
35impl<PlanRef: GenericPlanRef> GapFill<PlanRef> {
36    pub fn partition_key_indices(&self) -> Vec<usize> {
37        self.partition_by_cols.iter().map(|c| c.index()).collect()
38    }
39
40    pub fn stream_key_indices(&self) -> Option<Vec<usize>> {
41        let mut stream_key = self.partition_key_indices();
42
43        if !stream_key.contains(&self.time_col.index()) {
44            stream_key.push(self.time_col.index());
45        }
46
47        for &key in self.input.stream_key()? {
48            if !stream_key.contains(&key) {
49                stream_key.push(key);
50            }
51        }
52
53        Some(stream_key)
54    }
55
56    pub fn pointer_key_indices(&self) -> Option<Vec<usize>> {
57        let time_col_idx = self.time_col.index();
58        let partition_key_indices = self.partition_key_indices();
59        let mut pointer_key_indices = vec![time_col_idx];
60
61        for &key in self.input.stream_key()? {
62            if key != time_col_idx
63                && !partition_key_indices.contains(&key)
64                && !pointer_key_indices.contains(&key)
65            {
66                pointer_key_indices.push(key);
67            }
68        }
69
70        Some(pointer_key_indices)
71    }
72}
73
74impl<PlanRef: GenericPlanRef> GenericPlanNode for GapFill<PlanRef> {
75    fn schema(&self) -> Schema {
76        self.input.schema().clone()
77    }
78
79    fn functional_dependency(&self) -> FunctionalDependencySet {
80        match self.stream_key_indices() {
81            Some(stream_key) => FunctionalDependencySet::with_key(self.schema().len(), &stream_key),
82            None => FunctionalDependencySet::new(self.schema().len()),
83        }
84    }
85
86    fn stream_key(&self) -> Option<Vec<usize>> {
87        self.stream_key_indices()
88    }
89
90    fn ctx(&self) -> OptimizerContextRef {
91        self.input.ctx()
92    }
93}
94
95impl<PlanRef: GenericPlanRef> crate::optimizer::plan_node::expr_visitable::ExprVisitable
96    for GapFill<PlanRef>
97{
98    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
99        v.visit_expr(&self.time_col.clone().into());
100        v.visit_expr(&self.interval);
101    }
102}
103
104impl<PlanRef> GapFill<PlanRef> {
105    pub fn rewrite_with_col_index_mapping(&mut self, mapping: &mut ColIndexMapping) {
106        let expr: ExprImpl = self.time_col.clone().into();
107        self.time_col = mapping
108            .rewrite_expr(expr)
109            .as_input_ref()
110            .expect("time_col must be an InputRef after rewrite")
111            .as_ref()
112            .clone();
113
114        self.fill_strategies.iter_mut().for_each(|s| {
115            let expr: ExprImpl = s.target_col.clone().into();
116            s.target_col = mapping
117                .rewrite_expr(expr)
118                .as_input_ref()
119                .expect("target_col must be an InputRef after rewrite")
120                .as_ref()
121                .clone();
122        });
123        self.interval = mapping.rewrite_expr(self.interval.clone());
124
125        self.partition_by_cols.iter_mut().for_each(|c| {
126            let expr: ExprImpl = c.clone().into();
127            *c = mapping
128                .rewrite_expr(expr)
129                .as_input_ref()
130                .expect("partition_by_col must be an InputRef after rewrite")
131                .as_ref()
132                .clone();
133        });
134    }
135
136    pub fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
137        self.interval = r.rewrite_expr(self.interval.clone());
138
139        let expr: ExprImpl = self.time_col.clone().into();
140        let rewritten = r.rewrite_expr(expr);
141        self.time_col = rewritten
142            .as_input_ref()
143            .expect("time_col must be an InputRef after rewrite")
144            .as_ref()
145            .clone();
146
147        self.fill_strategies.iter_mut().for_each(|s| {
148            let expr: ExprImpl = s.target_col.clone().into();
149            let rewritten = r.rewrite_expr(expr);
150            s.target_col = rewritten
151                .as_input_ref()
152                .expect("target_col must be an InputRef after rewrite")
153                .as_ref()
154                .clone();
155        });
156
157        self.partition_by_cols.iter_mut().for_each(|c| {
158            let expr: ExprImpl = c.clone().into();
159            let rewritten = r.rewrite_expr(expr);
160            *c = rewritten
161                .as_input_ref()
162                .expect("partition_by_col must be an InputRef after rewrite")
163                .as_ref()
164                .clone();
165        });
166    }
167}
168
169impl<PlanRef: GenericPlanRef> DistillUnit for GapFill<PlanRef> {
170    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
171        let mut fields = vec![
172            ("time_col", Pretty::debug(&self.time_col)),
173            ("interval", Pretty::debug(&self.interval)),
174            ("fill_strategies", Pretty::debug(&self.fill_strategies)),
175        ];
176        if !self.partition_by_cols.is_empty() {
177            fields.push(("partition_by", Pretty::debug(&self.partition_by_cols)));
178        }
179        childless_record(name, fields)
180    }
181}