risingwave_frontend/optimizer/plan_node/
logical_over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
19use risingwave_common::{bail_not_implemented, not_implemented};
20use risingwave_expr::aggregate::{AggType, PbAggKind, agg_types};
21use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind};
22
23use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder};
24use super::utils::impl_distill_by_unit;
25use super::{
26    BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter,
27    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
28    StreamEowcOverWindow, StreamEowcSort, StreamOverWindow, ToBatch, ToStream,
29    gen_filter_and_pushdown,
30};
31use crate::error::{ErrorCode, Result, RwError};
32use crate::expr::{
33    AggCall, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
34    WindowFunction,
35};
36use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
37use crate::optimizer::plan_node::logical_agg::LogicalAggBuilder;
38use crate::optimizer::plan_node::{
39    ColumnPruningContext, Literal, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
40};
41use crate::optimizer::property::RequiredDist;
42use crate::utils::{ColIndexMapping, Condition, IndexSet};
43
44struct LogicalOverWindowBuilder<'a> {
45    /// the builder of the input Project
46    input_proj_builder: &'a ProjectBuilder,
47    /// the window functions
48    window_functions: &'a mut Vec<WindowFunction>,
49    /// the error during the expression rewriting
50    error: Option<RwError>,
51}
52
53impl<'a> LogicalOverWindowBuilder<'a> {
54    fn new(
55        input_proj_builder: &'a ProjectBuilder,
56        window_functions: &'a mut Vec<WindowFunction>,
57    ) -> Result<Self> {
58        Ok(Self {
59            input_proj_builder,
60            window_functions,
61            error: None,
62        })
63    }
64
65    fn rewrite_selected_items(&mut self, selected_items: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
66        let mut rewritten_items = vec![];
67        for expr in selected_items {
68            let rewritten_expr = self.rewrite_expr(expr);
69            if let Some(error) = self.error.take() {
70                return Err(error);
71            } else {
72                rewritten_items.push(rewritten_expr);
73            }
74        }
75        Ok(rewritten_items)
76    }
77
78    fn schema_over_window_start_offset(&self) -> usize {
79        self.input_proj_builder.exprs_len()
80    }
81
82    fn push_window_func(&mut self, window_func: WindowFunction) -> InputRef {
83        if let Some((pos, existing)) = self
84            .window_functions
85            .iter()
86            .find_position(|&w| w == &window_func)
87        {
88            return InputRef::new(
89                self.schema_over_window_start_offset() + pos,
90                existing.return_type.clone(),
91            );
92        }
93        let index = self.schema_over_window_start_offset() + self.window_functions.len();
94        let data_type = window_func.return_type.clone();
95        self.window_functions.push(window_func);
96        InputRef::new(index, data_type)
97    }
98
99    fn try_rewrite_window_function(&mut self, window_func: WindowFunction) -> Result<ExprImpl> {
100        let WindowFunction {
101            kind,
102            args,
103            return_type,
104            partition_by,
105            order_by,
106            ignore_nulls,
107            frame,
108        } = window_func;
109
110        let new_expr = if let WindowFuncKind::Aggregate(agg_type) = &kind
111            && matches!(agg_type, agg_types::rewritten!())
112        {
113            let agg_call = AggCall::new(
114                agg_type.clone(),
115                args,
116                false,
117                order_by,
118                Condition::true_cond(),
119                vec![],
120            )?;
121            LogicalAggBuilder::general_rewrite_agg_call(agg_call, |agg_call| {
122                Ok(self.push_window_func(
123                    // AggCall -> WindowFunction
124                    WindowFunction::new(
125                        WindowFuncKind::Aggregate(agg_call.agg_type),
126                        agg_call.args.clone(),
127                        false, // we don't support `IGNORE NULLS` for these functions now
128                        partition_by.clone(),
129                        agg_call.order_by.clone(),
130                        frame.clone(),
131                    )?,
132                ))
133            })?
134        } else {
135            ExprImpl::from(self.push_window_func(WindowFunction::new(
136                kind,
137                args,
138                ignore_nulls,
139                partition_by,
140                order_by,
141                frame,
142            )?))
143        };
144
145        assert_eq!(new_expr.return_type(), return_type);
146        Ok(new_expr)
147    }
148}
149
150impl ExprRewriter for LogicalOverWindowBuilder<'_> {
151    fn rewrite_window_function(&mut self, window_func: WindowFunction) -> ExprImpl {
152        let dummy = Literal::new(None, window_func.return_type()).into();
153        match self.try_rewrite_window_function(window_func) {
154            Ok(expr) => expr,
155            Err(err) => {
156                self.error = Some(err);
157                dummy
158            }
159        }
160    }
161
162    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
163        let input_expr = input_ref.into();
164        let index = self.input_proj_builder.expr_index(&input_expr).unwrap();
165        ExprImpl::from(InputRef::new(index, input_expr.return_type()))
166    }
167}
168
169/// Build columns from window function `args` / `partition_by` / `order_by`
170struct OverWindowProjectBuilder<'a> {
171    builder: &'a mut ProjectBuilder,
172    error: Option<ErrorCode>,
173}
174
175impl<'a> OverWindowProjectBuilder<'a> {
176    fn new(builder: &'a mut ProjectBuilder) -> Self {
177        Self {
178            builder,
179            error: None,
180        }
181    }
182
183    fn try_visit_window_function(
184        &mut self,
185        window_function: &WindowFunction,
186    ) -> std::result::Result<(), ErrorCode> {
187        if let WindowFuncKind::Aggregate(agg_type) = &window_function.kind
188            && matches!(
189                agg_type,
190                AggType::Builtin(
191                    PbAggKind::StddevPop
192                        | PbAggKind::StddevSamp
193                        | PbAggKind::VarPop
194                        | PbAggKind::VarSamp
195                )
196            )
197        {
198            let input = window_function.args.iter().exactly_one().unwrap();
199            let squared_input_expr = ExprImpl::from(
200                FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(),
201            );
202            self.builder
203                .add_expr(&squared_input_expr)
204                .map_err(|err| not_implemented!("{err} inside args"))?;
205        }
206        for arg in &window_function.args {
207            self.builder
208                .add_expr(arg)
209                .map_err(|err| not_implemented!("{err} inside args"))?;
210        }
211        for partition_by in &window_function.partition_by {
212            self.builder
213                .add_expr(partition_by)
214                .map_err(|err| not_implemented!("{err} inside partition_by"))?;
215        }
216        for order_by in window_function.order_by.sort_exprs.iter().map(|e| &e.expr) {
217            self.builder
218                .add_expr(order_by)
219                .map_err(|err| not_implemented!("{err} inside order_by"))?;
220        }
221        Ok(())
222    }
223}
224
225impl ExprVisitor for OverWindowProjectBuilder<'_> {
226    fn visit_window_function(&mut self, window_function: &WindowFunction) {
227        if let Err(e) = self.try_visit_window_function(window_function) {
228            self.error = Some(e);
229        }
230    }
231}
232
233/// `LogicalOverWindow` performs `OVER` window functions to its input.
234///
235/// The output schema is the input schema plus the window functions.
236#[derive(Debug, Clone, PartialEq, Eq, Hash)]
237pub struct LogicalOverWindow {
238    pub base: PlanBase<Logical>,
239    core: OverWindow<PlanRef>,
240}
241
242impl LogicalOverWindow {
243    pub fn new(calls: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
244        let core = OverWindow::new(calls, input);
245        let base = PlanBase::new_logical_with_core(&core);
246        Self { base, core }
247    }
248
249    fn build_input_proj(input: PlanRef, select_exprs: &[ExprImpl]) -> Result<ProjectBuilder> {
250        let mut input_proj_builder = ProjectBuilder::default();
251        // Add and check input columns
252        for (idx, field) in input.schema().fields().iter().enumerate() {
253            input_proj_builder
254                .add_expr(&InputRef::new(idx, field.data_type()).into())
255                .map_err(|err| not_implemented!("{err} inside input"))?;
256        }
257        let mut build_input_proj_visitor = OverWindowProjectBuilder::new(&mut input_proj_builder);
258        for expr in select_exprs {
259            build_input_proj_visitor.visit_expr(expr);
260            if let Some(error) = build_input_proj_visitor.error.take() {
261                return Err(error.into());
262            }
263        }
264        Ok(input_proj_builder)
265    }
266
267    pub fn create(input: PlanRef, select_exprs: Vec<ExprImpl>) -> Result<(PlanRef, Vec<ExprImpl>)> {
268        let input_proj_builder = Self::build_input_proj(input.clone(), &select_exprs)?;
269
270        let mut window_functions = vec![];
271        let mut over_window_builder =
272            LogicalOverWindowBuilder::new(&input_proj_builder, &mut window_functions)?;
273
274        let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?;
275
276        for window_func in &window_functions {
277            if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() {
278                return Err(ErrorCode::InvalidInputSyntax(format!(
279                    "window rank function without order by: {:?}",
280                    window_func
281                ))
282                .into());
283            }
284        }
285
286        let plan_window_funcs = window_functions
287            .into_iter()
288            .map(|x| Self::convert_window_function(x, &input_proj_builder))
289            .try_collect()?;
290
291        Ok((
292            Self::new(
293                plan_window_funcs,
294                LogicalProject::with_core(input_proj_builder.build(input)).into(),
295            )
296            .into(),
297            rewritten_selected_items,
298        ))
299    }
300
301    fn convert_window_function(
302        window_function: WindowFunction,
303        input_proj_builder: &ProjectBuilder,
304    ) -> Result<PlanWindowFunction> {
305        let order_by = window_function
306            .order_by
307            .sort_exprs
308            .into_iter()
309            .map(|e| {
310                ColumnOrder::new(
311                    input_proj_builder.expr_index(&e.expr).unwrap(),
312                    e.order_type,
313                )
314            })
315            .collect_vec();
316        let partition_by = window_function
317            .partition_by
318            .into_iter()
319            .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
320            .collect_vec();
321
322        let mut args = window_function.args;
323        let (kind, frame) = match window_function.kind {
324            WindowFuncKind::RowNumber | WindowFuncKind::Rank | WindowFuncKind::DenseRank => {
325                // ignore user-defined frame for rank functions, also, rank functions only care
326                // about the rows before current row
327                (
328                    window_function.kind,
329                    Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
330                )
331            }
332            WindowFuncKind::Lag | WindowFuncKind::Lead => {
333                // `lag(x, const offset N) over ()`
334                //     == `first_value(x) over (rows between N preceding and N preceding)`
335                // `lead(x, const offset N) over ()`
336                //     == `first_value(x) over (rows between N following and N following)`
337                assert!(!window_function.ignore_nulls); // the conversion is not applicable to `LAG`/`LEAD` with `IGNORE NULLS`
338
339                let offset = if args.len() > 1 {
340                    let offset_expr = args.remove(1);
341                    if !offset_expr.return_type().is_int() {
342                        return Err(ErrorCode::InvalidInputSyntax(format!(
343                            "the `offset` of `{}` function should be integer",
344                            window_function.kind
345                        ))
346                        .into());
347                    }
348                    let const_offset = offset_expr
349                        .cast_implicit(&DataType::Int64)?
350                        .try_fold_const();
351                    if const_offset.is_none() {
352                        // should already be checked in `WindowFunction::infer_return_type`,
353                        // but just in case
354                        bail_not_implemented!(
355                            "non-const `offset` of `lag`/`lead` is not supported yet"
356                        );
357                    }
358                    const_offset.unwrap()?.map(|v| *v.as_int64()).unwrap_or(1)
359                } else {
360                    1
361                };
362                let sign = if window_function.kind == WindowFuncKind::Lag {
363                    -1
364                } else {
365                    1
366                };
367                let abs_offset = offset.unsigned_abs() as usize;
368                let frame = if sign * offset <= 0 {
369                    Frame::rows(
370                        FrameBound::Preceding(abs_offset),
371                        FrameBound::Preceding(abs_offset),
372                    )
373                } else {
374                    Frame::rows(
375                        FrameBound::Following(abs_offset),
376                        FrameBound::Following(abs_offset),
377                    )
378                };
379
380                (
381                    WindowFuncKind::Aggregate(AggType::Builtin(PbAggKind::FirstValue)),
382                    frame,
383                )
384            }
385            WindowFuncKind::Aggregate(_) => {
386                let frame = window_function.frame.unwrap_or({
387                    // FIXME(rc): The following 2 cases should both be `Frame::Range(Unbounded,
388                    // CurrentRow)` but we don't support yet.
389                    if order_by.is_empty() {
390                        Frame::rows(
391                            FrameBound::UnboundedPreceding,
392                            FrameBound::UnboundedFollowing,
393                        )
394                    } else {
395                        Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow)
396                    }
397                });
398                (window_function.kind, frame)
399            }
400        };
401
402        let args = args
403            .into_iter()
404            .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
405            .collect_vec();
406
407        Ok(PlanWindowFunction {
408            kind,
409            return_type: window_function.return_type,
410            args,
411            ignore_nulls: window_function.ignore_nulls,
412            partition_by,
413            order_by,
414            frame,
415        })
416    }
417
418    pub fn window_functions(&self) -> &[PlanWindowFunction] {
419        &self.core.window_functions
420    }
421
422    pub fn partition_key_indices(&self) -> Vec<usize> {
423        self.core.partition_key_indices()
424    }
425
426    pub fn order_key(&self) -> &[ColumnOrder] {
427        self.core.order_key()
428    }
429
430    #[must_use]
431    fn rewrite_with_input_and_window(
432        &self,
433        input: PlanRef,
434        window_functions: &[PlanWindowFunction],
435        input_col_change: ColIndexMapping,
436    ) -> Self {
437        let window_functions = window_functions
438            .iter()
439            .cloned()
440            .map(|mut window_function| {
441                window_function.args.iter_mut().for_each(|i| {
442                    *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
443                });
444                window_function.order_by.iter_mut().for_each(|o| {
445                    o.column_index = input_col_change.map(o.column_index);
446                });
447                window_function.partition_by.iter_mut().for_each(|i| {
448                    *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
449                });
450                window_function
451            })
452            .collect();
453        Self::new(window_functions, input)
454    }
455
456    pub fn split_with_rule(&self, groups: Vec<Vec<usize>>) -> PlanRef {
457        assert!(groups.iter().flatten().all_unique());
458        assert!(
459            groups
460                .iter()
461                .flatten()
462                .all(|&idx| idx < self.window_functions().len())
463        );
464
465        let input_len = self.input().schema().len();
466        let original_out_fields = (0..input_len + self.window_functions().len()).collect_vec();
467        let mut out_fields = original_out_fields.clone();
468        let mut cur_input = self.input();
469        let mut cur_node = self.clone();
470        let mut cur_win_func_pos = input_len;
471        for func_indices in &groups {
472            cur_node = Self::new(
473                func_indices
474                    .iter()
475                    .map(|&idx| {
476                        let func = &self.window_functions()[idx];
477                        out_fields[input_len + idx] = cur_win_func_pos;
478                        cur_win_func_pos += 1;
479                        func.clone()
480                    })
481                    .collect_vec(),
482                cur_input.clone(),
483            );
484            cur_input = cur_node.clone().into();
485        }
486        if out_fields == original_out_fields {
487            cur_node.into()
488        } else {
489            LogicalProject::with_out_col_idx(cur_node.into(), out_fields.into_iter()).into()
490        }
491    }
492
493    pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
494        self.core.decompose()
495    }
496}
497
498impl PlanTreeNodeUnary<Logical> for LogicalOverWindow {
499    fn input(&self) -> PlanRef {
500        self.core.input.clone()
501    }
502
503    fn clone_with_input(&self, input: PlanRef) -> Self {
504        Self::new(self.core.window_functions.clone(), input)
505    }
506
507    fn rewrite_with_input(
508        &self,
509        input: PlanRef,
510        input_col_change: ColIndexMapping,
511    ) -> (Self, ColIndexMapping) {
512        let input_len = self.core.input_len();
513        let new_input_len = input.schema().len();
514        let output_len = self.core.output_len();
515        let new_output_len = new_input_len + self.window_functions().len();
516        let output_col_change = {
517            let mut mapping = ColIndexMapping::empty(output_len, new_output_len);
518            for win_func_idx in 0..self.window_functions().len() {
519                mapping.put(input_len + win_func_idx, Some(new_input_len + win_func_idx));
520            }
521            mapping.union(&input_col_change)
522        };
523        let new_self =
524            self.rewrite_with_input_and_window(input, self.window_functions(), input_col_change);
525        (new_self, output_col_change)
526    }
527}
528
529impl_plan_tree_node_for_unary! { Logical, LogicalOverWindow }
530impl_distill_by_unit!(LogicalOverWindow, core, "LogicalOverWindow");
531
532impl ColPrunable for LogicalOverWindow {
533    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
534        let input_len = self.input().schema().len();
535
536        let (req_cols_input_part, req_cols_win_func_part) = {
537            let mut in_input = required_cols.to_vec();
538            let in_win_funcs: IndexSet = in_input.extract_if(.., |i| *i >= input_len).collect();
539            (IndexSet::from(in_input), in_win_funcs)
540        };
541
542        if req_cols_win_func_part.is_empty() {
543            // no window function is needed
544            return self.input().prune_col(&req_cols_input_part.to_vec(), ctx);
545        }
546
547        let (input_cols_required_by_this, window_functions) = {
548            let mut tmp = IndexSet::empty();
549            let new_window_functions = req_cols_win_func_part
550                .indices()
551                .map(|idx| self.window_functions()[idx - input_len].clone())
552                .inspect(|func| {
553                    tmp.extend(func.args.iter().map(|x| x.index()));
554                    tmp.extend(func.partition_by.iter().map(|x| x.index()));
555                    tmp.extend(func.order_by.iter().map(|x| x.column_index));
556                })
557                .collect_vec();
558            (tmp, new_window_functions)
559        };
560
561        let input_required_cols = (req_cols_input_part | input_cols_required_by_this).to_vec();
562        let input_col_change =
563            ColIndexMapping::with_remaining_columns(&input_required_cols, input_len);
564        let new_self = {
565            let input = self.input().prune_col(&input_required_cols, ctx);
566            self.rewrite_with_input_and_window(input, &window_functions, input_col_change)
567        };
568        if new_self.schema().len() == required_cols.len() {
569            // current schema perfectly fit the required columns
570            new_self.into()
571        } else {
572            // some columns are not needed so we did a projection to remove the columns.
573            let mut new_output_cols = input_required_cols.clone();
574            new_output_cols.extend(required_cols.iter().filter(|&&x| x >= input_len));
575            let mapping =
576                &ColIndexMapping::with_remaining_columns(&new_output_cols, self.schema().len());
577            let output_required_cols = required_cols
578                .iter()
579                .map(|&idx| mapping.map(idx))
580                .collect_vec();
581            let src_size = new_self.schema().len();
582            LogicalProject::with_mapping(
583                new_self.into(),
584                ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
585            )
586            .into()
587        }
588    }
589}
590
591impl ExprRewritable<Logical> for LogicalOverWindow {}
592
593impl ExprVisitable for LogicalOverWindow {}
594
595impl PredicatePushdown for LogicalOverWindow {
596    fn predicate_pushdown(
597        &self,
598        predicate: Condition,
599        ctx: &mut PredicatePushdownContext,
600    ) -> PlanRef {
601        if !self.core.funcs_have_same_partition_and_order() {
602            // Window function calls with different PARTITION BY and ORDER BY clauses are not split yet.
603            return LogicalFilter::create(self.clone().into(), predicate);
604        }
605
606        let all_out_cols: FixedBitSet = (0..self.schema().len()).collect();
607        let mut remain_cols: FixedBitSet = all_out_cols
608            .difference(&self.partition_key_indices().into_iter().collect())
609            .collect();
610        remain_cols.grow(self.schema().len());
611
612        let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols);
613        gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx)
614    }
615}
616
617macro_rules! empty_partition_by_not_implemented {
618    () => {
619        bail_not_implemented!(
620            issue = 11505,
621            "Window function with empty PARTITION BY is not supported because of potential bad performance. \
622            If you really need this, please workaround with something like `PARTITION BY 1::int`."
623        )
624    };
625}
626
627impl ToBatch for LogicalOverWindow {
628    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
629        assert!(
630            self.core.funcs_have_same_partition_and_order(),
631            "must apply OverWindowSplitRule before generating physical plan"
632        );
633
634        // TODO(rc): Let's not introduce too many cases at once. Later we may decide to support
635        // empty PARTITION BY by simply removing the following check.
636        let partition_key_indices = self.window_functions()[0]
637            .partition_by
638            .iter()
639            .map(|e| e.index())
640            .collect_vec();
641        if partition_key_indices.is_empty() {
642            empty_partition_by_not_implemented!();
643        }
644
645        let input = self.input().to_batch()?;
646        let core = self.core.clone_with_input(input);
647        Ok(BatchOverWindow::new(core).into())
648    }
649}
650
651impl ToStream for LogicalOverWindow {
652    fn to_stream(
653        &self,
654        ctx: &mut ToStreamContext,
655    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
656        use super::stream::prelude::*;
657
658        assert!(
659            self.core.funcs_have_same_partition_and_order(),
660            "must apply OverWindowSplitRule before generating physical plan"
661        );
662
663        let stream_input = self.core.input.to_stream(ctx)?;
664
665        if ctx.emit_on_window_close() {
666            // Emit-On-Window-Close case
667
668            let order_by = &self.window_functions()[0].order_by;
669            if order_by.len() != 1 || order_by[0].order_type != OrderType::ascending() {
670                return Err(ErrorCode::InvalidInputSyntax(
671                    "Only support window functions order by single column and in ascending order"
672                        .to_owned(),
673                )
674                .into());
675            }
676            if !stream_input
677                .watermark_columns()
678                .contains(order_by[0].column_index)
679            {
680                return Err(ErrorCode::InvalidInputSyntax(
681                    "The column ordered by must be a watermark column".to_owned(),
682                )
683                .into());
684            }
685            let order_key_index = order_by[0].column_index;
686
687            let partition_key_indices = self.window_functions()[0]
688                .partition_by
689                .iter()
690                .map(|e| e.index())
691                .collect_vec();
692            if partition_key_indices.is_empty() {
693                empty_partition_by_not_implemented!();
694            }
695
696            let sort_input =
697                RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
698                    .streaming_enforce_if_not_satisfies(stream_input)?;
699            let sort = StreamEowcSort::new(sort_input, order_key_index);
700
701            let core = self.core.clone_with_input(sort.into());
702            Ok(StreamEowcOverWindow::new(core).into())
703        } else {
704            // General (Emit-On-Update) case
705
706            if self
707                .window_functions()
708                .iter()
709                .any(|f| f.frame.bounds.is_session())
710            {
711                bail_not_implemented!(
712                    "Session frame is not yet supported in general streaming mode. \
713                    Please consider using Emit-On-Window-Close mode."
714                );
715            }
716
717            // TODO(rc): Let's not introduce too many cases at once. Later we may decide to support
718            // empty PARTITION BY by simply removing the following check.
719            let partition_key_indices = self.window_functions()[0]
720                .partition_by
721                .iter()
722                .map(|e| e.index())
723                .collect_vec();
724            if partition_key_indices.is_empty() {
725                empty_partition_by_not_implemented!();
726            }
727
728            let new_input =
729                RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
730                    .streaming_enforce_if_not_satisfies(stream_input)?;
731            let core = self.core.clone_with_input(new_input);
732
733            Ok(StreamOverWindow::new(core)?.into())
734        }
735    }
736
737    fn logical_rewrite_for_stream(
738        &self,
739        ctx: &mut RewriteStreamContext,
740    ) -> Result<(PlanRef, ColIndexMapping)> {
741        let (input, input_col_change) = self.core.input.logical_rewrite_for_stream(ctx)?;
742        let (new_self, output_col_change) = self.rewrite_with_input(input, input_col_change);
743        Ok((new_self.into(), output_col_change))
744    }
745}