risingwave_frontend/optimizer/plan_node/
stream_exchange.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17use risingwave_pb::stream_plan::{
18    DispatchStrategy, DispatcherType, ExchangeNode, PbDispatchOutputMapping,
19};
20
21use super::stream::prelude::*;
22use super::utils::{Distill, childless_record, plan_node_name};
23use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{
26    Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30/// `StreamExchange` imposes a particular distribution on its input
31/// without changing its content.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamExchange {
34    pub base: PlanBase<Stream>,
35    input: PlanRef,
36    no_shuffle: bool,
37}
38
39impl StreamExchange {
40    pub fn new(input: PlanRef, dist: Distribution) -> Self {
41        let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
42            // If the input is a singleton, the monotonicity will be preserved during shuffle
43            // since we use ordered channel/buffer when exchanging data.
44            input.columns_monotonicity().clone()
45        } else {
46            MonotonicityMap::new()
47        };
48        assert!(!input.schema().is_empty());
49        let base = PlanBase::new_stream(
50            input.ctx(),
51            input.schema().clone(),
52            input.stream_key().map(|v| v.to_vec()),
53            input.functional_dependency().clone(),
54            dist,
55            input.append_only(), // append-only property won't change
56            input.emit_on_window_close(),
57            input.watermark_columns().clone(),
58            columns_monotonicity,
59        );
60        StreamExchange {
61            base,
62            input,
63            no_shuffle: false,
64        }
65    }
66
67    pub fn new_no_shuffle(input: PlanRef) -> Self {
68        let ctx = input.ctx();
69        let base = PlanBase::new_stream(
70            ctx,
71            input.schema().clone(),
72            input.stream_key().map(|v| v.to_vec()),
73            input.functional_dependency().clone(),
74            input.distribution().clone(),
75            input.append_only(), // append-only property won't change
76            input.emit_on_window_close(),
77            input.watermark_columns().clone(),
78            input.columns_monotonicity().clone(),
79        );
80        StreamExchange {
81            base,
82            input,
83            no_shuffle: true,
84        }
85    }
86
87    pub fn no_shuffle(&self) -> bool {
88        self.no_shuffle
89    }
90}
91
92impl Distill for StreamExchange {
93    fn distill<'a>(&self) -> XmlNode<'a> {
94        let distribution_display = DistributionDisplay {
95            distribution: self.base.distribution(),
96            input_schema: self.input.schema(),
97        };
98        childless_record(
99            plan_node_name!(
100                "StreamExchange",
101                { "no_shuffle", self.no_shuffle },
102            ),
103            vec![("dist", Pretty::display(&distribution_display))],
104        )
105    }
106}
107
108impl PlanTreeNodeUnary for StreamExchange {
109    fn input(&self) -> PlanRef {
110        self.input.clone()
111    }
112
113    fn clone_with_input(&self, input: PlanRef) -> Self {
114        if self.no_shuffle {
115            Self::new_no_shuffle(input)
116        } else {
117            Self::new(input, self.distribution().clone())
118        }
119    }
120}
121impl_plan_tree_node_for_unary! {StreamExchange}
122
123impl StreamNode for StreamExchange {
124    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
125        let output_mapping = PbDispatchOutputMapping::identical(self.schema().len()).into();
126
127        NodeBody::Exchange(Box::new(ExchangeNode {
128            strategy: if self.no_shuffle {
129                Some(DispatchStrategy {
130                    r#type: DispatcherType::NoShuffle as i32,
131                    dist_key_indices: vec![],
132                    output_mapping,
133                })
134            } else {
135                Some(DispatchStrategy {
136                    r#type: match &self.base.distribution() {
137                        Distribution::HashShard(_) => DispatcherType::Hash,
138                        Distribution::Single => DispatcherType::Simple,
139                        Distribution::Broadcast => DispatcherType::Broadcast,
140                        _ => panic!("Do not allow Any or AnyShard in serialization process"),
141                    } as i32,
142                    dist_key_indices: match &self.base.distribution() {
143                        Distribution::HashShard(keys) => {
144                            keys.iter().map(|num| *num as u32).collect()
145                        }
146                        _ => vec![],
147                    },
148                    output_mapping,
149                })
150            },
151        }))
152    }
153}
154
155impl ExprRewritable for StreamExchange {}
156
157impl ExprVisitable for StreamExchange {}