risingwave_frontend/optimizer/plan_node/generic/
gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::Schema;
17
18use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
19use crate::OptimizerContextRef;
20use crate::binder::BoundFillStrategy;
21use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
22use crate::optimizer::plan_node::ColIndexMapping;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::FunctionalDependencySet;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct GapFill<PlanRef> {
28    pub input: PlanRef,
29    pub time_col: InputRef,
30    pub interval: ExprImpl,
31    pub fill_strategies: Vec<BoundFillStrategy>,
32}
33
34impl<PlanRef: GenericPlanRef> GenericPlanNode for GapFill<PlanRef> {
35    fn schema(&self) -> Schema {
36        self.input.schema().clone()
37    }
38
39    fn functional_dependency(&self) -> FunctionalDependencySet {
40        self.input.functional_dependency().clone()
41    }
42
43    fn stream_key(&self) -> Option<Vec<usize>> {
44        self.input.stream_key().map(|key| key.to_vec())
45    }
46
47    fn ctx(&self) -> OptimizerContextRef {
48        self.input.ctx()
49    }
50}
51
52impl<PlanRef: GenericPlanRef> crate::optimizer::plan_node::expr_visitable::ExprVisitable
53    for GapFill<PlanRef>
54{
55    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
56        v.visit_expr(&self.time_col.clone().into());
57        v.visit_expr(&self.interval);
58    }
59}
60
61impl<PlanRef> GapFill<PlanRef> {
62    pub fn rewrite_with_col_index_mapping(&mut self, mapping: &mut ColIndexMapping) {
63        let expr: ExprImpl = self.time_col.clone().into();
64        self.time_col = mapping
65            .rewrite_expr(expr)
66            .as_input_ref()
67            .expect("time_col must be an InputRef after rewrite")
68            .as_ref()
69            .clone();
70
71        self.fill_strategies.iter_mut().for_each(|s| {
72            let expr: ExprImpl = s.target_col.clone().into();
73            s.target_col = mapping
74                .rewrite_expr(expr)
75                .as_input_ref()
76                .expect("target_col must be an InputRef after rewrite")
77                .as_ref()
78                .clone();
79        });
80        self.interval = mapping.rewrite_expr(self.interval.clone());
81    }
82
83    pub fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
84        self.interval = r.rewrite_expr(self.interval.clone());
85
86        let expr: ExprImpl = self.time_col.clone().into();
87        let rewritten = r.rewrite_expr(expr);
88        self.time_col = rewritten
89            .as_input_ref()
90            .expect("time_col must be an InputRef after rewrite")
91            .as_ref()
92            .clone();
93
94        self.fill_strategies.iter_mut().for_each(|s| {
95            let expr: ExprImpl = s.target_col.clone().into();
96            let rewritten = r.rewrite_expr(expr);
97            s.target_col = rewritten
98                .as_input_ref()
99                .expect("target_col must be an InputRef after rewrite")
100                .as_ref()
101                .clone();
102        });
103    }
104}
105
106impl<PlanRef: GenericPlanRef> DistillUnit for GapFill<PlanRef> {
107    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
108        let fields = vec![
109            ("time_col", Pretty::debug(&self.time_col)),
110            ("interval", Pretty::debug(&self.interval)),
111            ("fill_strategies", Pretty::debug(&self.fill_strategies)),
112        ];
113        childless_record(name, fields)
114    }
115}