risingwave_frontend/optimizer/plan_node/
batch_over_window.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16use risingwave_pb::batch_plan::SortOverWindowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::generic::PlanWindowFunction;
21use super::utils::impl_distill_by_unit;
22use super::{
23    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
24    ToDistributedBatch, ToLocalBatch, generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchOverWindow {
32    pub base: PlanBase<Batch>,
33    core: generic::OverWindow<PlanRef>,
34}
35
36impl BatchOverWindow {
37    pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
38        assert!(core.funcs_have_same_partition_and_order());
39
40        let input = &core.input;
41        let input_dist = input.distribution().clone();
42
43        let order = Order::new(
44            core.partition_key_indices()
45                .into_iter()
46                .map(|idx| ColumnOrder::new(idx, OrderType::default()))
47                .chain(core.order_key().iter().cloned())
48                .collect(),
49        );
50
51        let base = PlanBase::new_batch_with_core(&core, input_dist, order);
52        BatchOverWindow { base, core }
53    }
54
55    fn expected_input_order(&self) -> Order {
56        self.order().clone()
57    }
58}
59
60impl_distill_by_unit!(BatchOverWindow, core, "BatchOverWindow");
61
62impl PlanTreeNodeUnary<Batch> for BatchOverWindow {
63    fn input(&self) -> PlanRef {
64        self.core.input.clone()
65    }
66
67    fn clone_with_input(&self, input: PlanRef) -> Self {
68        let mut core = self.core.clone();
69        core.input = input;
70        Self::new(core)
71    }
72}
73
74impl_plan_tree_node_for_unary! { Batch, BatchOverWindow }
75
76impl ToDistributedBatch for BatchOverWindow {
77    fn to_distributed(&self) -> Result<PlanRef> {
78        let partition_key_indices = self.core.partition_key_indices();
79        let required_dist = if partition_key_indices.is_empty() {
80            RequiredDist::single()
81        } else {
82            RequiredDist::shard_by_key(self.input().schema().len(), &partition_key_indices)
83        };
84        let new_input = self
85            .input()
86            .to_distributed_with_required(&self.expected_input_order(), &required_dist)?;
87        Ok(self.clone_with_input(new_input).into())
88    }
89}
90
91impl ToLocalBatch for BatchOverWindow {
92    fn to_local(&self) -> Result<PlanRef> {
93        let new_input = self.input().to_local()?;
94        let new_input = RequiredDist::single()
95            .batch_enforce_if_not_satisfies(new_input, &self.expected_input_order())?;
96        Ok(self.clone_with_input(new_input).into())
97    }
98}
99
100impl ToBatchPb for BatchOverWindow {
101    fn to_batch_prost_body(&self) -> NodeBody {
102        let calls = self
103            .core
104            .window_functions()
105            .iter()
106            .map(PlanWindowFunction::to_protobuf)
107            .collect();
108        let partition_by = self
109            .core
110            .partition_key_indices()
111            .into_iter()
112            .map(|idx| idx as _)
113            .collect();
114        let order_by = self
115            .core
116            .order_key()
117            .iter()
118            .copied()
119            .map(ColumnOrder::to_protobuf)
120            .collect();
121
122        NodeBody::SortOverWindow(SortOverWindowNode {
123            calls,
124            partition_by,
125            order_by,
126        })
127    }
128}
129
130impl ExprRewritable<Batch> for BatchOverWindow {}
131
132impl ExprVisitable for BatchOverWindow {}