risingwave_frontend/optimizer/plan_node/
stream_eowc_gap_fill.rs1use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21 ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
22 generic,
23};
24use crate::TableCatalog;
25use crate::binder::BoundFillStrategy;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
27use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
30use crate::scheduler::SchedulerResult;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamEowcGapFill {
37 pub base: PlanBase<super::Stream>,
38 core: generic::GapFill<PlanRef<Stream>>,
39}
40
41impl StreamEowcGapFill {
42 pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
43 let input = &core.input;
44
45 let dist = if core.partition_by_cols.is_empty() {
46 Distribution::Single
47 } else {
48 let partition_indices: Vec<usize> =
49 core.partition_by_cols.iter().map(|c| c.index()).collect();
50 Distribution::HashShard(partition_indices)
51 };
52
53 let mut watermark_columns = WatermarkColumns::new();
57 for partition_col in &core.partition_by_cols {
58 let idx = partition_col.index();
59 if let Some(group) = input.watermark_columns().get_group(idx) {
60 watermark_columns.insert(idx, group);
61 }
62 }
63
64 let mut columns_monotonicity = MonotonicityMap::new();
68 if core.partition_by_cols.is_empty() {
69 let idx = core.time_col.index();
70 columns_monotonicity.insert(idx, input.columns_monotonicity()[idx]);
71 } else {
72 for partition_col in &core.partition_by_cols {
73 let idx = partition_col.index();
74 columns_monotonicity.insert(idx, input.columns_monotonicity()[idx]);
75 }
76 }
77
78 let base = PlanBase::new_stream_with_core(
79 &core,
80 dist,
81 input.stream_kind(),
82 true, watermark_columns,
84 columns_monotonicity,
85 );
86 Self { base, core }
87 }
88
89 pub fn new_with_args(
91 input: PlanRef<Stream>,
92 time_col: InputRef,
93 interval: ExprImpl,
94 fill_strategies: Vec<BoundFillStrategy>,
95 ) -> Self {
96 let core = generic::GapFill {
97 input,
98 time_col,
99 interval,
100 fill_strategies,
101 partition_by_cols: vec![],
102 };
103 Self::new(core)
104 }
105
106 pub fn time_col(&self) -> &InputRef {
107 &self.core.time_col
108 }
109
110 pub fn interval(&self) -> &ExprImpl {
111 &self.core.interval
112 }
113
114 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
115 &self.core.fill_strategies
116 }
117
118 fn infer_prev_row_table(&self) -> TableCatalog {
119 let mut tbl_builder = TableCatalogBuilder::default();
120
121 for field in self.core.schema().fields() {
122 tbl_builder.add_column(field);
123 }
124
125 if self.core.partition_by_cols.is_empty() {
126 if !self.core.schema().fields().is_empty() {
128 tbl_builder.add_order_column(0, OrderType::ascending());
129 }
130 tbl_builder.build(vec![], 0)
131 } else {
132 for pc in &self.core.partition_by_cols {
134 tbl_builder.add_order_column(pc.index(), OrderType::ascending());
135 }
136 let dist_key_indices: Vec<usize> = self
137 .core
138 .partition_by_cols
139 .iter()
140 .map(|c| c.index())
141 .collect();
142 tbl_builder.build(dist_key_indices, 0)
143 }
144 }
145}
146
147impl PlanTreeNodeUnary<Stream> for StreamEowcGapFill {
148 fn input(&self) -> PlanRef<Stream> {
149 self.core.input.clone()
150 }
151
152 fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
153 let mut core = self.core.clone();
154 core.input = input;
155 Self::new(core)
156 }
157}
158
159impl_plan_tree_node_for_unary! { Stream, StreamEowcGapFill }
160impl_distill_by_unit!(StreamEowcGapFill, core, "StreamEowcGapFill");
161
162impl TryToStreamPb for StreamEowcGapFill {
163 fn try_to_stream_prost_body(
164 &self,
165 state: &mut BuildFragmentGraphState,
166 ) -> SchedulerResult<NodeBody> {
167 use risingwave_pb::stream_plan::*;
168
169 let fill_strategies: Vec<String> = self
170 .fill_strategies()
171 .iter()
172 .map(|strategy| match strategy.strategy {
173 crate::binder::FillStrategy::Locf => "locf".to_owned(),
174 crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
175 crate::binder::FillStrategy::Null => "null".to_owned(),
176 })
177 .collect();
178
179 let prev_row_table = self
180 .infer_prev_row_table()
181 .with_id(state.gen_table_id_wrapped())
182 .to_internal_table_prost();
183
184 Ok(NodeBody::EowcGapFill(Box::new(EowcGapFillNode {
185 time_column_index: self.time_col().index() as u32,
186 interval: Some(self.interval().to_expr_proto_checked_pure(
187 self.stream_kind().is_retract(),
188 "gap filling interval",
189 )?),
190 fill_columns: self
191 .fill_strategies()
192 .iter()
193 .map(|strategy| strategy.target_col.index() as u32)
194 .collect(),
195 fill_strategies,
196 prev_row_table: Some(prev_row_table),
197 partition_by_indices: self
198 .core
199 .partition_by_cols
200 .iter()
201 .map(|c| c.index() as u32)
202 .collect(),
203 })))
204 }
205}
206
207impl ExprRewritable<Stream> for StreamEowcGapFill {
208 fn has_rewritable_expr(&self) -> bool {
209 true
210 }
211
212 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
213 let mut core = self.core.clone();
214 core.rewrite_exprs(r);
215 Self {
216 base: self.base.clone_with_new_plan_id(),
217 core,
218 }
219 .into()
220 }
221}
222
223impl ExprVisitable for StreamEowcGapFill {
224 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
225 self.core.visit_exprs(v)
226 }
227}