risingwave_frontend/optimizer/plan_node/
logical_gap_fill.rs1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_expr::bail;
18
19use super::stream::StreamPlanNodeMetadata;
20use super::{
21 ColPrunable, ColumnPruningContext, ExprRewritable, ExprVisitable, Logical, LogicalFilter,
22 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
23 PredicatePushdownContext, ToBatch, ToStream, ToStreamContext, generic,
24};
25use crate::binder::BoundFillStrategy;
26use crate::error::Result;
27use crate::expr::{ExprImpl, InputRef};
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::utils::{ColIndexMapping, Condition};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalGapFill {
35 pub base: PlanBase<super::Logical>,
36 core: generic::GapFill<PlanRef>,
37}
38
39impl LogicalGapFill {
40 pub fn new(
41 input: PlanRef,
42 time_col: InputRef,
43 interval: ExprImpl,
44 fill_strategies: Vec<BoundFillStrategy>,
45 ) -> Self {
46 let core = generic::GapFill {
47 input,
48 time_col,
49 interval,
50 fill_strategies,
51 };
52 let base = PlanBase::new_logical_with_core(&core);
53 Self { base, core }
54 }
55
56 pub fn time_col(&self) -> &InputRef {
57 &self.core.time_col
58 }
59
60 pub fn interval(&self) -> &ExprImpl {
61 &self.core.interval
62 }
63
64 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
65 &self.core.fill_strategies
66 }
67}
68
69impl PlanTreeNodeUnary<Logical> for LogicalGapFill {
70 fn input(&self) -> PlanRef {
71 self.core.input.clone()
72 }
73
74 fn clone_with_input(&self, input: PlanRef) -> Self {
75 Self::new(
76 input,
77 self.time_col().clone(),
78 self.interval().clone(),
79 self.fill_strategies().to_vec(),
80 )
81 }
82}
83
// NOTE(review): presumably derives the generic plan-tree-node impl for `LogicalGapFill` from its
// `PlanTreeNodeUnary<Logical>` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Logical, LogicalGapFill }
// NOTE(review): presumably implements the explain/distill output by delegating to the `core`
// field under the display name "LogicalGapFill" — confirm against `impl_distill_by_unit`.
impl_distill_by_unit!(LogicalGapFill, core, "LogicalGapFill");
86
87impl ColPrunable for LogicalGapFill {
88 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
89 let input_col_num = self.input().schema().len();
90 let mut input_required = FixedBitSet::from_iter(required_cols.iter().copied());
91 input_required.insert(self.time_col().index());
92 input_required.union_with(&self.interval().collect_input_refs(input_col_num));
93 self.fill_strategies()
94 .iter()
95 .for_each(|s| input_required.insert(s.target_col.index()));
96
97 let input_required_cols: Vec<_> = input_required.ones().collect();
98 let mut col_index_mapping =
99 ColIndexMapping::with_remaining_columns(&input_required_cols, input_col_num);
100
101 let mut new_core = self.core.clone();
102 new_core.input = self.input().prune_col(&input_required_cols, ctx);
103 new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
104
105 let logical_gap_fill = Self {
106 base: PlanBase::new_logical_with_core(&new_core),
107 core: new_core,
108 }
109 .into();
110
111 if input_required_cols == required_cols {
112 logical_gap_fill
113 } else {
114 let output_required_cols = required_cols
115 .iter()
116 .map(|&idx| col_index_mapping.map(idx))
117 .collect_vec();
118 let src_size = logical_gap_fill.schema().len();
119 LogicalProject::with_mapping(
120 logical_gap_fill,
121 ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
122 )
123 .into()
124 }
125 }
126}
127
128impl ExprRewritable<Logical> for LogicalGapFill {
129 fn has_rewritable_expr(&self) -> bool {
130 true
131 }
132
133 fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
134 let mut core = self.core.clone();
135 core.rewrite_exprs(r);
136 Self {
137 base: self.base.clone(),
138 core,
139 }
140 .into()
141 }
142}
143
144impl ExprVisitable for LogicalGapFill {
145 fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
146 self.core.visit_exprs(v);
147 }
148}
149
150impl PredicatePushdown for LogicalGapFill {
151 fn predicate_pushdown(
152 &self,
153 predicate: Condition,
154 _ctx: &mut PredicatePushdownContext,
155 ) -> PlanRef {
156 LogicalFilter::create(self.clone().into(), predicate)
157 }
158}
159
impl ToBatch for LogicalGapFill {
    /// Batch lowering is not available for gap fill: this bails out with the message
    /// "BatchGapFill is not implemented yet" instead of producing a batch plan.
    fn to_batch(&self) -> Result<super::BatchPlanRef> {
        bail!("BatchGapFill is not implemented yet")
    }
}
165
166impl ToStream for LogicalGapFill {
167 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<super::StreamPlanRef> {
168 use super::{StreamEowcGapFill, StreamGapFill};
169 use crate::error::ErrorCode;
170 use crate::optimizer::property::RequiredDist;
171
172 let stream_input = self.input().to_stream(ctx)?;
173
174 let new_input = RequiredDist::single().streaming_enforce_if_not_satisfies(stream_input)?;
177
178 let time_col_idx = self.core.time_col.index();
180 let input_stream_key = new_input.expect_stream_key();
181 if !(input_stream_key.len() == 1 && input_stream_key[0] == time_col_idx) {
182 return Err(ErrorCode::NotSupported(
183 "GAP_FILL requires the time column to be the sole primary key.".to_owned(),
184 format!("The time column (index {}) must be the only primary key. Found stream key: {:?}", time_col_idx, input_stream_key),
185 )
186 .into());
187 }
188
189 let core = generic::GapFill {
190 input: new_input.clone(),
191 time_col: self.core.time_col.clone(),
192 interval: self.core.interval.clone(),
193 fill_strategies: self.core.fill_strategies.clone(),
194 };
195
196 if ctx.emit_on_window_close() {
197 let input_watermark_cols = new_input.watermark_columns();
199 if !input_watermark_cols.contains(time_col_idx) {
200 return Err(ErrorCode::NotSupported(
201 "GAP_FILL with EMIT ON WINDOW CLOSE requires a watermark on the time column."
202 .to_owned(),
203 format!(
204 "Please define a watermark on the time column (column index {}).",
205 time_col_idx
206 ),
207 )
208 .into());
209 }
210
211 Ok(StreamEowcGapFill::new(core).into())
212 } else {
213 Ok(StreamGapFill::new(core).into())
214 }
215 }
216
217 fn logical_rewrite_for_stream(
218 &self,
219 _ctx: &mut super::convert::RewriteStreamContext,
220 ) -> Result<(PlanRef, ColIndexMapping)> {
221 let (input, mut col_index_mapping) = self.input().logical_rewrite_for_stream(_ctx)?;
222 let mut new_core = self.core.clone();
223 new_core.input = input;
224
225 if col_index_mapping.is_identity() {
226 return Ok((
227 Self {
228 base: self.base.clone_with_new_plan_id(),
229 core: new_core,
230 }
231 .into(),
232 col_index_mapping,
233 ));
234 }
235
236 new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
237
238 Ok((
239 Self {
240 base: PlanBase::new_logical_with_core(&new_core),
241 core: new_core,
242 }
243 .into(),
244 col_index_mapping,
245 ))
246 }
247}