risingwave_frontend/optimizer/plan_node/generic/
gap_fill.rs1use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::Schema;
17
18use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
19use crate::OptimizerContextRef;
20use crate::binder::BoundFillStrategy;
21use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
22use crate::optimizer::plan_node::ColIndexMapping;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::FunctionalDependencySet;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct GapFill<PlanRef> {
28 pub input: PlanRef,
29 pub time_col: InputRef,
30 pub interval: ExprImpl,
31 pub fill_strategies: Vec<BoundFillStrategy>,
32}
33
34impl<PlanRef: GenericPlanRef> GenericPlanNode for GapFill<PlanRef> {
35 fn schema(&self) -> Schema {
36 self.input.schema().clone()
37 }
38
39 fn functional_dependency(&self) -> FunctionalDependencySet {
40 self.input.functional_dependency().clone()
41 }
42
43 fn stream_key(&self) -> Option<Vec<usize>> {
44 self.input.stream_key().map(|key| key.to_vec())
45 }
46
47 fn ctx(&self) -> OptimizerContextRef {
48 self.input.ctx()
49 }
50}
51
52impl<PlanRef: GenericPlanRef> crate::optimizer::plan_node::expr_visitable::ExprVisitable
53 for GapFill<PlanRef>
54{
55 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
56 v.visit_expr(&self.time_col.clone().into());
57 v.visit_expr(&self.interval);
58 }
59}
60
61impl<PlanRef> GapFill<PlanRef> {
62 pub fn rewrite_with_col_index_mapping(&mut self, mapping: &mut ColIndexMapping) {
63 let expr: ExprImpl = self.time_col.clone().into();
64 self.time_col = mapping
65 .rewrite_expr(expr)
66 .as_input_ref()
67 .expect("time_col must be an InputRef after rewrite")
68 .as_ref()
69 .clone();
70
71 self.fill_strategies.iter_mut().for_each(|s| {
72 let expr: ExprImpl = s.target_col.clone().into();
73 s.target_col = mapping
74 .rewrite_expr(expr)
75 .as_input_ref()
76 .expect("target_col must be an InputRef after rewrite")
77 .as_ref()
78 .clone();
79 });
80 self.interval = mapping.rewrite_expr(self.interval.clone());
81 }
82
83 pub fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
84 self.interval = r.rewrite_expr(self.interval.clone());
85
86 let expr: ExprImpl = self.time_col.clone().into();
87 let rewritten = r.rewrite_expr(expr);
88 self.time_col = rewritten
89 .as_input_ref()
90 .expect("time_col must be an InputRef after rewrite")
91 .as_ref()
92 .clone();
93
94 self.fill_strategies.iter_mut().for_each(|s| {
95 let expr: ExprImpl = s.target_col.clone().into();
96 let rewritten = r.rewrite_expr(expr);
97 s.target_col = rewritten
98 .as_input_ref()
99 .expect("target_col must be an InputRef after rewrite")
100 .as_ref()
101 .clone();
102 });
103 }
104}
105
106impl<PlanRef: GenericPlanRef> DistillUnit for GapFill<PlanRef> {
107 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
108 let fields = vec![
109 ("time_col", Pretty::debug(&self.time_col)),
110 ("interval", Pretty::debug(&self.interval)),
111 ("fill_strategies", Pretty::debug(&self.fill_strategies)),
112 ];
113 childless_record(name, fields)
114 }
115}