risingwave_frontend/optimizer/plan_node/
logical_over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
19use risingwave_common::{bail_not_implemented, not_implemented};
20use risingwave_expr::aggregate::{AggType, PbAggKind, agg_types};
21use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind};
22
23use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder};
24use super::utils::impl_distill_by_unit;
25use super::{
26    BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase,
27    PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamEowcSort,
28    StreamOverWindow, ToBatch, ToStream, gen_filter_and_pushdown,
29};
30use crate::error::{ErrorCode, Result, RwError};
31use crate::expr::{
32    AggCall, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef,
33    WindowFunction,
34};
35use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
36use crate::optimizer::plan_node::logical_agg::LogicalAggBuilder;
37use crate::optimizer::plan_node::{
38    ColumnPruningContext, Literal, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
39};
40use crate::optimizer::property::{Order, RequiredDist};
41use crate::utils::{ColIndexMapping, Condition, IndexSet};
42
43struct LogicalOverWindowBuilder<'a> {
44    /// the builder of the input Project
45    input_proj_builder: &'a ProjectBuilder,
46    /// the window functions
47    window_functions: &'a mut Vec<WindowFunction>,
48    /// the error during the expression rewriting
49    error: Option<RwError>,
50}
51
52impl<'a> LogicalOverWindowBuilder<'a> {
53    fn new(
54        input_proj_builder: &'a ProjectBuilder,
55        window_functions: &'a mut Vec<WindowFunction>,
56    ) -> Result<Self> {
57        Ok(Self {
58            input_proj_builder,
59            window_functions,
60            error: None,
61        })
62    }
63
64    fn rewrite_selected_items(&mut self, selected_items: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
65        let mut rewritten_items = vec![];
66        for expr in selected_items {
67            let rewritten_expr = self.rewrite_expr(expr);
68            if let Some(error) = self.error.take() {
69                return Err(error);
70            } else {
71                rewritten_items.push(rewritten_expr);
72            }
73        }
74        Ok(rewritten_items)
75    }
76
77    fn schema_over_window_start_offset(&self) -> usize {
78        self.input_proj_builder.exprs_len()
79    }
80
81    fn push_window_func(&mut self, window_func: WindowFunction) -> InputRef {
82        if let Some((pos, existing)) = self
83            .window_functions
84            .iter()
85            .find_position(|&w| w == &window_func)
86        {
87            return InputRef::new(
88                self.schema_over_window_start_offset() + pos,
89                existing.return_type.clone(),
90            );
91        }
92        let index = self.schema_over_window_start_offset() + self.window_functions.len();
93        let data_type = window_func.return_type.clone();
94        self.window_functions.push(window_func);
95        InputRef::new(index, data_type)
96    }
97
98    fn try_rewrite_window_function(&mut self, window_func: WindowFunction) -> Result<ExprImpl> {
99        let WindowFunction {
100            kind,
101            args,
102            return_type,
103            partition_by,
104            order_by,
105            ignore_nulls,
106            frame,
107        } = window_func;
108
109        let new_expr = if let WindowFuncKind::Aggregate(agg_type) = &kind
110            && matches!(agg_type, agg_types::rewritten!())
111        {
112            let agg_call = AggCall::new(
113                agg_type.clone(),
114                args,
115                false,
116                order_by,
117                Condition::true_cond(),
118                vec![],
119            )?;
120            LogicalAggBuilder::general_rewrite_agg_call(agg_call, |agg_call| {
121                Ok(self.push_window_func(
122                    // AggCall -> WindowFunction
123                    WindowFunction::new(
124                        WindowFuncKind::Aggregate(agg_call.agg_type),
125                        agg_call.args.clone(),
126                        false, // we don't support `IGNORE NULLS` for these functions now
127                        partition_by.clone(),
128                        agg_call.order_by.clone(),
129                        frame.clone(),
130                    )?,
131                ))
132            })?
133        } else {
134            ExprImpl::from(self.push_window_func(WindowFunction::new(
135                kind,
136                args,
137                ignore_nulls,
138                partition_by,
139                order_by,
140                frame,
141            )?))
142        };
143
144        assert_eq!(new_expr.return_type(), return_type);
145        Ok(new_expr)
146    }
147}
148
149impl ExprRewriter for LogicalOverWindowBuilder<'_> {
150    fn rewrite_window_function(&mut self, window_func: WindowFunction) -> ExprImpl {
151        let dummy = Literal::new(None, window_func.return_type()).into();
152        match self.try_rewrite_window_function(window_func) {
153            Ok(expr) => expr,
154            Err(err) => {
155                self.error = Some(err);
156                dummy
157            }
158        }
159    }
160
161    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
162        let input_expr = input_ref.into();
163        let index = self.input_proj_builder.expr_index(&input_expr).unwrap();
164        ExprImpl::from(InputRef::new(index, input_expr.return_type()))
165    }
166}
167
168/// Build columns from window function `args` / `partition_by` / `order_by`
169struct OverWindowProjectBuilder<'a> {
170    builder: &'a mut ProjectBuilder,
171    error: Option<ErrorCode>,
172}
173
174impl<'a> OverWindowProjectBuilder<'a> {
175    fn new(builder: &'a mut ProjectBuilder) -> Self {
176        Self {
177            builder,
178            error: None,
179        }
180    }
181
182    fn try_visit_window_function(
183        &mut self,
184        window_function: &WindowFunction,
185    ) -> std::result::Result<(), ErrorCode> {
186        if let WindowFuncKind::Aggregate(agg_type) = &window_function.kind
187            && matches!(
188                agg_type,
189                AggType::Builtin(
190                    PbAggKind::StddevPop
191                        | PbAggKind::StddevSamp
192                        | PbAggKind::VarPop
193                        | PbAggKind::VarSamp
194                )
195            )
196        {
197            let input = window_function.args.iter().exactly_one().unwrap();
198            let squared_input_expr = ExprImpl::from(
199                FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(),
200            );
201            self.builder
202                .add_expr(&squared_input_expr)
203                .map_err(|err| not_implemented!("{err} inside args"))?;
204        }
205        for arg in &window_function.args {
206            self.builder
207                .add_expr(arg)
208                .map_err(|err| not_implemented!("{err} inside args"))?;
209        }
210        for partition_by in &window_function.partition_by {
211            self.builder
212                .add_expr(partition_by)
213                .map_err(|err| not_implemented!("{err} inside partition_by"))?;
214        }
215        for order_by in window_function.order_by.sort_exprs.iter().map(|e| &e.expr) {
216            self.builder
217                .add_expr(order_by)
218                .map_err(|err| not_implemented!("{err} inside order_by"))?;
219        }
220        Ok(())
221    }
222}
223
224impl ExprVisitor for OverWindowProjectBuilder<'_> {
225    fn visit_window_function(&mut self, window_function: &WindowFunction) {
226        if let Err(e) = self.try_visit_window_function(window_function) {
227            self.error = Some(e);
228        }
229    }
230}
231
/// `LogicalOverWindow` performs `OVER` window functions to its input.
///
/// The output schema is the input schema plus the window functions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalOverWindow {
    /// Logical plan base; `LogicalOverWindow::new` derives it from `core`.
    pub base: PlanBase<Logical>,
    /// The generic over-window node holding the input plan and the window function calls.
    core: OverWindow<PlanRef>,
}
240
241impl LogicalOverWindow {
242    pub fn new(calls: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
243        let core = OverWindow::new(calls, input);
244        let base = PlanBase::new_logical_with_core(&core);
245        Self { base, core }
246    }
247
248    fn build_input_proj(input: PlanRef, select_exprs: &[ExprImpl]) -> Result<ProjectBuilder> {
249        let mut input_proj_builder = ProjectBuilder::default();
250        // Add and check input columns
251        for (idx, field) in input.schema().fields().iter().enumerate() {
252            input_proj_builder
253                .add_expr(&InputRef::new(idx, field.data_type()).into())
254                .map_err(|err| not_implemented!("{err} inside input"))?;
255        }
256        let mut build_input_proj_visitor = OverWindowProjectBuilder::new(&mut input_proj_builder);
257        for expr in select_exprs {
258            build_input_proj_visitor.visit_expr(expr);
259            if let Some(error) = build_input_proj_visitor.error.take() {
260                return Err(error.into());
261            }
262        }
263        Ok(input_proj_builder)
264    }
265
266    pub fn create(input: PlanRef, select_exprs: Vec<ExprImpl>) -> Result<(PlanRef, Vec<ExprImpl>)> {
267        let input_proj_builder = Self::build_input_proj(input.clone(), &select_exprs)?;
268
269        let mut window_functions = vec![];
270        let mut over_window_builder =
271            LogicalOverWindowBuilder::new(&input_proj_builder, &mut window_functions)?;
272
273        let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?;
274
275        for window_func in &window_functions {
276            if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() {
277                return Err(ErrorCode::InvalidInputSyntax(format!(
278                    "window rank function without order by: {:?}",
279                    window_func
280                ))
281                .into());
282            }
283        }
284
285        let plan_window_funcs = window_functions
286            .into_iter()
287            .map(|x| Self::convert_window_function(x, &input_proj_builder))
288            .try_collect()?;
289
290        Ok((
291            Self::new(
292                plan_window_funcs,
293                LogicalProject::with_core(input_proj_builder.build(input)).into(),
294            )
295            .into(),
296            rewritten_selected_items,
297        ))
298    }
299
300    fn convert_window_function(
301        window_function: WindowFunction,
302        input_proj_builder: &ProjectBuilder,
303    ) -> Result<PlanWindowFunction> {
304        let order_by = window_function
305            .order_by
306            .sort_exprs
307            .into_iter()
308            .map(|e| {
309                ColumnOrder::new(
310                    input_proj_builder.expr_index(&e.expr).unwrap(),
311                    e.order_type,
312                )
313            })
314            .collect_vec();
315        let partition_by = window_function
316            .partition_by
317            .into_iter()
318            .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
319            .collect_vec();
320
321        let mut args = window_function.args;
322        let (kind, frame) = match window_function.kind {
323            WindowFuncKind::RowNumber | WindowFuncKind::Rank | WindowFuncKind::DenseRank => {
324                // ignore user-defined frame for rank functions, also, rank functions only care
325                // about the rows before current row
326                (
327                    window_function.kind,
328                    Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
329                )
330            }
331            WindowFuncKind::Lag | WindowFuncKind::Lead => {
332                // `lag(x, const offset N) over ()`
333                //     == `first_value(x) over (rows between N preceding and N preceding)`
334                // `lead(x, const offset N) over ()`
335                //     == `first_value(x) over (rows between N following and N following)`
336                assert!(!window_function.ignore_nulls); // the conversion is not applicable to `LAG`/`LEAD` with `IGNORE NULLS`
337
338                let offset = if args.len() > 1 {
339                    let offset_expr = args.remove(1);
340                    if !offset_expr.return_type().is_int() {
341                        return Err(ErrorCode::InvalidInputSyntax(format!(
342                            "the `offset` of `{}` function should be integer",
343                            window_function.kind
344                        ))
345                        .into());
346                    }
347                    let const_offset = offset_expr.cast_implicit(DataType::Int64)?.try_fold_const();
348                    if const_offset.is_none() {
349                        // should already be checked in `WindowFunction::infer_return_type`,
350                        // but just in case
351                        bail_not_implemented!(
352                            "non-const `offset` of `lag`/`lead` is not supported yet"
353                        );
354                    }
355                    const_offset.unwrap()?.map(|v| *v.as_int64()).unwrap_or(1)
356                } else {
357                    1
358                };
359                let sign = if window_function.kind == WindowFuncKind::Lag {
360                    -1
361                } else {
362                    1
363                };
364                let abs_offset = offset.unsigned_abs() as usize;
365                let frame = if sign * offset <= 0 {
366                    Frame::rows(
367                        FrameBound::Preceding(abs_offset),
368                        FrameBound::Preceding(abs_offset),
369                    )
370                } else {
371                    Frame::rows(
372                        FrameBound::Following(abs_offset),
373                        FrameBound::Following(abs_offset),
374                    )
375                };
376
377                (
378                    WindowFuncKind::Aggregate(AggType::Builtin(PbAggKind::FirstValue)),
379                    frame,
380                )
381            }
382            WindowFuncKind::Aggregate(_) => {
383                let frame = window_function.frame.unwrap_or({
384                    // FIXME(rc): The following 2 cases should both be `Frame::Range(Unbounded,
385                    // CurrentRow)` but we don't support yet.
386                    if order_by.is_empty() {
387                        Frame::rows(
388                            FrameBound::UnboundedPreceding,
389                            FrameBound::UnboundedFollowing,
390                        )
391                    } else {
392                        Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow)
393                    }
394                });
395                (window_function.kind, frame)
396            }
397        };
398
399        let args = args
400            .into_iter()
401            .map(|e| InputRef::new(input_proj_builder.expr_index(&e).unwrap(), e.return_type()))
402            .collect_vec();
403
404        Ok(PlanWindowFunction {
405            kind,
406            return_type: window_function.return_type,
407            args,
408            ignore_nulls: window_function.ignore_nulls,
409            partition_by,
410            order_by,
411            frame,
412        })
413    }
414
    /// Returns the window function calls stored in the core node.
    pub fn window_functions(&self) -> &[PlanWindowFunction] {
        &self.core.window_functions
    }

    /// Returns the partition key column indices as reported by the core node.
    pub fn partition_key_indices(&self) -> Vec<usize> {
        self.core.partition_key_indices()
    }

    /// Returns the order key as reported by the core node.
    pub fn order_key(&self) -> &[ColumnOrder] {
        self.core.order_key()
    }
