risingwave_frontend/optimizer/plan_node/
stream_gap_fill.rs1use risingwave_common::catalog::Field;
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19
20use super::generic::GenericPlanNode;
21use super::utils::TableCatalogBuilder;
22use super::{
23 ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, TryToStreamPb,
24 generic,
25};
26use crate::binder::BoundFillStrategy;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
28use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
29use crate::optimizer::plan_node::utils::impl_distill_by_unit;
30use crate::optimizer::property::Distribution;
31use crate::scheduler::SchedulerResult;
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamGapFill {
38 pub base: PlanBase<super::Stream>,
39 core: generic::GapFill<PlanRef<Stream>>,
40}
41
42impl StreamGapFill {
43 pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
44 let input = &core.input;
45
46 let base = PlanBase::new_stream_with_core(
50 &core,
51 Distribution::Single,
52 input.stream_kind(),
53 false, input.watermark_columns().clone(),
55 input.columns_monotonicity().clone(),
56 );
57 Self { base, core }
58 }
59
60 pub fn new_with_args(
62 input: PlanRef<Stream>,
63 time_col: InputRef,
64 interval: ExprImpl,
65 fill_strategies: Vec<BoundFillStrategy>,
66 ) -> Self {
67 let core = generic::GapFill {
68 input,
69 time_col,
70 interval,
71 fill_strategies,
72 };
73 Self::new(core)
74 }
75
76 pub fn time_col(&self) -> &InputRef {
77 &self.core.time_col
78 }
79
80 pub fn interval(&self) -> &ExprImpl {
81 &self.core.interval
82 }
83
84 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
85 &self.core.fill_strategies
86 }
87
88 fn infer_state_table(&self) -> crate::TableCatalog {
89 let mut tbl_builder = TableCatalogBuilder::default();
90
91 let out_schema = self.core.schema();
92 for field in out_schema.fields() {
93 tbl_builder.add_column(field);
94 }
95
96 let time_col_idx = self.time_col().index();
99 tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
100
101 tbl_builder.add_column(&Field::with_name(DataType::Boolean, "is_filled"));
103 tbl_builder.build(vec![], 0)
104 }
105}
106
107impl PlanTreeNodeUnary<Stream> for StreamGapFill {
108 fn input(&self) -> PlanRef<Stream> {
109 self.core.input.clone()
110 }
111
112 fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
113 let mut core = self.core.clone();
114 core.input = input;
115 Self::new(core)
116 }
117}
118
119impl_plan_tree_node_for_unary! { Stream, StreamGapFill }
120impl_distill_by_unit!(StreamGapFill, core, "StreamGapFill");
121
122impl TryToStreamPb for StreamGapFill {
123 fn try_to_stream_prost_body(
124 &self,
125 state: &mut BuildFragmentGraphState,
126 ) -> SchedulerResult<NodeBody> {
127 use risingwave_pb::stream_plan::*;
128
129 let fill_strategies: Vec<String> = self
130 .fill_strategies()
131 .iter()
132 .map(|strategy| match strategy.strategy {
133 crate::binder::FillStrategy::Locf => "locf".to_owned(),
134 crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
135 crate::binder::FillStrategy::Null => "null".to_owned(),
136 })
137 .collect();
138
139 let state_table = self
140 .infer_state_table()
141 .with_id(state.gen_table_id_wrapped())
142 .to_internal_table_prost();
143
144 Ok(NodeBody::GapFill(Box::new(GapFillNode {
145 time_column_index: self.time_col().index() as u32,
146 interval: Some(self.interval().to_expr_proto_checked_pure(
147 self.stream_kind().is_retract(),
148 "gap filling interval",
149 )?),
150 fill_columns: self
151 .fill_strategies()
152 .iter()
153 .map(|strategy| strategy.target_col.index() as u32)
154 .collect(),
155 fill_strategies,
156 state_table: Some(state_table),
157 })))
158 }
159}
160
161impl ExprRewritable<Stream> for StreamGapFill {
162 fn has_rewritable_expr(&self) -> bool {
163 true
164 }
165
166 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
167 let mut core = self.core.clone();
168 core.rewrite_exprs(r);
169 Self {
170 base: self.base.clone_with_new_plan_id(),
171 core,
172 }
173 .into()
174 }
175}
176
177impl ExprVisitable for StreamGapFill {
178 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
179 self.core.visit_exprs(v)
180 }
181}