1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
19use risingwave_common::{bail_not_implemented, not_implemented};
20use risingwave_expr::aggregate::{AggType, PbAggKind, agg_types};
21use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind};
22
23use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder};
24use super::utils::impl_distill_by_unit;
25use super::{
26 BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter,
27 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
28 StreamEowcOverWindow, StreamEowcSort, StreamOverWindow, ToBatch, ToStream,
29 gen_filter_and_pushdown,
30};
31use crate::error::{ErrorCode, Result, RwError};
32use crate::expr::{
33 AggCall, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
34 WindowFunction,
35};
36use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
37use crate::optimizer::plan_node::logical_agg::LogicalAggBuilder;
38use crate::optimizer::plan_node::{
39 ColumnPruningContext, Literal, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
40};
41use crate::optimizer::property::RequiredDist;
42use crate::utils::{ColIndexMapping, Condition, IndexSet};
43
/// Expression rewriter that replaces every window function call in the `SELECT` items with an
/// [`InputRef`] and records the distinct calls it encounters.
struct LogicalOverWindowBuilder<'a> {
    /// Builder of the input projection; used to resolve expressions to input column indices.
    input_proj_builder: &'a ProjectBuilder,
    /// The distinct window function calls collected so far, in output-column order.
    window_functions: &'a mut Vec<WindowFunction>,
    /// Error recorded by the infallible `ExprRewriter` callbacks, surfaced by
    /// `rewrite_selected_items`.
    error: Option<RwError>,
}
52
53impl<'a> LogicalOverWindowBuilder<'a> {
54 fn new(
55 input_proj_builder: &'a ProjectBuilder,
56 window_functions: &'a mut Vec<WindowFunction>,
57 ) -> Result<Self> {
58 Ok(Self {
59 input_proj_builder,
60 window_functions,
61 error: None,
62 })
63 }
64
65 fn rewrite_selected_items(&mut self, selected_items: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
66 let mut rewritten_items = vec![];
67 for expr in selected_items {
68 let rewritten_expr = self.rewrite_expr(expr);
69 if let Some(error) = self.error.take() {
70 return Err(error);
71 } else {
72 rewritten_items.push(rewritten_expr);
73 }
74 }
75 Ok(rewritten_items)
76 }
77
78 fn schema_over_window_start_offset(&self) -> usize {
79 self.input_proj_builder.exprs_len()
80 }
81
82 fn push_window_func(&mut self, window_func: WindowFunction) -> InputRef {
83 if let Some((pos, existing)) = self
84 .window_functions
85 .iter()
86 .find_position(|&w| w == &window_func)
87 {
88 return InputRef::new(
89 self.schema_over_window_start_offset() + pos,
90 existing.return_type.clone(),
91 );
92 }
93 let index = self.schema_over_window_start_offset() + self.window_functions.len();
94 let data_type = window_func.return_type.clone();
95 self.window_functions.push(window_func);
96 InputRef::new(index, data_type)
97 }
98
99 fn try_rewrite_window_function(&mut self, window_func: WindowFunction) -> Result<ExprImpl> {
100 let WindowFunction {
101 kind,
102 args,
103 return_type,
104 partition_by,
105 order_by,
106 ignore_nulls,
107 frame,
108 } = window_func;
109
110 let new_expr = if let WindowFuncKind::Aggregate(agg_type) = &kind
111 && matches!(agg_type, agg_types::rewritten!())
112 {
113 let agg_call = AggCall::new(
114 agg_type.clone(),
115 args,
116 false,
117 order_by,
118 Condition::true_cond(),
119 vec![],
120 )?;
121 LogicalAggBuilder::general_rewrite_agg_call(agg_call, |agg_call| {
122 Ok(self.push_window_func(
123 WindowFunction::new(
125 WindowFuncKind::Aggregate(agg_call.agg_type),
126 agg_call.args.clone(),
127 false, partition_by.clone(),
129 agg_call.order_by.clone(),
130 frame.clone(),
131 )?,
132 ))
133 })?
134 } else {
135 ExprImpl::from(self.push_window_func(WindowFunction::new(
136 kind,
137 args,
138 ignore_nulls,
139 partition_by,
140 order_by,
141 frame,
142 )?))
143 };
144
145 assert_eq!(new_expr.return_type(), return_type);
146 Ok(new_expr)
147 }
148}
149
150impl ExprRewriter for LogicalOverWindowBuilder<'_> {
151 fn rewrite_window_function(&mut self, window_func: WindowFunction) -> ExprImpl {
152 let dummy = Literal::new(None, window_func.return_type()).into();
153 match self.try_rewrite_window_function(window_func) {
154 Ok(expr) => expr,
155 Err(err) => {
156 self.error = Some(err);
157 dummy
158 }
159 }
160 }
161
162 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
163 let input_expr = input_ref.into();
164 let index = self.input_proj_builder.expr_index(&input_expr).unwrap();
165 ExprImpl::from(InputRef::new(index, input_expr.return_type()))
166 }
167}
168
/// Expression visitor that registers the expressions a window function depends on (its
/// arguments, `PARTITION BY` and `ORDER BY` expressions) in a [`ProjectBuilder`].
struct OverWindowProjectBuilder<'a> {
    /// Projection builder that receives the collected expressions.
    builder: &'a mut ProjectBuilder,
    /// Error recorded by the most recent failing `visit_window_function` call, if any.
    error: Option<ErrorCode>,
}
174
175impl<'a> OverWindowProjectBuilder<'a> {
176 fn new(builder: &'a mut ProjectBuilder) -> Self {
177 Self {
178 builder,
179 error: None,
180 }
181 }
182
183 fn try_visit_window_function(
184 &mut self,
185 window_function: &WindowFunction,
186 ) -> std::result::Result<(), ErrorCode> {
187 if let WindowFuncKind::Aggregate(agg_type) = &window_function.kind
188 && matches!(
189 agg_type,
190 AggType::Builtin(
191 PbAggKind::StddevPop
192 | PbAggKind::StddevSamp
193 | PbAggKind::VarPop
194 | PbAggKind::VarSamp
195 )
196 )
197 {
198 let input = window_function.args.iter().exactly_one().unwrap();
199 let squared_input_expr = ExprImpl::from(
200 FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(),
201 );
202 self.builder
203 .add_expr(&squared_input_expr)
204 .map_err(|err| not_implemented!("{err} inside args"))?;
205 }
206 for arg in &window_function.args {
207 self.builder
208 .add_expr(arg)
209 .map_err(|err| not_implemented!("{err} inside args"))?;
210 }
211 for partition_by in &window_function.partition_by {
212 self.builder
213 .add_expr(partition_by)
214 .map_err(|err| not_implemented!("{err} inside partition_by"))?;
215 }
216 for order_by in window_function.order_by.sort_exprs.iter().map(|e| &e.expr) {
217 self.builder
218 .add_expr(order_by)
219 .map_err(|err| not_implemented!("{err} inside order_by"))?;
220 }
221 Ok(())
222 }
223}
224
225impl ExprVisitor for OverWindowProjectBuilder<'_> {
226 fn visit_window_function(&mut self, window_function: &WindowFunction) {
227 if let Err(e) = self.try_visit_window_function(window_function) {
228 self.error = Some(e);
229 }
230 }
231}
232
/// Logical plan node that evaluates `OVER` window functions on its input.
///
/// The output columns are the input columns followed by one column per window function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalOverWindow {
    /// Common plan-node data derived from `core`.
    pub base: PlanBase<Logical>,
    /// The generic over-window node holding the input plan and the window functions.
    core: OverWindow<PlanRef>,
}
241
242impl LogicalOverWindow {
243 pub fn new(calls: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
244 let core = OverWindow::new(calls, input);
245 let base = PlanBase::new_logical_with_core(&core);
246 Self { base, core }
247 }
248
249 fn build_input_proj(input: PlanRef, select_exprs: &[ExprImpl]) -> Result<ProjectBuilder> {
250 let mut input_proj_builder = ProjectBuilder::default();
251 for (idx, field) in input.schema().fields().iter().enumerate() {
253 input_proj_builder
254 .add_expr(&InputRef::new(idx, field.data_type()).into())
255 .map_err(|err| not_implemented!("{err} inside input"))?;
256 }
257 let mut build_input_proj_visitor = OverWindowProjectBuilder::new(&mut input_proj_builder);
258 for expr in select_exprs {
259 build_input_proj_visitor.visit_expr(expr);
260 if let Some(error) = build_input_proj_visitor.error.take() {
261 return Err(error.into());
262 }
263 }
264 Ok(input_proj_builder)
265 }
266
267 pub fn create(input: PlanRef, select_exprs: Vec<ExprImpl>) -> Result<(PlanRef, Vec<ExprImpl>)> {
268 let input_proj_builder = Self::build_input_proj(input.clone(), &select_exprs)?;
269
270 let mut window_functions = vec![];
271 let mut over_window_builder =
272 LogicalOverWindowBuilder::new(&input_proj_builder, &mut window_functions)?;
273
274 let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?;
275
276 for window_func in &window_functions {
277 if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() {
278 return Err(ErrorCode::InvalidInputSyntax(format!(
279 "window rank function without order by: {:?}",
280 window_func
281 ))
282 .into());
283 }
284 }
285
286 let plan_window_funcs = window_functions
287 .into_iter()
288 .map(|x| Self::convert_window_function(x, &input_proj_builder))
289 .try_collect()?;
290
291 Ok((
292 Self::new(
293 plan_window_funcs,
294 LogicalProject::with_core(input_proj_builder.build(input)).into(),
295 )
296 .into(),
297 rewritten_selected_items,
298 ))
299 }
300
301 fn convert_window_function(
302 window_function: WindowFunction,
303 input_proj_builder: &ProjectBuilder,
304 ) -> Result<PlanWindowFunction> {
305 let order_by = window_function
306 .order_by
307 .sort_exprs
308 .into_iter()
309 .map(|e| {
310 ColumnOrder::new(
311 input_proj_builder.expr_index(&e.expr).unwrap(),
312 e.order_type,
313 )
314 })
315 .collect_vec();
316 let partition_by = window_function
317 .partition_by
318 .into_iter()
319 .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
320 .collect_vec();
321
322 let mut args = window_function.args;
323 let (kind, frame) = match window_function.kind {
324 WindowFuncKind::RowNumber | WindowFuncKind::Rank | WindowFuncKind::DenseRank => {
325 (
328 window_function.kind,
329 Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
330 )
331 }
332 WindowFuncKind::Lag | WindowFuncKind::Lead => {
333 assert!(!window_function.ignore_nulls); let offset = if args.len() > 1 {
340 let offset_expr = args.remove(1);
341 if !offset_expr.return_type().is_int() {
342 return Err(ErrorCode::InvalidInputSyntax(format!(
343 "the `offset` of `{}` function should be integer",
344 window_function.kind
345 ))
346 .into());
347 }
348 let const_offset = offset_expr
349 .cast_implicit(&DataType::Int64)?
350 .try_fold_const();
351 if const_offset.is_none() {
352 bail_not_implemented!(
355 "non-const `offset` of `lag`/`lead` is not supported yet"
356 );
357 }
358 const_offset.unwrap()?.map(|v| *v.as_int64()).unwrap_or(1)
359 } else {
360 1
361 };
362 let sign = if window_function.kind == WindowFuncKind::Lag {
363 -1
364 } else {
365 1
366 };
367 let abs_offset = offset.unsigned_abs() as usize;
368 let frame = if sign * offset <= 0 {
369 Frame::rows(
370 FrameBound::Preceding(abs_offset),
371 FrameBound::Preceding(abs_offset),
372 )
373 } else {
374 Frame::rows(
375 FrameBound::Following(abs_offset),
376 FrameBound::Following(abs_offset),
377 )
378 };
379
380 (
381 WindowFuncKind::Aggregate(AggType::Builtin(PbAggKind::FirstValue)),
382 frame,
383 )
384 }
385 WindowFuncKind::Aggregate(_) => {
386 let frame = window_function.frame.unwrap_or({
387 if order_by.is_empty() {
390 Frame::rows(
391 FrameBound::UnboundedPreceding,
392 FrameBound::UnboundedFollowing,
393 )
394 } else {
395 Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow)
396 }
397 });
398 (window_function.kind, frame)
399 }
400 };
401
402 let args = args
403 .into_iter()
404 .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
405 .collect_vec();
406
407 Ok(PlanWindowFunction {
408 kind,
409 return_type: window_function.return_type,
410 args,
411 ignore_nulls: window_function.ignore_nulls,
412 partition_by,
413 order_by,
414 frame,
415 })
416 }
417
    /// Returns the window function calls evaluated by this node.
    pub fn window_functions(&self) -> &[PlanWindowFunction] {
        &self.core.window_functions
    }