426
427    #[must_use]
428    fn rewrite_with_input_and_window(
429        &self,
430        input: PlanRef,
431        window_functions: &[PlanWindowFunction],
432        input_col_change: ColIndexMapping,
433    ) -> Self {
434        let window_functions = window_functions
435            .iter()
436            .cloned()
437            .map(|mut window_function| {
438                window_function.args.iter_mut().for_each(|i| {
439                    *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
440                });
441                window_function.order_by.iter_mut().for_each(|o| {
442                    o.column_index = input_col_change.map(o.column_index);
443                });
444                window_function.partition_by.iter_mut().for_each(|i| {
445                    *i = InputRef::new(input_col_change.map(i.index()), i.return_type())
446                });
447                window_function
448            })
449            .collect();
450        Self::new(window_functions, input)
451    }
452
453    pub fn split_with_rule(&self, groups: Vec<Vec<usize>>) -> PlanRef {
454        assert!(groups.iter().flatten().all_unique());
455        assert!(
456            groups
457                .iter()
458                .flatten()
459                .all(|&idx| idx < self.window_functions().len())
460        );
461
462        let input_len = self.input().schema().len();
463        let original_out_fields = (0..input_len + self.window_functions().len()).collect_vec();
464        let mut out_fields = original_out_fields.clone();
465        let mut cur_input = self.input();
466        let mut cur_node = self.clone();
467        let mut cur_win_func_pos = input_len;
468        for func_indices in &groups {
469            cur_node = Self::new(
470                func_indices
471                    .iter()
472                    .map(|&idx| {
473                        let func = &self.window_functions()[idx];
474                        out_fields[input_len + idx] = cur_win_func_pos;
475                        cur_win_func_pos += 1;
476                        func.clone()
477                    })
478                    .collect_vec(),
479                cur_input.clone(),
480            );
481            cur_input = cur_node.clone().into();
482        }
483        if out_fields == original_out_fields {
484            cur_node.into()
485        } else {
486            LogicalProject::with_out_col_idx(cur_node.into(), out_fields.into_iter()).into()
487        }
488    }
489
    /// Consumes the node and returns the result of decomposing its core
    /// (presumably the input plan and the window function calls, judging by the return type).
    pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
        self.core.decompose()
    }
