risingwave_frontend/optimizer/plan_node/
batch_exchange.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
23    ToDistributedBatch,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, DistributionDisplay, Order, OrderDisplay};
29
/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
///
/// The target distribution and order live in `base`; the schema is copied
/// unchanged from `input`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchExchange {
    pub base: PlanBase<Batch>,
    input: PlanRef,
    /// When `true`, each task of the exchange node is executed sequentially.
    /// Currently used to avoid spawning too many tasks for a limit operator.
    /// Serialization asserts that a sequential exchange has no required order
    /// (i.e. its order satisfies `is_any()`).
    sequential: bool,
}
40
41impl BatchExchange {
42    pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
43        Self::new_inner(input, order, dist, false)
44    }
45
46    pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self {
47        Self::new_inner(input, order, dist, true)
48    }
49
50    fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self {
51        let ctx = input.ctx();
52        let schema = input.schema().clone();
53        let base = PlanBase::new_batch(ctx, schema, dist, order);
54        BatchExchange {
55            base,
56            input,
57            sequential,
58        }
59    }
60}
61
62impl Distill for BatchExchange {
63    fn distill<'a>(&self) -> XmlNode<'a> {
64        let input_schema = self.input.schema();
65        let order = OrderDisplay {
66            order: self.base.order(),
67            input_schema,
68        }
69        .distill();
70        let dist = Pretty::display(&DistributionDisplay {
71            distribution: self.base.distribution(),
72            input_schema,
73        });
74        let mut fields = vec![("order", order), ("dist", dist)];
75        if self.sequential {
76            fields.push(("sequential", Pretty::display(&true)));
77        }
78        childless_record("BatchExchange", fields)
79    }
80}
81
82impl PlanTreeNodeUnary<Batch> for BatchExchange {
83    fn input(&self) -> PlanRef {
84        self.input.clone()
85    }
86
87    fn clone_with_input(&self, input: PlanRef) -> Self {
88        Self::new_inner(
89            input,
90            self.order().clone(),
91            self.distribution().clone(),
92            self.sequential,
93        )
94    }
95}
96impl_plan_tree_node_for_unary! { Batch, BatchExchange}
97
98impl ToDistributedBatch for BatchExchange {
99    fn to_distributed(&self) -> Result<PlanRef> {
100        unreachable!()
101    }
102}
103
104/// The serialization of Batch Exchange is default cuz it will be rewritten in scheduler.
105impl ToBatchPb for BatchExchange {
106    fn to_batch_prost_body(&self) -> NodeBody {
107        if self.base.order().is_any() {
108            NodeBody::Exchange(ExchangeNode {
109                sources: vec![],
110                sequential: self.sequential,
111                input_schema: self.base.schema().to_prost(),
112            })
113        } else {
114            assert!(!self.sequential);
115            NodeBody::MergeSortExchange(MergeSortExchangeNode {
116                exchange: Some(ExchangeNode {
117                    sources: vec![],
118                    sequential: self.sequential,
119                    input_schema: self.base.schema().to_prost(),
120                }),
121                column_orders: self.base.order().to_protobuf(),
122            })
123        }
124    }
125}
126
127impl ToLocalBatch for BatchExchange {
128    fn to_local(&self) -> Result<PlanRef> {
129        unreachable!()
130    }
131}
132
// `BatchExchange` declares no expression fields of its own (its fields are
// `base`, `input` and `sequential`), so both traits below rely entirely on
// their default method implementations.
impl ExprRewritable<Batch> for BatchExchange {}

impl ExprVisitable for BatchExchange {}