1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
19use risingwave_common::{bail_not_implemented, not_implemented};
20use risingwave_expr::aggregate::{AggType, PbAggKind, agg_types};
21use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind};
22
23use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder};
24use super::utils::impl_distill_by_unit;
25use super::{
26 BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase,
27 PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamEowcSort,
28 StreamOverWindow, ToBatch, ToStream, gen_filter_and_pushdown,
29};
30use crate::error::{ErrorCode, Result, RwError};
31use crate::expr::{
32 AggCall, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
33 WindowFunction,
34};
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::plan_node::logical_agg::LogicalAggBuilder;
37use crate::optimizer::plan_node::{
38 ColumnPruningContext, Literal, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
39};
40use crate::optimizer::property::{Order, RequiredDist};
41use crate::utils::{ColIndexMapping, Condition, IndexSet};
42
43struct LogicalOverWindowBuilder<'a> {
44 input_proj_builder: &'a ProjectBuilder,
46 window_functions: &'a mut Vec<WindowFunction>,
48 error: Option<RwError>,
50}
51
52impl<'a> LogicalOverWindowBuilder<'a> {
53 fn new(
54 input_proj_builder: &'a ProjectBuilder,
55 window_functions: &'a mut Vec<WindowFunction>,
56 ) -> Result<Self> {
57 Ok(Self {
58 input_proj_builder,
59 window_functions,
60 error: None,
61 })
62 }
63
64 fn rewrite_selected_items(&mut self, selected_items: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
65 let mut rewritten_items = vec![];
66 for expr in selected_items {
67 let rewritten_expr = self.rewrite_expr(expr);
68 if let Some(error) = self.error.take() {
69 return Err(error);
70 } else {
71 rewritten_items.push(rewritten_expr);
72 }
73 }
74 Ok(rewritten_items)
75 }
76
77 fn schema_over_window_start_offset(&self) -> usize {
78 self.input_proj_builder.exprs_len()
79 }
80
81 fn push_window_func(&mut self, window_func: WindowFunction) -> InputRef {
82 if let Some((pos, existing)) = self
83 .window_functions
84 .iter()
85 .find_position(|&w| w == &window_func)
86 {
87 return InputRef::new(
88 self.schema_over_window_start_offset() + pos,
89 existing.return_type.clone(),
90 );
91 }
92 let index = self.schema_over_window_start_offset() + self.window_functions.len();
93 let data_type = window_func.return_type.clone();
94 self.window_functions.push(window_func);
95 InputRef::new(index, data_type)
96 }
97
98 fn try_rewrite_window_function(&mut self, window_func: WindowFunction) -> Result<ExprImpl> {
99 let WindowFunction {
100 kind,
101 args,
102 return_type,
103 partition_by,
104 order_by,
105 ignore_nulls,
106 frame,
107 } = window_func;
108
109 let new_expr = if let WindowFuncKind::Aggregate(agg_type) = &kind
110 && matches!(agg_type, agg_types::rewritten!())
111 {
112 let agg_call = AggCall::new(
113 agg_type.clone(),
114 args,
115 false,
116 order_by,
117 Condition::true_cond(),
118 vec![],
119 )?;
120 LogicalAggBuilder::general_rewrite_agg_call(agg_call, |agg_call| {
121 Ok(self.push_window_func(
122 WindowFunction::new(
124 WindowFuncKind::Aggregate(agg_call.agg_type),
125 agg_call.args.clone(),
126 false, partition_by.clone(),
128 agg_call.order_by.clone(),
129 frame.clone(),
130 )?,
131 ))
132 })?
133 } else {
134 ExprImpl::from(self.push_window_func(WindowFunction::new(
135 kind,
136 args,
137 ignore_nulls,
138 partition_by,
139 order_by,
140 frame,
141 )?))
142 };
143
144 assert_eq!(new_expr.return_type(), return_type);
145 Ok(new_expr)
146 }
147}
148
149impl ExprRewriter for LogicalOverWindowBuilder<'_> {
150 fn rewrite_window_function(&mut self, window_func: WindowFunction) -> ExprImpl {
151 let dummy = Literal::new(None, window_func.return_type()).into();
152 match self.try_rewrite_window_function(window_func) {
153 Ok(expr) => expr,
154 Err(err) => {
155 self.error = Some(err);
156 dummy
157 }
158 }
159 }
160
161 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
162 let input_expr = input_ref.into();
163 let index = self.input_proj_builder.expr_index(&input_expr).unwrap();
164 ExprImpl::from(InputRef::new(index, input_expr.return_type()))
165 }
166}
167
168struct OverWindowProjectBuilder<'a> {
170 builder: &'a mut ProjectBuilder,
171 error: Option<ErrorCode>,
172}
173
174impl<'a> OverWindowProjectBuilder<'a> {
175 fn new(builder: &'a mut ProjectBuilder) -> Self {
176 Self {
177 builder,
178 error: None,
179 }
180 }
181
182 fn try_visit_window_function(
183 &mut self,
184 window_function: &WindowFunction,
185 ) -> std::result::Result<(), ErrorCode> {
186 if let WindowFuncKind::Aggregate(agg_type) = &window_function.kind
187 && matches!(
188 agg_type,
189 AggType::Builtin(
190 PbAggKind::StddevPop
191 | PbAggKind::StddevSamp
192 | PbAggKind::VarPop
193 | PbAggKind::VarSamp
194 )
195 )
196 {
197 let input = window_function.args.iter().exactly_one().unwrap();
198 let squared_input_expr = ExprImpl::from(
199 FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(),
200 );
201 self.builder
202 .add_expr(&squared_input_expr)
203 .map_err(|err| not_implemented!("{err} inside args"))?;
204 }
205 for arg in &window_function.args {
206 self.builder
207 .add_expr(arg)
208 .map_err(|err| not_implemented!("{err} inside args"))?;
209 }
210 for partition_by in &window_function.partition_by {
211 self.builder
212 .add_expr(partition_by)
213 .map_err(|err| not_implemented!("{err} inside partition_by"))?;
214 }
215 for order_by in window_function.order_by.sort_exprs.iter().map(|e| &e.expr) {
216 self.builder
217 .add_expr(order_by)
218 .map_err(|err| not_implemented!("{err} inside order_by"))?;
219 }
220 Ok(())
221 }
222}
223
224impl ExprVisitor for OverWindowProjectBuilder<'_> {
225 fn visit_window_function(&mut self, window_function: &WindowFunction) {
226 if let Err(e) = self.try_visit_window_function(window_function) {
227 self.error = Some(e);
228 }
229 }
230}
231
232#[derive(Debug, Clone, PartialEq, Eq, Hash)]
236pub struct LogicalOverWindow {
237 pub base: PlanBase<Logical>,
238 core: OverWindow<PlanRef>,
239}
240
241impl LogicalOverWindow {
242 pub fn new(calls: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
243 let core = OverWindow::new(calls, input);
244 let base = PlanBase::new_logical_with_core(&core);
245 Self { base, core }
246 }
247
248 fn build_input_proj(input: PlanRef, select_exprs: &[ExprImpl]) -> Result<ProjectBuilder> {
249 let mut input_proj_builder = ProjectBuilder::default();
250 for (idx, field) in input.schema().fields().iter().enumerate() {
252 input_proj_builder
253 .add_expr(&InputRef::new(idx, field.data_type()).into())
254 .map_err(|err| not_implemented!("{err} inside input"))?;
255 }
256 let mut build_input_proj_visitor = OverWindowProjectBuilder::new(&mut input_proj_builder);
257 for expr in select_exprs {
258 build_input_proj_visitor.visit_expr(expr);
259 if let Some(error) = build_input_proj_visitor.error.take() {
260 return Err(error.into());
261 }
262 }
263 Ok(input_proj_builder)
264 }
265
266 pub fn create(input: PlanRef, select_exprs: Vec<ExprImpl>) -> Result<(PlanRef, Vec<ExprImpl>)> {
267 let input_proj_builder = Self::build_input_proj(input.clone(), &select_exprs)?;
268
269 let mut window_functions = vec![];
270 let mut over_window_builder =
271 LogicalOverWindowBuilder::new(&input_proj_builder, &mut window_functions)?;
272
273 let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?;
274
275 for window_func in &window_functions {
276 if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() {
277 return Err(ErrorCode::InvalidInputSyntax(format!(
278 "window rank function without order by: {:?}",
279 window_func
280 ))
281 .into());
282 }
283 }
284
285 let plan_window_funcs = window_functions
286 .into_iter()
287 .map(|x| Self::convert_window_function(x, &input_proj_builder))
288 .try_collect()?;
289
290 Ok((
291 Self::new(
292 plan_window_funcs,
293 LogicalProject::with_core(input_proj_builder.build(input)).into(),
294 )
295 .into(),
296 rewritten_selected_items,
297 ))
298 }
299
300 fn convert_window_function(
301 window_function: WindowFunction,
302 input_proj_builder: &ProjectBuilder,
303 ) -> Result<PlanWindowFunction> {
304 let order_by = window_function
305 .order_by
306 .sort_exprs
307 .into_iter()
308 .map(|e| {
309 ColumnOrder::new(
310 input_proj_builder.expr_index(&e.expr).unwrap(),
311 e.order_type,
312 )
313 })
314 .collect_vec();
315 let partition_by = window_function
316 .partition_by
317 .into_iter()
318 .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
319 .collect_vec();
320
321 let mut args = window_function.args;
322 let (kind, frame) = match window_function.kind {
323 WindowFuncKind::RowNumber | WindowFuncKind::Rank | WindowFuncKind::DenseRank => {
324 (
327 window_function.kind,
328 Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
329 )
330 }
331 WindowFuncKind::Lag | WindowFuncKind::Lead => {
332 assert!(!window_function.ignore_nulls); let offset = if args.len() > 1 {
339 let offset_expr = args.remove(1);
340 if !offset_expr.return_type().is_int() {
341 return Err(ErrorCode::InvalidInputSyntax(format!(
342 "the `offset` of `{}` function should be integer",
343 window_function.kind
344 ))
345 .into());
346 }
347 let const_offset = offset_expr.cast_implicit(DataType::Int64)?.try_fold_const();
348 if const_offset.is_none() {
349 bail_not_implemented!(
352 "non-const `offset` of `lag`/`lead` is not supported yet"
353 );
354 }
355 const_offset.unwrap()?.map(|v| *v.as_int64()).unwrap_or(1)
356 } else {
357 1
358 };
359 let sign = if window_function.kind == WindowFuncKind::Lag {
360 -1
361 } else {
362 1
363 };
364 let abs_offset = offset.unsigned_abs() as usize;
365 let frame = if sign * offset <= 0 {
366 Frame::rows(
367 FrameBound::Preceding(abs_offset),
368 FrameBound::Preceding(abs_offset),
369 )
370 } else {
371 Frame::rows(
372 FrameBound::Following(abs_offset),
373 FrameBound::Following(abs_offset),
374 )
375 };
376
377 (
378 WindowFuncKind::Aggregate(AggType::Builtin(PbAggKind::FirstValue)),
379 frame,
380 )
381 }
382 WindowFuncKind::Aggregate(_) => {
383 let frame = window_function.frame.unwrap_or({
384 if order_by.is_empty() {
387 Frame::rows(
388 FrameBound::UnboundedPreceding,
389 FrameBound::UnboundedFollowing,
390 )
391 } else {
392 Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow)
393 }
394 });
395 (window_function.kind, frame)
396 }
397 };
398
399 let args = args
400 .into_iter()
401 .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
402 .collect_vec();
403
404 Ok(PlanWindowFunction {
405 kind,
406 return_type: window_function.return_type,
407 args,
408 ignore_nulls: window_function.ignore_nulls,
409 partition_by,
410 order_by,
411 frame,
412 })
413 }
414
415 pub fn window_functions(&self) -> &[PlanWindowFunction] {
416 &self.core.window_functions
417 }
418
419 pub fn partition_key_indices(&self) -> Vec<usize> {
420 self.core.partition_key_indices()
421 }
422
423 pub fn order_key(&self) -> &[ColumnOrder] {
424 self.core.order_key()
425 }
426
427 #[must_use]
428 fn rewrite_with_input_and_window(
429 &self,
430 input: PlanRef,
431 window_functions: &[PlanWindowFunction],
432 input_col_change: ColIndexMapping,
433 ) -> Self {
434 let window_functions = window_functions
435 .iter()
436 .cloned()
437 .map(|mut window_function| {
438 window_function.args.iter_mut().for_each(|i| {
439 *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
440 });
441 window_function.order_by.iter_mut().for_each(|o| {
442 o.column_index = input_col_change.map(o.column_index);
443 });
444 window_function.partition_by.iter_mut().for_each(|i| {
445 *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
446 });
447 window_function
448 })
449 .collect();
450 Self::new(window_functions, input)
451 }
452
453 pub fn split_with_rule(&self, groups: Vec<Vec<usize>>) -> PlanRef {
454 assert!(groups.iter().flatten().all_unique());
455 assert!(
456 groups
457 .iter()
458 .flatten()
459 .all(|&idx| idx < self.window_functions().len())
460 );
461
462 let input_len = self.input().schema().len();
463 let original_out_fields = (0..input_len + self.window_functions().len()).collect_vec();
464 let mut out_fields = original_out_fields.clone();
465 let mut cur_input = self.input();
466 let mut cur_node = self.clone();
467 let mut cur_win_func_pos = input_len;
468 for func_indices in &groups {
469 cur_node = Self::new(
470 func_indices
471 .iter()
472 .map(|&idx| {
473 let func = &self.window_functions()[idx];
474 out_fields[input_len + idx] = cur_win_func_pos;
475 cur_win_func_pos += 1;
476 func.clone()
477 })
478 .collect_vec(),
479 cur_input.clone(),
480 );
481 cur_input = cur_node.clone().into();
482 }
483 if out_fields == original_out_fields {
484 cur_node.into()
485 } else {
486 LogicalProject::with_out_col_idx(cur_node.into(), out_fields.into_iter()).into()
487 }
488 }
489
490 pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
491 self.core.decompose()
492 }
493}
494
495impl PlanTreeNodeUnary for LogicalOverWindow {
496 fn input(&self) -> PlanRef {
497 self.core.input.clone()
498 }
499
500 fn clone_with_input(&self, input: PlanRef) -> Self {
501 Self::new(self.core.window_functions.clone(), input)
502 }
503
504 fn rewrite_with_input(
505 &self,
506 input: PlanRef,
507 input_col_change: ColIndexMapping,
508 ) -> (Self, ColIndexMapping) {
509 let input_len = self.core.input_len();
510 let new_input_len = input.schema().len();
511 let output_len = self.core.output_len();
512 let new_output_len = new_input_len + self.window_functions().len();
513 let output_col_change = {
514 let mut mapping = ColIndexMapping::empty(output_len, new_output_len);
515 for win_func_idx in 0..self.window_functions().len() {
516 mapping.put(input_len + win_func_idx, Some(new_input_len + win_func_idx));
517 }
518 mapping.union(&input_col_change)
519 };
520 let new_self =
521 self.rewrite_with_input_and_window(input, self.window_functions(), input_col_change);
522 (new_self, output_col_change)
523 }
524}
525
// Macro-generated trait impls for `LogicalOverWindow`.
// NOTE(review): `impl_plan_tree_node_for_unary!` is not imported in the visible part of this
// file; it is presumably exported crate-wide and derives the generic plan-tree-node impl from
// `PlanTreeNodeUnary` — confirm against the macro definition.
impl_plan_tree_node_for_unary! { LogicalOverWindow }
// `impl_distill_by_unit!` comes from `super::utils`; it is given the `core` field and the
// display name "LogicalOverWindow".
impl_distill_by_unit!(LogicalOverWindow, core, "LogicalOverWindow");
528
529impl ColPrunable for LogicalOverWindow {
530 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
531 let input_len = self.input().schema().len();
532
533 let (req_cols_input_part, req_cols_win_func_part) = {
534 let mut in_input = required_cols.to_vec();
535 let in_win_funcs: IndexSet = in_input.extract_if(.., |i| *i >= input_len).collect();
536 (IndexSet::from(in_input), in_win_funcs)
537 };
538
539 if req_cols_win_func_part.is_empty() {
540 return self.input().prune_col(&req_cols_input_part.to_vec(), ctx);
542 }
543
544 let (input_cols_required_by_this, window_functions) = {
545 let mut tmp = IndexSet::empty();
546 let new_window_functions = req_cols_win_func_part
547 .indices()
548 .map(|idx| self.window_functions()[idx - input_len].clone())
549 .inspect(|func| {
550 tmp.extend(func.args.iter().map(|x| x.index()));
551 tmp.extend(func.partition_by.iter().map(|x| x.index()));
552 tmp.extend(func.order_by.iter().map(|x| x.column_index));
553 })
554 .collect_vec();
555 (tmp, new_window_functions)
556 };
557
558 let input_required_cols = (req_cols_input_part | input_cols_required_by_this).to_vec();
559 let input_col_change =
560 ColIndexMapping::with_remaining_columns(&input_required_cols, input_len);
561 let new_self = {
562 let input = self.input().prune_col(&input_required_cols, ctx);
563 self.rewrite_with_input_and_window(input, &window_functions, input_col_change)
564 };
565 if new_self.schema().len() == required_cols.len() {
566 new_self.into()
568 } else {
569 let mut new_output_cols = input_required_cols.clone();
571 new_output_cols.extend(required_cols.iter().filter(|&&x| x >= input_len));
572 let mapping =
573 &ColIndexMapping::with_remaining_columns(&new_output_cols, self.schema().len());
574 let output_required_cols = required_cols
575 .iter()
576 .map(|&idx| mapping.map(idx))
577 .collect_vec();
578 let src_size = new_self.schema().len();
579 LogicalProject::with_mapping(
580 new_self.into(),
581 ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
582 )
583 .into()
584 }
585 }
586}
587
// No overrides: the trait's default methods are used as-is.
impl ExprRewritable for LogicalOverWindow {}
589
// No overrides: the trait's default methods are used as-is.
impl ExprVisitable for LogicalOverWindow {}
591
592impl PredicatePushdown for LogicalOverWindow {
593 fn predicate_pushdown(
594 &self,
595 predicate: Condition,
596 ctx: &mut PredicatePushdownContext,
597 ) -> PlanRef {
598 if !self.core.funcs_have_same_partition_and_order() {
599 return LogicalFilter::create(self.clone().into(), predicate);
601 }
602
603 let all_out_cols: FixedBitSet = (0..self.schema().len()).collect();
604 let mut remain_cols: FixedBitSet = all_out_cols
605 .difference(&self.partition_key_indices().into_iter().collect())
606 .collect();
607 remain_cols.grow(self.schema().len());
608
609 let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols);
610 gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx)
611 }
612}
613
// Expands to a `bail_not_implemented!` call (tracking issue 11505) rejecting window functions
// without a `PARTITION BY` clause. Takes no arguments; meant to be used as a statement inside a
// function returning `Result`.
macro_rules! empty_partition_by_not_implemented {
    () => {
        bail_not_implemented!(
            issue = 11505,
            "Window function with empty PARTITION BY is not supported because of potential bad performance. \
            If you really need this, please workaround with something like `PARTITION BY 1::int`."
        )
    };
}
623
624impl ToBatch for LogicalOverWindow {
625 fn to_batch(&self) -> Result<PlanRef> {
626 assert!(
627 self.core.funcs_have_same_partition_and_order(),
628 "must apply OverWindowSplitRule before generating physical plan"
629 );
630
631 let partition_key_indices = self.window_functions()[0]
634 .partition_by
635 .iter()
636 .map(|e| e.index())
637 .collect_vec();
638 if partition_key_indices.is_empty() {
639 empty_partition_by_not_implemented!();
640 }
641
642 let input = self.input().to_batch()?;
643 let new_logical = OverWindow {
644 input,
645 ..self.core.clone()
646 };
647 Ok(BatchOverWindow::new(new_logical).into())
648 }
649}
650
651impl ToStream for LogicalOverWindow {
652 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
653 use super::stream::prelude::*;
654
655 assert!(
656 self.core.funcs_have_same_partition_and_order(),
657 "must apply OverWindowSplitRule before generating physical plan"
658 );
659
660 let stream_input = self.core.input.to_stream(ctx)?;
661
662 if ctx.emit_on_window_close() {
663 let order_by = &self.window_functions()[0].order_by;
666 if order_by.len() != 1 || order_by[0].order_type != OrderType::ascending() {
667 return Err(ErrorCode::InvalidInputSyntax(
668 "Only support window functions order by single column and in ascending order"
669 .to_owned(),
670 )
671 .into());
672 }
673 if !stream_input
674 .watermark_columns()
675 .contains(order_by[0].column_index)
676 {
677 return Err(ErrorCode::InvalidInputSyntax(
678 "The column ordered by must be a watermark column".to_owned(),
679 )
680 .into());
681 }
682 let order_key_index = order_by[0].column_index;
683
684 let partition_key_indices = self.window_functions()[0]
685 .partition_by
686 .iter()
687 .map(|e| e.index())
688 .collect_vec();
689 if partition_key_indices.is_empty() {
690 empty_partition_by_not_implemented!();
691 }
692
693 let sort_input =
694 RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
695 .enforce_if_not_satisfies(stream_input, &Order::any())?;
696 let sort = StreamEowcSort::new(sort_input, order_key_index);
697
698 let mut core = self.core.clone();
699 core.input = sort.into();
700 Ok(StreamEowcOverWindow::new(core).into())
701 } else {
702 if self
705 .window_functions()
706 .iter()
707 .any(|f| f.frame.bounds.is_session())
708 {
709 bail_not_implemented!(
710 "Session frame is not yet supported in general streaming mode. \
711 Please consider using Emit-On-Window-Close mode."
712 );
713 }
714
715 let partition_key_indices = self.window_functions()[0]
718 .partition_by
719 .iter()
720 .map(|e| e.index())
721 .collect_vec();
722 if partition_key_indices.is_empty() {
723 empty_partition_by_not_implemented!();
724 }
725
726 let new_input =
727 RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
728 .enforce_if_not_satisfies(stream_input, &Order::any())?;
729 let mut core = self.core.clone();
730 core.input = new_input;
731 Ok(StreamOverWindow::new(core).into())
732 }
733 }
734
735 fn logical_rewrite_for_stream(
736 &self,
737 ctx: &mut RewriteStreamContext,
738 ) -> Result<(PlanRef, ColIndexMapping)> {
739 let (input, input_col_change) = self.core.input.logical_rewrite_for_stream(ctx)?;
740 let (new_self, output_col_change) = self.rewrite_with_input(input, input_col_change);
741 Ok((new_self.into(), output_col_change))
742 }
743}