493}
494
495impl PlanTreeNodeUnary for LogicalOverWindow {
496    fn input(&self) -> PlanRef {
497        self.core.input.clone()
498    }
499
500    fn clone_with_input(&self, input: PlanRef) -> Self {
501        Self::new(self.core.window_functions.clone(), input)
502    }
503
504    fn rewrite_with_input(
505        &self,
506        input: PlanRef,
507        input_col_change: ColIndexMapping,
508    ) -> (Self, ColIndexMapping) {
509        let input_len = self.core.input_len();
510        let new_input_len = input.schema().len();
511        let output_len = self.core.output_len();
512        let new_output_len = new_input_len + self.window_functions().len();
513        let output_col_change = {
514            let mut mapping = ColIndexMapping::empty(output_len, new_output_len);
515            for win_func_idx in 0..self.window_functions().len() {
516                mapping.put(input_len + win_func_idx, Some(new_input_len + win_func_idx));
517            }
518            mapping.union(&input_col_change)
519        };
520        let new_self =
521            self.rewrite_with_input_and_window(input, self.window_functions(), input_col_change);
522        (new_self, output_col_change)
523    }
524}
525
// Macro-generated impls for `LogicalOverWindow`; the macros are defined outside this file.
// NOTE(review): presumably the first derives the generic plan-tree-node impl from
// `PlanTreeNodeUnary` and the second implements plan explanation by delegating to `core`
// under the name "LogicalOverWindow" -- confirm against the macro definitions.
impl_plan_tree_node_for_unary! { LogicalOverWindow }
impl_distill_by_unit!(LogicalOverWindow, core, "LogicalOverWindow");
528
529impl ColPrunable for LogicalOverWindow {
530    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
531        let input_len = self.input().schema().len();
532
533        let (req_cols_input_part, req_cols_win_func_part) = {
534            let mut in_input = required_cols.to_vec();
535            let in_win_funcs: IndexSet = in_input.extract_if(.., |i| *i >= input_len).collect();
536            (IndexSet::from(in_input), in_win_funcs)
537        };
538
539        if req_cols_win_func_part.is_empty() {
540            // no window function is needed
541            return self.input().prune_col(&req_cols_input_part.to_vec(), ctx);
542        }
543
544        let (input_cols_required_by_this, window_functions) = {
545            let mut tmp = IndexSet::empty();
546            let new_window_functions = req_cols_win_func_part
547                .indices()
548                .map(|idx| self.window_functions()[idx - input_len].clone())
549                .inspect(|func| {
550                    tmp.extend(func.args.iter().map(|x| x.index()));
551                    tmp.extend(func.partition_by.iter().map(|x| x.index()));
552                    tmp.extend(func.order_by.iter().map(|x| x.column_index));
553                })
554                .collect_vec();
555            (tmp, new_window_functions)
556        };
557
558        let input_required_cols = (req_cols_input_part | input_cols_required_by_this).to_vec();
559        let input_col_change =
560            ColIndexMapping::with_remaining_columns(&input_required_cols, input_len);
561        let new_self = {
562            let input = self.input().prune_col(&input_required_cols, ctx);
563            self.rewrite_with_input_and_window(input, &window_functions, input_col_change)
564        };
565        if new_self.schema().len() == required_cols.len() {
566            // current schema perfectly fit the required columns
567            new_self.into()
568        } else {
569            // some columns are not needed so we did a projection to remove the columns.
570            let mut new_output_cols = input_required_cols.clone();
571            new_output_cols.extend(required_cols.iter().filter(|&&x| x >= input_len));
572            let mapping =
573                &ColIndexMapping::with_remaining_columns(&new_output_cols, self.schema().len());
574            let output_required_cols = required_cols
575                .iter()
576                .map(|&idx| mapping.map(idx))
577                .collect_vec();
578            let src_size = new_self.schema().len();
579            LogicalProject::with_mapping(
580                new_self.into(),
581                ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
582            )
583            .into()
584        }
585    }
586}
587
// Empty impl: only the trait's default methods are used.
impl ExprRewritable for LogicalOverWindow {}

