risingwave_frontend/optimizer/plan_node/
logical_gap_fill.rs1use super::stream::StreamPlanNodeMetadata;
18use super::{
19 ColPrunable, ColumnPruningContext, ExprRewritable, ExprVisitable, Logical, LogicalFilter,
20 LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
21 PredicatePushdownContext, ToBatch, ToStream, ToStreamContext, generic,
22};
23use crate::binder::BoundFillStrategy;
24use crate::error::Result;
25use crate::expr::{ExprImpl, InputRef};
26use crate::optimizer::plan_node::utils::impl_distill_by_unit;
27use crate::utils::{ColIndexMapping, Condition};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct LogicalGapFill {
33 pub base: PlanBase<super::Logical>,
34 core: generic::GapFill<PlanRef>,
35}
36
37impl LogicalGapFill {
38 pub fn new(
39 input: PlanRef,
40 time_col: InputRef,
41 interval: ExprImpl,
42 fill_strategies: Vec<BoundFillStrategy>,
43 ) -> Self {
44 let core = generic::GapFill {
45 input,
46 time_col,
47 interval,
48 fill_strategies,
49 };
50 let base = PlanBase::new_logical_with_core(&core);
51 Self { base, core }
52 }
53
54 pub fn time_col(&self) -> &InputRef {
55 &self.core.time_col
56 }
57
58 pub fn interval(&self) -> &ExprImpl {
59 &self.core.interval
60 }
61
62 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
63 &self.core.fill_strategies
64 }
65}
66
67impl PlanTreeNodeUnary<Logical> for LogicalGapFill {
68 fn input(&self) -> PlanRef {
69 self.core.input.clone()
70 }
71
72 fn clone_with_input(&self, input: PlanRef) -> Self {
73 Self::new(
74 input,
75 self.time_col().clone(),
76 self.interval().clone(),
77 self.fill_strategies().to_vec(),
78 )
79 }
80}
81
// Macro-generated boilerplate for this node. Presumably the first derives the generic
// plan-tree-node impls from `PlanTreeNodeUnary`, and the second derives the explain output
// from the `core` field under the name "LogicalGapFill" — confirm against the macro definitions.
impl_plan_tree_node_for_unary! { Logical, LogicalGapFill }
impl_distill_by_unit!(LogicalGapFill, core, "LogicalGapFill");
84
85impl ColPrunable for LogicalGapFill {
86 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
87 let new_input = self.input().prune_col(required_cols, ctx);
89 self.clone_with_input(new_input).into()
90 }
91}
92
93impl ExprRewritable<Logical> for LogicalGapFill {
94 fn has_rewritable_expr(&self) -> bool {
95 true
96 }
97
98 fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
99 let mut core = self.core.clone();
100 core.rewrite_exprs(r);
101 Self {
102 base: self.base.clone(),
103 core,
104 }
105 .into()
106 }
107}
108
109impl ExprVisitable for LogicalGapFill {
110 fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
111 self.core.visit_exprs(v);
112 }
113}
114
115impl PredicatePushdown for LogicalGapFill {
116 fn predicate_pushdown(
117 &self,
118 predicate: Condition,
119 _ctx: &mut PredicatePushdownContext,
120 ) -> PlanRef {
121 LogicalFilter::create(self.clone().into(), predicate)
122 }
123}
124
125impl ToBatch for LogicalGapFill {
126 fn to_batch(&self) -> Result<super::BatchPlanRef> {
127 unimplemented!("batch gap fill")
128 }
129}
130
131impl ToStream for LogicalGapFill {
132 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<super::StreamPlanRef> {
133 use super::{StreamEowcGapFill, StreamGapFill};
134 use crate::error::ErrorCode;
135 use crate::optimizer::property::RequiredDist;
136
137 let stream_input = self.input().to_stream(ctx)?;
138
139 let new_input = RequiredDist::single().streaming_enforce_if_not_satisfies(stream_input)?;
142
143 let time_col_idx = self.core.time_col.index();
145 let input_stream_key = new_input.expect_stream_key();
146 if !(input_stream_key.len() == 1 && input_stream_key[0] == time_col_idx) {
147 return Err(ErrorCode::NotSupported(
148 "GAP_FILL requires the time column to be the sole primary key.".to_owned(),
149 format!("The time column (index {}) must be the only primary key. Found stream key: {:?}", time_col_idx, input_stream_key),
150 )
151 .into());
152 }
153
154 let core = generic::GapFill {
155 input: new_input.clone(),
156 time_col: self.core.time_col.clone(),
157 interval: self.core.interval.clone(),
158 fill_strategies: self.core.fill_strategies.clone(),
159 };
160
161 if ctx.emit_on_window_close() {
162 let input_watermark_cols = new_input.watermark_columns();
164 if !input_watermark_cols.contains(time_col_idx) {
165 return Err(ErrorCode::NotSupported(
166 "GAP_FILL with EMIT ON WINDOW CLOSE requires a watermark on the time column."
167 .to_owned(),
168 format!(
169 "Please define a watermark on the time column (column index {}).",
170 time_col_idx
171 ),
172 )
173 .into());
174 }
175
176 Ok(StreamEowcGapFill::new(core).into())
177 } else {
178 Ok(StreamGapFill::new(core).into())
179 }
180 }
181
182 fn logical_rewrite_for_stream(
183 &self,
184 _ctx: &mut super::convert::RewriteStreamContext,
185 ) -> Result<(PlanRef, ColIndexMapping)> {
186 let (input, mut col_index_mapping) = self.input().logical_rewrite_for_stream(_ctx)?;
187 let mut new_core = self.core.clone();
188 new_core.input = input;
189
190 if col_index_mapping.is_identity() {
191 return Ok((
192 Self {
193 base: self.base.clone_with_new_plan_id(),
194 core: new_core,
195 }
196 .into(),
197 col_index_mapping,
198 ));
199 }
200
201 new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
202
203 Ok((
204 Self {
205 base: self.base.clone_with_new_plan_id(),
206 core: new_core,
207 }
208 .into(),
209 col_index_mapping,
210 ))
211 }
212}