risingwave_frontend/optimizer/plan_node/
stream_eowc_gap_fill.rs1use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21 ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, StreamNode,
22 generic,
23};
24use crate::TableCatalog;
25use crate::binder::BoundFillStrategy;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
27use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::optimizer::property::Distribution;
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct StreamEowcGapFill {
36 pub base: PlanBase<super::Stream>,
37 core: generic::GapFill<PlanRef<Stream>>,
38}
39
40impl StreamEowcGapFill {
41 pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
42 let input = &core.input;
43
44 let base = PlanBase::new_stream_with_core(
47 &core,
48 Distribution::Single,
49 input.stream_kind(),
50 true, input.watermark_columns().clone(),
52 input.columns_monotonicity().clone(),
53 );
54 Self { base, core }
55 }
56
57 pub fn new_with_args(
59 input: PlanRef<Stream>,
60 time_col: InputRef,
61 interval: ExprImpl,
62 fill_strategies: Vec<BoundFillStrategy>,
63 ) -> Self {
64 let core = generic::GapFill {
65 input,
66 time_col,
67 interval,
68 fill_strategies,
69 };
70 Self::new(core)
71 }
72
73 pub fn time_col(&self) -> &InputRef {
74 &self.core.time_col
75 }
76
77 pub fn interval(&self) -> &ExprImpl {
78 &self.core.interval
79 }
80
81 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
82 &self.core.fill_strategies
83 }
84
85 fn infer_buffer_table(&self) -> TableCatalog {
86 let mut tbl_builder = TableCatalogBuilder::default();
87
88 let out_schema = self.core.schema();
89 for field in out_schema.fields() {
90 tbl_builder.add_column(field);
91 }
92
93 let time_col_idx = self.time_col().index();
95 tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
96
97 tbl_builder.build(vec![], 0)
98 }
99
100 fn infer_prev_row_table(&self) -> TableCatalog {
101 let mut tbl_builder = TableCatalogBuilder::default();
102
103 for field in self.core.schema().fields() {
104 tbl_builder.add_column(field);
105 }
106
107 if !self.core.schema().fields().is_empty() {
108 tbl_builder.add_order_column(0, OrderType::ascending());
109 }
110
111 tbl_builder.build(vec![], 0)
112 }
113}
114
115impl PlanTreeNodeUnary<Stream> for StreamEowcGapFill {
116 fn input(&self) -> PlanRef<Stream> {
117 self.core.input.clone()
118 }
119
120 fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
121 let mut core = self.core.clone();
122 core.input = input;
123 Self::new(core)
124 }
125}
126
127impl_plan_tree_node_for_unary! { Stream, StreamEowcGapFill }
128impl_distill_by_unit!(StreamEowcGapFill, core, "StreamEowcGapFill");
129
130impl StreamNode for StreamEowcGapFill {
131 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
132 use risingwave_pb::stream_plan::*;
133
134 let fill_strategies: Vec<String> = self
135 .fill_strategies()
136 .iter()
137 .map(|strategy| match strategy.strategy {
138 crate::binder::FillStrategy::Locf => "locf".to_owned(),
139 crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
140 crate::binder::FillStrategy::Null => "null".to_owned(),
141 })
142 .collect();
143
144 let buffer_table = self
145 .infer_buffer_table()
146 .with_id(state.gen_table_id_wrapped())
147 .to_internal_table_prost();
148
149 let prev_row_table = self
150 .infer_prev_row_table()
151 .with_id(state.gen_table_id_wrapped())
152 .to_internal_table_prost();
153
154 NodeBody::EowcGapFill(Box::new(EowcGapFillNode {
155 time_column_index: self.time_col().index() as u32,
156 interval: Some(self.interval().to_expr_proto()),
157 fill_columns: self
158 .fill_strategies()
159 .iter()
160 .map(|strategy| strategy.target_col.index() as u32)
161 .collect(),
162 fill_strategies,
163 buffer_table: Some(buffer_table),
164 prev_row_table: Some(prev_row_table),
165 }))
166 }
167}
168
169impl ExprRewritable<Stream> for StreamEowcGapFill {
170 fn has_rewritable_expr(&self) -> bool {
171 true
172 }
173
174 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
175 let mut core = self.core.clone();
176 core.rewrite_exprs(r);
177 Self {
178 base: self.base.clone_with_new_plan_id(),
179 core,
180 }
181 .into()
182 }
183}
184
185impl ExprVisitable for StreamEowcGapFill {
186 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
187 self.core.visit_exprs(v)
188 }
189}