// Empty impl: only the trait's default methods are used.
impl ExprVisitable for LogicalOverWindow {}
591
592impl PredicatePushdown for LogicalOverWindow {
593    fn predicate_pushdown(
594        &self,
595        predicate: Condition,
596        ctx: &mut PredicatePushdownContext,
597    ) -> PlanRef {
598        if !self.core.funcs_have_same_partition_and_order() {
599            // Window function calls with different PARTITION BY and ORDER BY clauses are not split yet.
600            return LogicalFilter::create(self.clone().into(), predicate);
601        }
602
603        let all_out_cols: FixedBitSet = (0..self.schema().len()).collect();
604        let mut remain_cols: FixedBitSet = all_out_cols
605            .difference(&self.partition_key_indices().into_iter().collect())
606            .collect();
607        remain_cols.grow(self.schema().len());
608
609        let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols);
610        gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx)
611    }
612}
613
// Expands to a `bail_not_implemented!` call (tracking issue 11505) reporting that window
// functions with an empty `PARTITION BY` are not supported. It is a macro so that the bail
// takes effect in the function that invokes it.
macro_rules! empty_partition_by_not_implemented {
    () => {
        bail_not_implemented!(
            issue = 11505,
            "Window function with empty PARTITION BY is not supported because of potential bad performance. \
            If you really need this, please workaround with something like `PARTITION BY 1::int`."
        )
    };
}
623
624impl ToBatch for LogicalOverWindow {
625    fn to_batch(&self) -> Result<PlanRef> {
626        assert!(
627            self.core.funcs_have_same_partition_and_order(),
628            "must apply OverWindowSplitRule before generating physical plan"
629        );
630
631        // TODO(rc): Let's not introduce too many cases at once. Later we may decide to support
632        // empty PARTITION BY by simply removing the following check.
633        let partition_key_indices = self.window_functions()[0]
634            .partition_by
635            .iter()
636            .map(|e| e.index())
637            .collect_vec();
638        if partition_key_indices.is_empty() {
639            empty_partition_by_not_implemented!();
640        }
641
642        let input = self.input().to_batch()?;
643        let new_logical = OverWindow {
644            input,
645            ..self.core.clone()
646        };
647        Ok(BatchOverWindow::new(new_logical).into())
648    }
649}
650
651impl ToStream for LogicalOverWindow {
652    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
653        use super::stream::prelude::*;
654
655        assert!(
656            self.core.funcs_have_same_partition_and_order(),
657            "must apply OverWindowSplitRule before generating physical plan"
658        );
659
660        let stream_input = self.core.input.to_stream(ctx)?;
661
662        if ctx.emit_on_window_close() {
663            // Emit-On-Window-Close case
664
665            let order_by = &self.window_functions()[0].order_by;
666            if order_by.len() != 1 || order_by[0].order_type != OrderType::ascending() {
667                return Err(ErrorCode::InvalidInputSyntax(
668                    "Only support window functions order by single column and in ascending order"
669                        .to_owned(),
670                )
671                .into());
672            }
673            if !stream_input
674                .watermark_columns()
675                .contains(order_by[0].column_index)
676            {
677                return Err(ErrorCode::InvalidInputSyntax(
678                    "The column ordered by must be a watermark column".to_owned(),
679                )
680                .into());
681            }
682            let order_key_index = order_by[0].column_index;
683
684            let partition_key_indices = self.window_functions()[0]
685                .partition_by
686                .iter()
687                .map(|e| e.index())
688                .collect_vec();
689            if partition_key_indices.is_empty() {
690                empty_partition_by_not_implemented!();
691            }
692
693            let sort_input =
694                RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
695                    .enforce_if_not_satisfies(stream_input, &Order::any())?;
696            let sort = StreamEowcSort::new(sort_input, order_key_index);
697
698            let mut core = self.core.clone();
699            core.input = sort.into();
700            Ok(StreamEowcOverWindow::new(core).into())
701        } else {
702            // General (Emit-On-Update) case
703
704            if self
705                .window_functions()
706                .iter()
707                .any(|f| f.frame.bounds.is_session())
708            {
709                bail_not_implemented!(
710                    "Session frame is not yet supported in general streaming mode. \
711                    Please consider using Emit-On-Window-Close mode."
712                );
713            }
714
715            // TODO(rc): Let's not introduce too many cases at once. Later we may decide to support
716            // empty PARTITION BY by simply removing the following check.
717            let partition_key_indices = self.window_functions()[0]
718                .partition_by
719                .iter()
720                .map(|e| e.index())
721                .collect_vec();
722            if partition_key_indices.is_empty() {
723                empty_partition_by_not_implemented!();
724            }
725
726            let new_input =
727                RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
728                    .enforce_if_not_satisfies(stream_input, &Order::any())?;
729            let mut core = self.core.clone();
730            core.input = new_input;
731            Ok(StreamOverWindow::new(core).into())
732        }
733    }
734
735    fn logical_rewrite_for_stream(
736        &self,
737        ctx: &mut RewriteStreamContext,
738    ) -> Result<(PlanRef, ColIndexMapping)> {
739        let (input, input_col_change) = self.core.input.logical_rewrite_for_stream(ctx)?;
740        let (new_self, output_col_change) = self.rewrite_with_input(input, input_col_change);
741        Ok((new_self.into(), output_col_change))
742    }
743}