risingwave_frontend/optimizer/plan_node/generic/
gap_fill.rs1use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::Schema;
17
18use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
19use crate::OptimizerContextRef;
20use crate::binder::BoundFillStrategy;
21use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
22use crate::optimizer::plan_node::ColIndexMapping;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::FunctionalDependencySet;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct GapFill<PlanRef> {
28 pub input: PlanRef,
29 pub time_col: InputRef,
30 pub interval: ExprImpl,
31 pub fill_strategies: Vec<BoundFillStrategy>,
32 pub partition_by_cols: Vec<InputRef>,
33}
34
35impl<PlanRef: GenericPlanRef> GapFill<PlanRef> {
36 pub fn partition_key_indices(&self) -> Vec<usize> {
37 self.partition_by_cols.iter().map(|c| c.index()).collect()
38 }
39
40 pub fn stream_key_indices(&self) -> Option<Vec<usize>> {
41 let mut stream_key = self.partition_key_indices();
42
43 if !stream_key.contains(&self.time_col.index()) {
44 stream_key.push(self.time_col.index());
45 }
46
47 for &key in self.input.stream_key()? {
48 if !stream_key.contains(&key) {
49 stream_key.push(key);
50 }
51 }
52
53 Some(stream_key)
54 }
55
56 pub fn pointer_key_indices(&self) -> Option<Vec<usize>> {
57 let time_col_idx = self.time_col.index();
58 let partition_key_indices = self.partition_key_indices();
59 let mut pointer_key_indices = vec![time_col_idx];
60
61 for &key in self.input.stream_key()? {
62 if key != time_col_idx
63 && !partition_key_indices.contains(&key)
64 && !pointer_key_indices.contains(&key)
65 {
66 pointer_key_indices.push(key);
67 }
68 }
69
70 Some(pointer_key_indices)
71 }
72}
73
74impl<PlanRef: GenericPlanRef> GenericPlanNode for GapFill<PlanRef> {
75 fn schema(&self) -> Schema {
76 self.input.schema().clone()
77 }
78
79 fn functional_dependency(&self) -> FunctionalDependencySet {
80 match self.stream_key_indices() {
81 Some(stream_key) => FunctionalDependencySet::with_key(self.schema().len(), &stream_key),
82 None => FunctionalDependencySet::new(self.schema().len()),
83 }
84 }
85
86 fn stream_key(&self) -> Option<Vec<usize>> {
87 self.stream_key_indices()
88 }
89
90 fn ctx(&self) -> OptimizerContextRef {
91 self.input.ctx()
92 }
93}
94
95impl<PlanRef: GenericPlanRef> crate::optimizer::plan_node::expr_visitable::ExprVisitable
96 for GapFill<PlanRef>
97{
98 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
99 v.visit_expr(&self.time_col.clone().into());
100 v.visit_expr(&self.interval);
101 }
102}
103
104impl<PlanRef> GapFill<PlanRef> {
105 pub fn rewrite_with_col_index_mapping(&mut self, mapping: &mut ColIndexMapping) {
106 let expr: ExprImpl = self.time_col.clone().into();
107 self.time_col = mapping
108 .rewrite_expr(expr)
109 .as_input_ref()
110 .expect("time_col must be an InputRef after rewrite")
111 .as_ref()
112 .clone();
113
114 self.fill_strategies.iter_mut().for_each(|s| {
115 let expr: ExprImpl = s.target_col.clone().into();
116 s.target_col = mapping
117 .rewrite_expr(expr)
118 .as_input_ref()
119 .expect("target_col must be an InputRef after rewrite")
120 .as_ref()
121 .clone();
122 });
123 self.interval = mapping.rewrite_expr(self.interval.clone());
124
125 self.partition_by_cols.iter_mut().for_each(|c| {
126 let expr: ExprImpl = c.clone().into();
127 *c = mapping
128 .rewrite_expr(expr)
129 .as_input_ref()
130 .expect("partition_by_col must be an InputRef after rewrite")
131 .as_ref()
132 .clone();
133 });
134 }
135
136 pub fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
137 self.interval = r.rewrite_expr(self.interval.clone());
138
139 let expr: ExprImpl = self.time_col.clone().into();
140 let rewritten = r.rewrite_expr(expr);
141 self.time_col = rewritten
142 .as_input_ref()
143 .expect("time_col must be an InputRef after rewrite")
144 .as_ref()
145 .clone();
146
147 self.fill_strategies.iter_mut().for_each(|s| {
148 let expr: ExprImpl = s.target_col.clone().into();
149 let rewritten = r.rewrite_expr(expr);
150 s.target_col = rewritten
151 .as_input_ref()
152 .expect("target_col must be an InputRef after rewrite")
153 .as_ref()
154 .clone();
155 });
156
157 self.partition_by_cols.iter_mut().for_each(|c| {
158 let expr: ExprImpl = c.clone().into();
159 let rewritten = r.rewrite_expr(expr);
160 *c = rewritten
161 .as_input_ref()
162 .expect("partition_by_col must be an InputRef after rewrite")
163 .as_ref()
164 .clone();
165 });
166 }
167}
168
169impl<PlanRef: GenericPlanRef> DistillUnit for GapFill<PlanRef> {
170 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
171 let mut fields = vec![
172 ("time_col", Pretty::debug(&self.time_col)),
173 ("interval", Pretty::debug(&self.interval)),
174 ("fill_strategies", Pretty::debug(&self.fill_strategies)),
175 ];
176 if !self.partition_by_cols.is_empty() {
177 fields.push(("partition_by", Pretty::debug(&self.partition_by_cols)));
178 }
179 childless_record(name, fields)
180 }
181}