risingwave_frontend/optimizer/plan_node/
stream_gap_fill.rs1use risingwave_common::util::sort_util::OrderType;
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17
18use super::generic::GenericPlanNode;
19use super::utils::TableCatalogBuilder;
20use super::{
21 ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
22 generic,
23};
24use crate::binder::BoundFillStrategy;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::impl_distill_by_unit;
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::scheduler::SchedulerResult;
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct StreamGapFill {
36 pub base: PlanBase<super::Stream>,
37 core: generic::GapFill<PlanRef<Stream>>,
38}
39
40impl StreamGapFill {
41 pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
42 let input = &core.input;
43 let partition_indices = core.partition_key_indices();
44 let distinct_partition_key_count = partition_indices
45 .iter()
46 .copied()
47 .collect::<std::collections::HashSet<_>>()
48 .len();
49 assert_eq!(
50 partition_indices.len(),
51 distinct_partition_key_count,
52 "stream gap fill expects canonicalized partition_by columns",
53 );
54 assert!(
55 !core
56 .pointer_key_indices()
57 .expect("stream gap fill input should have stream key")
58 .is_empty(),
59 "stream gap fill pointer key should not be empty",
60 );
61
62 let dist = if core.partition_by_cols.is_empty() {
63 Distribution::Single
64 } else {
65 Distribution::HashShard(partition_indices)
66 };
67
68 let base = PlanBase::new_stream_with_core(
71 &core,
72 dist,
73 input.stream_kind(),
74 false, WatermarkColumns::new(),
76 MonotonicityMap::new(),
77 );
78 Self { base, core }
79 }
80
81 pub fn new_with_args(
83 input: PlanRef<Stream>,
84 time_col: InputRef,
85 interval: ExprImpl,
86 fill_strategies: Vec<BoundFillStrategy>,
87 ) -> Self {
88 let core = generic::GapFill {
89 input,
90 time_col,
91 interval,
92 fill_strategies,
93 partition_by_cols: vec![],
94 };
95 Self::new(core)
96 }
97
98 pub fn time_col(&self) -> &InputRef {
99 &self.core.time_col
100 }
101
102 pub fn interval(&self) -> &ExprImpl {
103 &self.core.interval
104 }
105
106 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
107 &self.core.fill_strategies
108 }
109
110 fn pointer_key_indices(&self) -> Vec<usize> {
111 self.core
112 .pointer_key_indices()
113 .expect("stream gap fill input should have stream key")
114 }
115
116 fn infer_state_table(&self) -> crate::TableCatalog {
117 let mut tbl_builder = TableCatalogBuilder::default();
118
119 let out_schema = self.core.schema();
120 for field in out_schema.fields() {
121 tbl_builder.add_column(field);
122 }
123
124 let state_key_indices = self
125 .core
126 .stream_key_indices()
127 .expect("stream gap fill input should have stream key");
128
129 for key_idx in state_key_indices {
131 tbl_builder.add_order_column(key_idx, OrderType::ascending());
132 }
133
134 let partition_key_indices = self.core.partition_key_indices();
135 let read_prefix_len_hint = partition_key_indices.len();
136 tbl_builder.build(partition_key_indices, read_prefix_len_hint)
137 }
138}
139
140impl PlanTreeNodeUnary<Stream> for StreamGapFill {
141 fn input(&self) -> PlanRef<Stream> {
142 self.core.input.clone()
143 }
144
145 fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
146 let mut core = self.core.clone();
147 core.input = input;
148 Self::new(core)
149 }
150}
151
152impl_plan_tree_node_for_unary! { Stream, StreamGapFill }
153impl_distill_by_unit!(StreamGapFill, core, "StreamGapFill");
154
155impl TryToStreamPb for StreamGapFill {
156 fn try_to_stream_prost_body(
157 &self,
158 state: &mut BuildFragmentGraphState,
159 ) -> SchedulerResult<NodeBody> {
160 use risingwave_pb::stream_plan::*;
161
162 let fill_strategies: Vec<String> = self
163 .fill_strategies()
164 .iter()
165 .map(|strategy| match strategy.strategy {
166 crate::binder::FillStrategy::Locf => "locf".to_owned(),
167 crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
168 crate::binder::FillStrategy::Null => "null".to_owned(),
169 })
170 .collect();
171
172 let state_table = self
173 .infer_state_table()
174 .with_id(state.gen_table_id_wrapped())
175 .to_internal_table_prost();
176
177 Ok(NodeBody::GapFill(Box::new(GapFillNode {
178 pointer_key_indices: self
179 .pointer_key_indices()
180 .into_iter()
181 .map(|idx| idx as u32)
182 .collect(),
183 time_column_index: self.time_col().index() as u32,
184 interval: Some(self.interval().to_expr_proto_checked_pure(
185 self.stream_kind().is_retract(),
186 "gap filling interval",
187 )?),
188 fill_columns: self
189 .fill_strategies()
190 .iter()
191 .map(|strategy| strategy.target_col.index() as u32)
192 .collect(),
193 fill_strategies,
194 state_table: Some(state_table),
195 partition_by_indices: self
196 .core
197 .partition_by_cols
198 .iter()
199 .map(|c| c.index() as u32)
200 .collect(),
201 })))
202 }
203}
204
205impl ExprRewritable<Stream> for StreamGapFill {
206 fn has_rewritable_expr(&self) -> bool {
207 true
208 }
209
210 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
211 let mut core = self.core.clone();
212 core.rewrite_exprs(r);
213 Self {
214 base: self.base.clone_with_new_plan_id(),
215 core,
216 }
217 .into()
218 }
219}
220
221impl ExprVisitable for StreamGapFill {
222 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
223 self.core.visit_exprs(v)
224 }
225}