risingwave_frontend/optimizer/plan_node/
stream_eowc_gap_fill.rs1use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21 ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
22 generic,
23};
24use crate::TableCatalog;
25use crate::binder::BoundFillStrategy;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
27use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::optimizer::property::Distribution;
30use crate::scheduler::SchedulerResult;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamEowcGapFill {
37 pub base: PlanBase<super::Stream>,
38 core: generic::GapFill<PlanRef<Stream>>,
39}
40
41impl StreamEowcGapFill {
42 pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
43 let input = &core.input;
44
45 let base = PlanBase::new_stream_with_core(
48 &core,
49 Distribution::Single,
50 input.stream_kind(),
51 true, input.watermark_columns().clone(),
53 input.columns_monotonicity().clone(),
54 );
55 Self { base, core }
56 }
57
58 pub fn new_with_args(
60 input: PlanRef<Stream>,
61 time_col: InputRef,
62 interval: ExprImpl,
63 fill_strategies: Vec<BoundFillStrategy>,
64 ) -> Self {
65 let core = generic::GapFill {
66 input,
67 time_col,
68 interval,
69 fill_strategies,
70 };
71 Self::new(core)
72 }
73
74 pub fn time_col(&self) -> &InputRef {
75 &self.core.time_col
76 }
77
78 pub fn interval(&self) -> &ExprImpl {
79 &self.core.interval
80 }
81
82 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
83 &self.core.fill_strategies
84 }
85
86 fn infer_buffer_table(&self) -> TableCatalog {
87 let mut tbl_builder = TableCatalogBuilder::default();
88
89 let out_schema = self.core.schema();
90 for field in out_schema.fields() {
91 tbl_builder.add_column(field);
92 }
93
94 let time_col_idx = self.time_col().index();
96 tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
97
98 tbl_builder.build(vec![], 0)
99 }
100
101 fn infer_prev_row_table(&self) -> TableCatalog {
102 let mut tbl_builder = TableCatalogBuilder::default();
103
104 for field in self.core.schema().fields() {
105 tbl_builder.add_column(field);
106 }
107
108 if !self.core.schema().fields().is_empty() {
109 tbl_builder.add_order_column(0, OrderType::ascending());
110 }
111
112 tbl_builder.build(vec![], 0)
113 }
114}
115
116impl PlanTreeNodeUnary<Stream> for StreamEowcGapFill {
117 fn input(&self) -> PlanRef<Stream> {
118 self.core.input.clone()
119 }
120
121 fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
122 let mut core = self.core.clone();
123 core.input = input;
124 Self::new(core)
125 }
126}
127
128impl_plan_tree_node_for_unary! { Stream, StreamEowcGapFill }
129impl_distill_by_unit!(StreamEowcGapFill, core, "StreamEowcGapFill");
130
131impl TryToStreamPb for StreamEowcGapFill {
132 fn try_to_stream_prost_body(
133 &self,
134 state: &mut BuildFragmentGraphState,
135 ) -> SchedulerResult<NodeBody> {
136 use risingwave_pb::stream_plan::*;
137
138 let fill_strategies: Vec<String> = self
139 .fill_strategies()
140 .iter()
141 .map(|strategy| match strategy.strategy {
142 crate::binder::FillStrategy::Locf => "locf".to_owned(),
143 crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
144 crate::binder::FillStrategy::Null => "null".to_owned(),
145 })
146 .collect();
147
148 let buffer_table = self
149 .infer_buffer_table()
150 .with_id(state.gen_table_id_wrapped())
151 .to_internal_table_prost();
152
153 let prev_row_table = self
154 .infer_prev_row_table()
155 .with_id(state.gen_table_id_wrapped())
156 .to_internal_table_prost();
157
158 Ok(NodeBody::EowcGapFill(Box::new(EowcGapFillNode {
159 time_column_index: self.time_col().index() as u32,
160 interval: Some(self.interval().to_expr_proto_checked_pure(
161 self.stream_kind().is_retract(),
162 "gap filling interval",
163 )?),
164 fill_columns: self
165 .fill_strategies()
166 .iter()
167 .map(|strategy| strategy.target_col.index() as u32)
168 .collect(),
169 fill_strategies,
170 buffer_table: Some(buffer_table),
171 prev_row_table: Some(prev_row_table),
172 })))
173 }
174}
175
176impl ExprRewritable<Stream> for StreamEowcGapFill {
177 fn has_rewritable_expr(&self) -> bool {
178 true
179 }
180
181 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
182 let mut core = self.core.clone();
183 core.rewrite_exprs(r);
184 Self {
185 base: self.base.clone_with_new_plan_id(),
186 core,
187 }
188 .into()
189 }
190}
191
192impl ExprVisitable for StreamEowcGapFill {
193 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
194 self.core.visit_exprs(v)
195 }
196}