risingwave_frontend/optimizer/plan_node/
logical_gap_fill.rs1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_expr::bail;
18
19use super::stream::StreamPlanNodeMetadata;
20use super::{
21 ColPrunable, ColumnPruningContext, ExprRewritable, ExprVisitable, Logical, LogicalFilter,
22 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
23 PredicatePushdownContext, ToBatch, ToStream, ToStreamContext, generic,
24};
25use crate::binder::BoundFillStrategy;
26use crate::error::Result;
27use crate::expr::{ExprImpl, InputRef};
28use crate::optimizer::plan_node::utils::impl_distill_by_unit;
29use crate::utils::{ColIndexMapping, Condition};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalGapFill {
35 pub base: PlanBase<super::Logical>,
36 core: generic::GapFill<PlanRef>,
37}
38
39impl LogicalGapFill {
40 pub fn new(
41 input: PlanRef,
42 time_col: InputRef,
43 interval: ExprImpl,
44 fill_strategies: Vec<BoundFillStrategy>,
45 partition_by_cols: Vec<InputRef>,
46 ) -> Self {
47 let core = generic::GapFill {
48 input,
49 time_col,
50 interval,
51 fill_strategies,
52 partition_by_cols,
53 };
54 let base = PlanBase::new_logical_with_core(&core);
55 Self { base, core }
56 }
57
58 pub fn time_col(&self) -> &InputRef {
59 &self.core.time_col
60 }
61
62 pub fn interval(&self) -> &ExprImpl {
63 &self.core.interval
64 }
65
66 pub fn fill_strategies(&self) -> &[BoundFillStrategy] {
67 &self.core.fill_strategies
68 }
69
70 pub fn partition_by_cols(&self) -> &[InputRef] {
71 &self.core.partition_by_cols
72 }
73}
74
75impl PlanTreeNodeUnary<Logical> for LogicalGapFill {
76 fn input(&self) -> PlanRef {
77 self.core.input.clone()
78 }
79
80 fn clone_with_input(&self, input: PlanRef) -> Self {
81 Self::new(
82 input,
83 self.time_col().clone(),
84 self.interval().clone(),
85 self.fill_strategies().to_vec(),
86 self.partition_by_cols().to_vec(),
87 )
88 }
89}
90
91impl_plan_tree_node_for_unary! { Logical, LogicalGapFill }
92impl_distill_by_unit!(LogicalGapFill, core, "LogicalGapFill");
93
94impl ColPrunable for LogicalGapFill {
95 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
96 let input_col_num = self.input().schema().len();
97 let mut input_required = FixedBitSet::with_capacity(input_col_num);
98 for &required_col in required_cols {
99 input_required.insert(required_col);
100 }
101 input_required.insert(self.time_col().index());
102 input_required.union_with(&self.interval().collect_input_refs(input_col_num));
103 self.fill_strategies()
104 .iter()
105 .for_each(|s| input_required.insert(s.target_col.index()));
106 self.partition_by_cols()
107 .iter()
108 .for_each(|c| input_required.insert(c.index()));
109
110 let input_required_cols: Vec<_> = input_required.ones().collect();
111 let mut col_index_mapping =
112 ColIndexMapping::with_remaining_columns(&input_required_cols, input_col_num);
113
114 let mut new_core = self.core.clone();
115 new_core.input = self.input().prune_col(&input_required_cols, ctx);
116 new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
117
118 let logical_gap_fill = Self {
119 base: PlanBase::new_logical_with_core(&new_core),
120 core: new_core,
121 }
122 .into();
123
124 if input_required_cols == required_cols {
125 logical_gap_fill
126 } else {
127 let output_required_cols = required_cols
128 .iter()
129 .map(|&idx| col_index_mapping.map(idx))
130 .collect_vec();
131 let src_size = logical_gap_fill.schema().len();
132 LogicalProject::with_mapping(
133 logical_gap_fill,
134 ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
135 )
136 .into()
137 }
138 }
139}
140
141impl ExprRewritable<Logical> for LogicalGapFill {
142 fn has_rewritable_expr(&self) -> bool {
143 true
144 }
145
146 fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
147 let mut core = self.core.clone();
148 core.rewrite_exprs(r);
149 Self {
150 base: self.base.clone(),
151 core,
152 }
153 .into()
154 }
155}
156
157impl ExprVisitable for LogicalGapFill {
158 fn visit_exprs(&self, v: &mut dyn crate::expr::ExprVisitor) {
159 self.core.visit_exprs(v);
160 }
161}
162
163impl PredicatePushdown for LogicalGapFill {
164 fn predicate_pushdown(
165 &self,
166 predicate: Condition,
167 _ctx: &mut PredicatePushdownContext,
168 ) -> PlanRef {
169 LogicalFilter::create(self.clone().into(), predicate)
170 }
171}
172
173impl ToBatch for LogicalGapFill {
174 fn to_batch(&self) -> Result<super::BatchPlanRef> {
175 bail!("BatchGapFill is not implemented yet")
176 }
177}
178
179impl ToStream for LogicalGapFill {
180 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<super::StreamPlanRef> {
181 use super::{StreamEowcGapFill, StreamEowcSort, StreamGapFill};
182 use crate::error::ErrorCode;
183 use crate::optimizer::property::RequiredDist;
184
185 let stream_input = self.input().to_stream(ctx)?;
186 let partition_cols = &self.core.partition_by_cols;
187
188 let new_input = if partition_cols.is_empty() {
190 RequiredDist::single().streaming_enforce_if_not_satisfies(stream_input)?
192 } else {
193 let partition_indices: Vec<usize> = partition_cols.iter().map(|c| c.index()).collect();
195 RequiredDist::shard_by_key(stream_input.schema().len(), &partition_indices)
196 .streaming_enforce_if_not_satisfies(stream_input)?
197 };
198
199 if ctx.emit_on_window_close() {
203 let time_col_idx = self.core.time_col.index();
205 let input_watermark_cols = new_input.watermark_columns();
206 if !input_watermark_cols.contains(time_col_idx) {
207 return Err(ErrorCode::NotSupported(
208 "GAP_FILL with EMIT ON WINDOW CLOSE requires a watermark on the time column."
209 .to_owned(),
210 format!(
211 "Please define a watermark on the time column (column index {}).",
212 time_col_idx
213 ),
214 )
215 .into());
216 }
217
218 let sorted_input = StreamEowcSort::new(new_input, time_col_idx).into();
219 let core = generic::GapFill {
220 input: sorted_input,
221 time_col: self.core.time_col.clone(),
222 interval: self.core.interval.clone(),
223 fill_strategies: self.core.fill_strategies.clone(),
224 partition_by_cols: self.core.partition_by_cols.clone(),
225 };
226 Ok(StreamEowcGapFill::new(core).into())
227 } else {
228 let core = generic::GapFill {
229 input: new_input,
230 time_col: self.core.time_col.clone(),
231 interval: self.core.interval.clone(),
232 fill_strategies: self.core.fill_strategies.clone(),
233 partition_by_cols: self.core.partition_by_cols.clone(),
234 };
235 Ok(StreamGapFill::new(core).into())
236 }
237 }
238
239 fn logical_rewrite_for_stream(
240 &self,
241 _ctx: &mut super::convert::RewriteStreamContext,
242 ) -> Result<(PlanRef, ColIndexMapping)> {
243 let (input, mut col_index_mapping) = self.input().logical_rewrite_for_stream(_ctx)?;
244 let mut new_core = self.core.clone();
245 new_core.input = input;
246
247 if col_index_mapping.is_identity() {
248 return Ok((
249 Self {
250 base: PlanBase::new_logical_with_core(&new_core),
251 core: new_core,
252 }
253 .into(),
254 col_index_mapping,
255 ));
256 }
257
258 new_core.rewrite_with_col_index_mapping(&mut col_index_mapping);
259
260 Ok((
261 Self {
262 base: PlanBase::new_logical_with_core(&new_core),
263 core: new_core,
264 }
265 .into(),
266 col_index_mapping,
267 ))
268 }
269}