risingwave_frontend/optimizer/plan_node/
stream_gap_fill.rs1use risingwave_common::catalog::Field;
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19
20use super::generic::GenericPlanNode;
21use super::utils::TableCatalogBuilder;
22use super::{
23 ExprRewritable, ExprVisitable, PlanBase, PlanRef, PlanTreeNodeUnary, Stream, StreamNode,
24 generic,
25};
26use crate::binder::BoundFillStrategy;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
28use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
29use crate::optimizer::plan_node::utils::impl_distill_by_unit;
30use crate::optimizer::property::Distribution;
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamGapFill {
37 pub base: PlanBase<super::Stream>,
38 core: generic::GapFill<PlanRef<Stream>>,
39}
40
41impl StreamGapFill {
42 pub fn new(core: generic::GapFill<PlanRef<Stream>>) -> Self {
43 let input = &core.input;
44
45 let base = PlanBase::new_stream_with_core(
49 &core,
50 Distribution::Single,
51 input.stream_kind(),
52 false, input.watermark_columns().clone(),
54 input.columns_monotonicity().clone(),
55 );
56 Self { base, core }
57 }
58
59 pub fn new_with_args(
61 input: PlanRef<Stream>,
62 time_col: InputRef,
63 interval: ExprImpl,
64 fill_strategies: Vec<BoundFillStrategy>,
65 ) -> Self {
66 let core = generic::GapFill {
67 input,
68 time_col,
69 interval,
70 fill_strategies,
71 };
72 Self::new(core)
73 }
74
75 pub fn time_col(&self) -> &InputRef {
76 &self.core.time_col
77 }
78
79 pub fn interval(&self) -> &ExprImpl {
80 &self.core.interval
81 }
82
83 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
84 &self.core.fill_strategies
85 }
86
87 fn infer_state_table(&self) -> crate::TableCatalog {
88 let mut tbl_builder = TableCatalogBuilder::default();
89
90 let out_schema = self.core.schema();
91 for field in out_schema.fields() {
92 tbl_builder.add_column(field);
93 }
94
95 let time_col_idx = self.time_col().index();
98 tbl_builder.add_order_column(time_col_idx, OrderType::ascending());
99
100 tbl_builder.add_column(&Field::with_name(DataType::Boolean, "is_filled"));
102 tbl_builder.build(vec![], 0)
103 }
104}
105
106impl PlanTreeNodeUnary<Stream> for StreamGapFill {
107 fn input(&self) -> PlanRef<Stream> {
108 self.core.input.clone()
109 }
110
111 fn clone_with_input(&self, input: PlanRef<Stream>) -> Self {
112 let mut core = self.core.clone();
113 core.input = input;
114 Self::new(core)
115 }
116}
117
118impl_plan_tree_node_for_unary! { Stream, StreamGapFill }
119impl_distill_by_unit!(StreamGapFill, core, "StreamGapFill");
120
121impl StreamNode for StreamGapFill {
122 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
123 use risingwave_pb::stream_plan::*;
124
125 let fill_strategies: Vec<String> = self
126 .fill_strategies()
127 .iter()
128 .map(|strategy| match strategy.strategy {
129 crate::binder::FillStrategy::Locf => "locf".to_owned(),
130 crate::binder::FillStrategy::Interpolate => "interpolate".to_owned(),
131 crate::binder::FillStrategy::Null => "null".to_owned(),
132 })
133 .collect();
134
135 let state_table = self
136 .infer_state_table()
137 .with_id(state.gen_table_id_wrapped())
138 .to_internal_table_prost();
139
140 NodeBody::GapFill(Box::new(GapFillNode {
141 time_column_index: self.time_col().index() as u32,
142 interval: Some(self.interval().to_expr_proto()),
143 fill_columns: self
144 .fill_strategies()
145 .iter()
146 .map(|strategy| strategy.target_col.index() as u32)
147 .collect(),
148 fill_strategies,
149 state_table: Some(state_table),
150 }))
151 }
152}
153
154impl ExprRewritable<Stream> for StreamGapFill {
155 fn has_rewritable_expr(&self) -> bool {
156 true
157 }
158
159 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef<Stream> {
160 let mut core = self.core.clone();
161 core.rewrite_exprs(r);
162 Self {
163 base: self.base.clone_with_new_plan_id(),
164 core,
165 }
166 .into()
167 }
168}
169
170impl ExprVisitable for StreamGapFill {
171 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
172 self.core.visit_exprs(v)
173 }
174}