risingwave_frontend/optimizer/plan_node/
batch_over_window.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16use risingwave_pb::batch_plan::SortOverWindowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::generic::PlanWindowFunction;
21use super::utils::impl_distill_by_unit;
22use super::{
23    ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
24    ToLocalBatch, generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
29
/// Batch plan node for window functions, wrapping a [`generic::OverWindow`] core.
///
/// It is serialized as a `SortOverWindowNode` (see the `ToBatchPb` impl in this file).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchOverWindow {
    /// Batch-specific plan base, built in `new` from the core, the input's distribution and
    /// the order this node declares.
    pub base: PlanBase<Batch>,
    /// Generic over-window core; it also holds the input plan (`core.input`).
    core: generic::OverWindow<PlanRef>,
}
35
36impl BatchOverWindow {
37    pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
38        assert!(core.funcs_have_same_partition_and_order());
39
40        let input = &core.input;
41        let input_dist = input.distribution().clone();
42
43        let order = Order::new(
44            core.partition_key_indices()
45                .into_iter()
46                .map(|idx| ColumnOrder::new(idx, OrderType::default()))
47                .chain(core.order_key().iter().cloned())
48                .collect(),
49        );
50
51        let base = PlanBase::new_batch_with_core(&core, input_dist, order);
52        BatchOverWindow { base, core }
53    }
54
55    fn expected_input_order(&self) -> Order {
56        self.order().clone()
57    }
58}
59
// Invokes `impl_distill_by_unit!` (imported from `super::utils`) with the node type, the name
// of its `core` field and the label "BatchOverWindow".
// NOTE(review): presumably this generates the node's distill/explain output by delegating to
// `core` under that label — confirm against the macro definition.
impl_distill_by_unit!(BatchOverWindow, core, "BatchOverWindow");
61
62impl PlanTreeNodeUnary for BatchOverWindow {
63    fn input(&self) -> PlanRef {
64        self.core.input.clone()
65    }
66
67    fn clone_with_input(&self, input: PlanRef) -> Self {
68        let mut core = self.core.clone();
69        core.input = input;
70        Self::new(core)
71    }
72}
73
// NOTE(review): presumably expands to the generic plan-tree-node impl derived from the
// `PlanTreeNodeUnary` impl of `BatchOverWindow` — the macro is defined outside this file; confirm.
impl_plan_tree_node_for_unary! { BatchOverWindow }
75
76impl ToDistributedBatch for BatchOverWindow {
77    fn to_distributed(&self) -> Result<PlanRef> {
78        let new_input = self.input().to_distributed_with_required(
79            &self.expected_input_order(),
80            &RequiredDist::shard_by_key(
81                self.input().schema().len(),
82                &self.core.partition_key_indices(),
83            ),
84        )?;
85        Ok(self.clone_with_input(new_input).into())
86    }
87}
88
89impl ToLocalBatch for BatchOverWindow {
90    fn to_local(&self) -> Result<PlanRef> {
91        let new_input = self.input().to_local()?;
92        let new_input = RequiredDist::single()
93            .enforce_if_not_satisfies(new_input, &self.expected_input_order())?;
94        Ok(self.clone_with_input(new_input).into())
95    }
96}
97
98impl ToBatchPb for BatchOverWindow {
99    fn to_batch_prost_body(&self) -> NodeBody {
100        let calls = self
101            .core
102            .window_functions()
103            .iter()
104            .map(PlanWindowFunction::to_protobuf)
105            .collect();
106        let partition_by = self
107            .core
108            .partition_key_indices()
109            .into_iter()
110            .map(|idx| idx as _)
111            .collect();
112        let order_by = self
113            .core
114            .order_key()
115            .iter()
116            .map(ColumnOrder::to_protobuf)
117            .collect();
118
119        NodeBody::SortOverWindow(SortOverWindowNode {
120            calls,
121            partition_by,
122            order_by,
123        })
124    }
125}
126
// Empty impl: no `ExprRewritable` items are overridden here, so whatever defaults the trait
// provides apply to `BatchOverWindow`.
impl ExprRewritable for BatchOverWindow {}
128
// Empty impl: no `ExprVisitable` items are overridden here, so whatever defaults the trait
// provides apply to `BatchOverWindow`.
impl ExprVisitable for BatchOverWindow {}