// risingwave_frontend/optimizer/plan_node/batch_exchange.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
22use crate::error::Result;
23use crate::optimizer::plan_node::ToLocalBatch;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, DistributionDisplay, Order, OrderDisplay};
26
27/// `BatchExchange` imposes a particular distribution on its input
28/// without changing its content.
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchExchange {
31    pub base: PlanBase<Batch>,
32    input: PlanRef,
33    // sequential means each tasks of the exchange node will be executed sequentially.
34    // Currently, it is used to avoid spawn too many tasks for limit operator.
35    sequential: bool,
36}
37
38impl BatchExchange {
39    pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
40        Self::new_inner(input, order, dist, false)
41    }
42
43    pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self {
44        Self::new_inner(input, order, dist, true)
45    }
46
47    fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self {
48        let ctx = input.ctx();
49        let schema = input.schema().clone();
50        let base = PlanBase::new_batch(ctx, schema, dist, order);
51        BatchExchange {
52            base,
53            input,
54            sequential,
55        }
56    }
57}
58
59impl Distill for BatchExchange {
60    fn distill<'a>(&self) -> XmlNode<'a> {
61        let input_schema = self.input.schema();
62        let order = OrderDisplay {
63            order: self.base.order(),
64            input_schema,
65        }
66        .distill();
67        let dist = Pretty::display(&DistributionDisplay {
68            distribution: self.base.distribution(),
69            input_schema,
70        });
71        let mut fields = vec![("order", order), ("dist", dist)];
72        if self.sequential {
73            fields.push(("sequential", Pretty::display(&true)));
74        }
75        childless_record("BatchExchange", fields)
76    }
77}
78
79impl PlanTreeNodeUnary for BatchExchange {
80    fn input(&self) -> PlanRef {
81        self.input.clone()
82    }
83
84    fn clone_with_input(&self, input: PlanRef) -> Self {
85        Self::new_inner(
86            input,
87            self.order().clone(),
88            self.distribution().clone(),
89            self.sequential,
90        )
91    }
92}
93impl_plan_tree_node_for_unary! {BatchExchange}
94
95impl ToDistributedBatch for BatchExchange {
96    fn to_distributed(&self) -> Result<PlanRef> {
97        unreachable!()
98    }
99}
100
101/// The serialization of Batch Exchange is default cuz it will be rewritten in scheduler.
102impl ToBatchPb for BatchExchange {
103    fn to_batch_prost_body(&self) -> NodeBody {
104        if self.base.order().is_any() {
105            NodeBody::Exchange(ExchangeNode {
106                sources: vec![],
107                sequential: self.sequential,
108                input_schema: self.base.schema().to_prost(),
109            })
110        } else {
111            assert!(!self.sequential);
112            NodeBody::MergeSortExchange(MergeSortExchangeNode {
113                exchange: Some(ExchangeNode {
114                    sources: vec![],
115                    sequential: self.sequential,
116                    input_schema: self.base.schema().to_prost(),
117                }),
118                column_orders: self.base.order().to_protobuf(),
119            })
120        }
121    }
122}
123
124impl ToLocalBatch for BatchExchange {
125    fn to_local(&self) -> Result<PlanRef> {
126        unreachable!()
127    }
128}
129
130impl ExprRewritable for BatchExchange {}
131
132impl ExprVisitable for BatchExchange {}