1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::Interval;
18
19use super::generic::{GenericPlanNode, GenericPlanRef};
20use super::utils::impl_distill_by_unit;
21use super::{
22 BatchHopWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef,
23 PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream,
24 gen_filter_and_pushdown, generic,
25};
26use crate::error::Result;
27use crate::expr::{ExprType, FunctionCall, InputRef};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{
30 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
31};
32use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
33
/// Logical plan node for a hop window.
///
/// The node is a thin wrapper: all window parameters and the input live in the shared
/// `generic::HopWindow` core, while `base` holds the logical plan properties that are derived
/// from that core when the node is constructed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalHopWindow {
    /// Logical plan properties, built from `core` in `LogicalHopWindow::new`.
    pub base: PlanBase<Logical>,
    /// Input plan, time column, window intervals and output indices.
    core: generic::HopWindow<PlanRef>,
}
40
41impl LogicalHopWindow {
42 pub const ADDITION_COLUMN_LEN: usize = 2;
45
46 fn new(
49 input: PlanRef,
50 time_col: InputRef,
51 window_slide: Interval,
52 window_size: Interval,
53 window_offset: Interval,
54 output_indices: Option<Vec<usize>>,
55 ) -> Self {
56 let output_indices =
58 output_indices.unwrap_or_else(|| (0..input.schema().len() + 2).collect_vec());
59 let core = generic::HopWindow {
60 input,
61 time_col,
62 window_slide,
63 window_size,
64 window_offset,
65 output_indices,
66 };
67
68 let ctx = core.ctx();
69
70 let base = PlanBase::new_logical(
71 ctx,
72 core.schema(),
73 core.stream_key(),
74 core.functional_dependency(),
75 );
76
77 LogicalHopWindow { base, core }
78 }
79
80 pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
81 self.core.into_parts()
82 }
83
84 pub fn output_indices_are_trivial(&self) -> bool {
85 self.output_indices() == &(0..self.core.internal_column_num()).collect_vec()
86 }
87
88 pub fn create(
91 input: PlanRef,
92 time_col: InputRef,
93 window_slide: Interval,
94 window_size: Interval,
95 window_offset: Interval,
96 ) -> PlanRef {
97 let input = LogicalFilter::create_with_expr(
98 input,
99 FunctionCall::new(ExprType::IsNotNull, vec![time_col.clone().into()])
100 .unwrap()
101 .into(),
102 );
103 Self::new(
104 input,
105 time_col,
106 window_slide,
107 window_size,
108 window_offset,
109 None,
110 )
111 .into()
112 }
113
114 pub fn output_window_start_col_idx(&self) -> Option<usize> {
115 self.core.output_window_start_col_idx()
116 }
117
118 pub fn output_window_end_col_idx(&self) -> Option<usize> {
119 self.core.output_window_end_col_idx()
120 }
121
122 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
123 self.core.o2i_col_mapping()
124 }
125
126 pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
127 self.core.output2internal_col_mapping()
128 }
129
130 pub fn clone_with_output_indices(&self, output_indices: Vec<usize>) -> Self {
131 Self::new(
132 self.input(),
133 self.core.time_col.clone(),
134 self.core.window_slide,
135 self.core.window_size,
136 self.core.window_offset,
137 Some(output_indices),
138 )
139 }
140
141 pub fn output_indices(&self) -> &Vec<usize> {
143 &self.core.output_indices
144 }
145}
146
147impl PlanTreeNodeUnary for LogicalHopWindow {
148 fn input(&self) -> PlanRef {
149 self.core.input.clone()
150 }
151
152 fn clone_with_input(&self, input: PlanRef) -> Self {
153 Self::new(
154 input,
155 self.core.time_col.clone(),
156 self.core.window_slide,
157 self.core.window_size,
158 self.core.window_offset,
159 Some(self.core.output_indices.clone()),
160 )
161 }
162
163 fn rewrite_with_input(
164 &self,
165 input: PlanRef,
166 input_col_change: ColIndexMapping,
167 ) -> (Self, ColIndexMapping) {
168 let mut time_col = self.core.time_col.clone();
169 time_col.index = input_col_change.map(time_col.index);
170 let mut columns_to_be_kept = Vec::new();
171 let new_output_indices = self
172 .core
173 .output_indices
174 .iter()
175 .enumerate()
176 .filter_map(|(i, &idx)| match input_col_change.try_map(idx) {
177 Some(new_idx) => {
178 columns_to_be_kept.push(i);
179 Some(new_idx)
180 }
181 None => {
182 if idx == self.core.internal_window_start_col_idx() {
183 columns_to_be_kept.push(i);
184 Some(input.schema().len())
185 } else if idx == self.core.internal_window_end_col_idx() {
186 columns_to_be_kept.push(i);
187 Some(input.schema().len() + 1)
188 } else {
189 None
190 }
191 }
192 })
193 .collect_vec();
194 let new_hop = Self::new(
195 input,
196 time_col,
197 self.core.window_slide,
198 self.core.window_size,
199 self.core.window_offset,
200 Some(new_output_indices),
201 );
202 (
203 new_hop,
204 ColIndexMapping::with_remaining_columns(
205 &columns_to_be_kept,
206 self.core.output_indices.len(),
207 ),
208 )
209 }
210}
211
// NOTE(review): presumably derives the generic plan-tree-node impl from the
// `PlanTreeNodeUnary` impl above; the macro definition is not visible here, confirm.
impl_plan_tree_node_for_unary! {LogicalHopWindow}
// NOTE(review): presumably implements the explain/distill output by delegating to the `core`
// field under the display name "LogicalHopWindow"; confirm against the macro definition.
impl_distill_by_unit!(LogicalHopWindow, core, "LogicalHopWindow");
214
215impl ColPrunable for LogicalHopWindow {
216 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
217 let o2i = self.o2i_col_mapping();
218 let input_required_cols = {
219 let mut tmp = FixedBitSet::with_capacity(self.schema().len());
220 tmp.extend(required_cols.iter().copied());
221 tmp = o2i.rewrite_bitset(&tmp);
222 tmp.put(self.core.time_col.index());
225 tmp.ones().collect_vec()
226 };
227 let input = self.input().prune_col(&input_required_cols, ctx);
228 let input_change = ColIndexMapping::with_remaining_columns(
229 &input_required_cols,
230 self.input().schema().len(),
231 );
232 let (new_hop, _) = self.rewrite_with_input(input, input_change.clone());
233 let output_cols = {
234 #[derive(Copy, Clone, Debug)]
237 enum IndexType {
238 Input(usize),
239 WindowStart,
240 WindowEnd,
241 }
242 let output2internal = self.output2internal_col_mapping();
243 let input_required_cols = required_cols
245 .iter()
246 .filter_map(|&idx| {
247 if let Some(idx) = o2i.try_map(idx) {
248 Some(IndexType::Input(idx))
249 } else if let Some(idx) = output2internal.try_map(idx) {
250 if idx == self.core.internal_window_start_col_idx() {
251 Some(IndexType::WindowStart)
252 } else if idx == self.core.internal_window_end_col_idx() {
253 Some(IndexType::WindowEnd)
254 } else {
255 None
256 }
257 } else {
258 None
259 }
260 })
261 .collect_vec();
262 input_required_cols
264 .iter()
265 .filter_map(|&idx| match idx {
266 IndexType::Input(x) => input_change.try_map(x),
267 IndexType::WindowStart => Some(new_hop.core.internal_window_start_col_idx()),
268 IndexType::WindowEnd => Some(new_hop.core.internal_window_end_col_idx()),
269 })
270 .collect_vec()
271 };
272 new_hop.clone_with_output_indices(output_cols).into()
273 }
274}
275
// No methods are overridden here, so whatever defaults `ExprRewritable` provides apply.
impl ExprRewritable for LogicalHopWindow {}
277
// No methods are overridden here, so whatever defaults `ExprVisitable` provides apply.
impl ExprVisitable for LogicalHopWindow {}
279
280impl PredicatePushdown for LogicalHopWindow {
281 fn predicate_pushdown(
284 &self,
285 predicate: Condition,
286 ctx: &mut PredicatePushdownContext,
287 ) -> PlanRef {
288 let mut window_columns = FixedBitSet::with_capacity(self.schema().len());
289
290 let window_start_idx = self.core.internal_window_start_col_idx();
291 let window_end_idx = self.core.internal_window_end_col_idx();
292 for (i, v) in self.output_indices().iter().enumerate() {
293 if *v == window_start_idx || *v == window_end_idx {
294 window_columns.insert(i);
295 }
296 }
297 let (time_window_pred, pushed_predicate) = predicate.split_disjoint(&window_columns);
298 let mut mapping = self.o2i_col_mapping();
299 let pushed_predicate = pushed_predicate.rewrite_expr(&mut mapping);
300 gen_filter_and_pushdown(self, time_window_pred, pushed_predicate, ctx)
301 }
302}
303
304impl ToBatch for LogicalHopWindow {
305 fn to_batch(&self) -> Result<PlanRef> {
306 let new_input = self.input().to_batch()?;
307 let mut new_logical = self.core.clone();
308 new_logical.input = new_input;
309 let (window_start_exprs, window_end_exprs) =
310 new_logical.derive_window_start_and_end_exprs()?;
311 Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
312 }
313}
314
315impl ToStream for LogicalHopWindow {
316 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
317 let new_input = self.input().to_stream(ctx)?;
318 let mut new_logical = self.core.clone();
319 new_logical.input = new_input;
320 let (window_start_exprs, window_end_exprs) =
321 new_logical.derive_window_start_and_end_exprs()?;
322 Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
323 }
324
325 fn logical_rewrite_for_stream(
326 &self,
327 ctx: &mut RewriteStreamContext,
328 ) -> Result<(PlanRef, ColIndexMapping)> {
329 let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
330 let (hop, out_col_change) = self.rewrite_with_input(input, input_col_change);
331 let (input, time_col, window_slide, window_size, window_offset, mut output_indices) =
332 hop.into_parts();
333 if !output_indices.contains(&input.schema().len())
334 && !output_indices.contains(&(input.schema().len() + 1))
335 {
338 output_indices.push(input.schema().len());
339 }
340 let i2o = self.core.i2o_col_mapping();
341 output_indices.extend(
342 input
343 .expect_stream_key()
344 .iter()
345 .cloned()
346 .filter(|i| i2o.try_map(*i).is_none()),
347 );
348 let new_hop = Self::new(
349 input,
350 time_col,
351 window_slide,
352 window_size,
353 window_offset,
354 Some(output_indices),
355 );
356 Ok((new_hop.into(), out_col_change))
357 }
358}
359
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::*;
    use crate::Explain;
    use crate::optimizer::optimizer_context::OptimizerContext;
    use crate::optimizer::plan_node::LogicalValues;
    use crate::optimizer::property::FunctionalDependency;

    /// Columns of the input used by both tests: `date`, `v1`, `v2`.
    fn input_fields() -> Vec<Field> {
        vec![
            Field::with_name(DataType::Date, "date"),
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
        ]
    }

    /// Hop window over column 0 (`date`) with slide 1 day, size 3 days and zero offset,
    /// outputting all five internal columns.
    fn hop_window_over(values: LogicalValues) -> PlanRef {
        LogicalHopWindow::new(
            values.into(),
            InputRef::new(0, DataType::Date),
            Interval::from_month_day_usec(0, 1, 0),
            Interval::from_month_day_usec(0, 3, 0),
            Interval::from_month_day_usec(0, 0, 0),
            None,
        )
        .into()
    }

    /// Pruning to outputs `[4, 2, 3]` must keep the requested order, drop the unused input
    /// column `v1`, and be stable when pruning again with all columns required.
    #[tokio::test]
    async fn test_prune_hop_window_with_order_required() {
        let ctx = OptimizerContext::mock().await;
        let fields = input_fields();
        let values = LogicalValues::new(
            vec![],
            Schema {
                fields: fields.clone(),
            },
            ctx,
        );
        let original: PlanRef = hop_window_over(values);

        let required_cols = vec![4, 2, 3];
        let mut prune_ctx = ColumnPruningContext::new(original.clone());
        let plan = original.prune_col(&required_cols, &mut prune_ctx);
        println!(
            "{}\n{}",
            original.explain_to_string(),
            plan.explain_to_string()
        );

        let pruned = plan.as_logical_hop_window().unwrap();
        assert_eq!(pruned.core.output_indices, vec![3, 1, 2]);
        assert_eq!(pruned.schema().fields().len(), 3);

        let pruned_input = pruned.input();
        let pruned_values = pruned_input.as_logical_values().unwrap();
        assert_eq!(pruned_values.schema().fields().len(), 2);
        assert_eq!(pruned_values.schema().fields()[0], fields[0]);
        assert_eq!(pruned_values.schema().fields()[1], fields[2]);

        // Pruning again while requiring every column must not change the schema.
        let all_cols = (0..plan.schema().len()).collect_vec();
        let plan2 = plan.prune_col(&all_cols, &mut ColumnPruningContext::new(plan.clone()));
        assert_eq!(plan2.schema(), plan.schema());
    }

    /// The input dependency `{0, 1} -> {2}` must be carried over, and the two window columns
    /// (3 and 4) must determine each other.
    #[tokio::test]
    async fn fd_derivation_hop_window() {
        let ctx = OptimizerContext::mock().await;
        let mut values = LogicalValues::new(
            vec![],
            Schema {
                fields: input_fields(),
            },
            ctx,
        );
        values
            .base
            .functional_dependency_mut()
            .add_functional_dependency_by_column_indices(&[0, 1], &[2]);
        let hop_window: PlanRef = hop_window_over(values);

        let actual: HashSet<_> = hop_window
            .functional_dependency()
            .as_dependencies()
            .iter()
            .cloned()
            .collect();
        let expected: HashSet<_> = [
            FunctionalDependency::with_indices(5, &[0, 1], &[2]),
            FunctionalDependency::with_indices(5, &[3], &[4]),
            FunctionalDependency::with_indices(5, &[4], &[3]),
        ]
        .into_iter()
        .collect();
        assert_eq!(actual, expected);
    }
}