421
    /// Returns the partition key column indices as reported by the core node.
    pub fn partition_key_indices(&self) -> Vec<usize> {
        self.core.partition_key_indices()
    }
425
    /// Returns the order key as reported by the core node.
    pub fn order_key(&self) -> &[ColumnOrder] {
        self.core.order_key()
    }
429
430 #[must_use]
431 fn rewrite_with_input_and_window(
432 &self,
433 input: PlanRef,
434 window_functions: &[PlanWindowFunction],
435 input_col_change: ColIndexMapping,
436 ) -> Self {
437 let window_functions = window_functions
438 .iter()
439 .cloned()
440 .map(|mut window_function| {
441 window_function.args.iter_mut().for_each(|i| {
442 *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
443 });
444 window_function.order_by.iter_mut().for_each(|o| {
445 o.column_index = input_col_change.map(o.column_index);
446 });
447 window_function.partition_by.iter_mut().for_each(|i| {
448 *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
449 });
450 window_function
451 })
452 .collect();
453 Self::new(window_functions, input)
454 }
455
456 pub fn split_with_rule(&self, groups: Vec<Vec<usize>>) -> PlanRef {
457 assert!(groups.iter().flatten().all_unique());
458 assert!(
459 groups
460 .iter()
461 .flatten()
462 .all(|&idx| idx < self.window_functions().len())
463 );
464
465 let input_len = self.input().schema().len();
466 let original_out_fields = (0..input_len + self.window_functions().len()).collect_vec();
467 let mut out_fields = original_out_fields.clone();
468 let mut cur_input = self.input();
469 let mut cur_node = self.clone();
470 let mut cur_win_func_pos = input_len;
471 for func_indices in &groups {
472 cur_node = Self::new(
473 func_indices
474 .iter()
475 .map(|&idx| {
476 let func = &self.window_functions()[idx];
477 out_fields[input_len + idx] = cur_win_func_pos;
478 cur_win_func_pos += 1;
479 func.clone()
480 })
481 .collect_vec(),
482 cur_input.clone(),
483 );
484 cur_input = cur_node.clone().into();
485 }
486 if out_fields == original_out_fields {
487 cur_node.into()
488 } else {
489 LogicalProject::with_out_col_idx(cur_node.into(), out_fields.into_iter()).into()
490 }
491 }
492
    /// Consumes the node and returns its input plan together with its window functions.
    pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
        self.core.decompose()
    }
496}
497
498impl PlanTreeNodeUnary<Logical> for LogicalOverWindow {
499 fn input(&self) -> PlanRef {
500 self.core.input.clone()
501 }
502
503 fn clone_with_input(&self, input: PlanRef) -> Self {
504 Self::new(self.core.window_functions.clone(), input)
505 }
506
507 fn rewrite_with_input(
508 &self,
509 input: PlanRef,
510 input_col_change: ColIndexMapping,
511 ) -> (Self, ColIndexMapping) {
512 let input_len = self.core.input_len();
513 let new_input_len = input.schema().len();
514 let output_len = self.core.output_len();
515 let new_output_len = new_input_len + self.window_functions().len();
516 let output_col_change = {
517 let mut mapping = ColIndexMapping::empty(output_len, new_output_len);
518 for win_func_idx in 0..self.window_functions().len() {
519 mapping.put(input_len + win_func_idx, Some(new_input_len + win_func_idx));
520 }
521 mapping.union(&input_col_change)
522 };
523 let new_self =
524 self.rewrite_with_input_and_window(input, self.window_functions(), input_col_change);
525 (new_self, output_col_change)
526 }
527}
528
// NOTE(review): both macros are defined outside this file. Presumably they generate the
// generic plan-tree-node impls for this unary node and a `Distill` impl that delegates to
// `core` under the name "LogicalOverWindow" — confirm against the macro definitions.
impl_plan_tree_node_for_unary! { Logical, LogicalOverWindow }
impl_distill_by_unit!(LogicalOverWindow, core, "LogicalOverWindow");
531
532impl ColPrunable for LogicalOverWindow {
533 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
534 let input_len = self.input().schema().len();
535
536 let (req_cols_input_part, req_cols_win_func_part) = {
537 let mut in_input = required_cols.to_vec();
538 let in_win_funcs: IndexSet = in_input.extract_if(.., |i| *i >= input_len).collect();
539 (IndexSet::from(in_input), in_win_funcs)
540 };
541
542 if req_cols_win_func_part.is_empty() {
543 return self.input().prune_col(&req_cols_input_part.to_vec(), ctx);
545 }
546
547 let (input_cols_required_by_this, window_functions) = {
548 let mut tmp = IndexSet::empty();
549 let new_window_functions = req_cols_win_func_part
550 .indices()
551 .map(|idx| self.window_functions()[idx - input_len].clone())
552 .inspect(|func| {
553 tmp.extend(func.args.iter().map(|x| x.index()));
554 tmp.extend(func.partition_by.iter().map(|x| x.index()));
555 tmp.extend(func.order_by.iter().map(|x| x.column_index));
556 })
557 .collect_vec();
558 (tmp, new_window_functions)
559 };
560
561 let input_required_cols = (req_cols_input_part | input_cols_required_by_this).to_vec();
562 let input_col_change =
563 ColIndexMapping::with_remaining_columns(&input_required_cols, input_len);
564 let new_self = {
565 let input = self.input().prune_col(&input_required_cols, ctx);
566 self.rewrite_with_input_and_window(input, &window_functions, input_col_change)
567 };
568 if new_self.schema().len() == required_cols.len() {
569 new_self.into()
571 } else {
572 let mut new_output_cols = input_required_cols.clone();
574 new_output_cols.extend(required_cols.iter().filter(|&&x| x >= input_len));
575 let mapping =
576 &ColIndexMapping::with_remaining_columns(&new_output_cols, self.schema().len());
577 let output_required_cols = required_cols
578 .iter()
579 .map(|&idx| mapping.map(idx))
580 .collect_vec();
581 let src_size = new_self.schema().len();
582 LogicalProject::with_mapping(
583 new_self.into(),
584 ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
585 )
586 .into()
587 }
588 }
589}
590
// No overrides: the trait's default methods are used as-is.
impl ExprRewritable<Logical> for LogicalOverWindow {}
592
// No overrides: the trait's default methods are used as-is.
impl ExprVisitable for LogicalOverWindow {}
594
595impl PredicatePushdown for LogicalOverWindow {
596 fn predicate_pushdown(
597 &self,
598 predicate: Condition,
599 ctx: &mut PredicatePushdownContext,
600 ) -> PlanRef {
601 if !self.core.funcs_have_same_partition_and_order() {
602 return LogicalFilter::create(self.clone().into(), predicate);
604 }
605
606 let all_out_cols: FixedBitSet = (0..self.schema().len()).collect();
607 let mut remain_cols: FixedBitSet = all_out_cols
608 .difference(&self.partition_key_indices().into_iter().collect())
609 .collect();
610 remain_cols.grow(self.schema().len());
611
612 let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols);
613 gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx)
614 }
615}
616
/// Raises, via `bail_not_implemented!` (tracking issue 11505), the error telling users that
/// window functions with an empty `PARTITION BY` are not supported, along with a suggested
/// workaround.
macro_rules! empty_partition_by_not_implemented {
    () => {
        bail_not_implemented!(
            issue = 11505,
            "Window function with empty PARTITION BY is not supported because of potential bad performance. \
            If you really need this, please workaround with something like `PARTITION BY 1::int`."
        )
    };
}
626
627impl ToBatch for LogicalOverWindow {
628 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
629 assert!(
630 self.core.funcs_have_same_partition_and_order(),
631 "must apply OverWindowSplitRule before generating physical plan"
632 );
633
634 let partition_key_indices = self.window_functions()[0]
637 .partition_by
638 .iter()
639 .map(|e| e.index())
640 .collect_vec();
641 if partition_key_indices.is_empty() {
642 empty_partition_by_not_implemented!();
643 }
644
645 let input = self.input().to_batch()?;
646 let core = self.core.clone_with_input(input);
647 Ok(BatchOverWindow::new(core).into())
648 }
649}
650
651impl ToStream for LogicalOverWindow {
652 fn to_stream(
653 &self,
654 ctx: &mut ToStreamContext,
655 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
656 use super::stream::prelude::*;
657
658 assert!(
659 self.core.funcs_have_same_partition_and_order(),
660 "must apply OverWindowSplitRule before generating physical plan"
661 );
662
663 let stream_input = self.core.input.to_stream(ctx)?;
664
665 if ctx.emit_on_window_close() {
666 let order_by = &self.window_functions()[0].order_by;
669 if order_by.len() != 1 || order_by[0].order_type != OrderType::ascending() {
670 return Err(ErrorCode::InvalidInputSyntax(
671 "Only support window functions order by single column and in ascending order"
672 .to_owned(),
673 )
674 .into());
675 }
676 if !stream_input
677 .watermark_columns()
678 .contains(order_by[0].column_index)
679 {
680 return Err(ErrorCode::InvalidInputSyntax(
681 "The column ordered by must be a watermark column".to_owned(),
682 )
683 .into());
684 }
685 let order_key_index = order_by[0].column_index;
686
687 let partition_key_indices = self.window_functions()[0]
688 .partition_by
689 .iter()
690 .map(|e| e.index())
691 .collect_vec();
692 if partition_key_indices.is_empty() {
693 empty_partition_by_not_implemented!();
694 }
695
696 let sort_input =
697 RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
698 .streaming_enforce_if_not_satisfies(stream_input)?;
699 let sort = StreamEowcSort::new(sort_input, order_key_index);
700
701 let core = self.core.clone_with_input(sort.into());
702 Ok(StreamEowcOverWindow::new(core).into())
703 } else {
704 if self
707 .window_functions()
708 .iter()
709 .any(|f| f.frame.bounds.is_session())
710 {
711 bail_not_implemented!(
712 "Session frame is not yet supported in general streaming mode. \
713 Please consider using Emit-On-Window-Close mode."
714 );
715 }
716
717 let partition_key_indices = self.window_functions()[0]
720 .partition_by
721 .iter()
722 .map(|e| e.index())
723 .collect_vec();
724 if partition_key_indices.is_empty() {
725 empty_partition_by_not_implemented!();
726 }
727
728 let new_input =
729 RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
730 .streaming_enforce_if_not_satisfies(stream_input)?;
731 let core = self.core.clone_with_input(new_input);
732
733 Ok(StreamOverWindow::new(core)?.into())
734 }
735 }
736
737 fn logical_rewrite_for_stream(
738 &self,
739 ctx: &mut RewriteStreamContext,
740 ) -> Result<(PlanRef, ColIndexMapping)> {
741 let (input, input_col_change) = self.core.input.logical_rewrite_for_stream(ctx)?;
742 let (new_self, output_col_change) = self.rewrite_with_input(input, input_col_change);
743 Ok((new_self.into(), output_col_change))
744 }
745}