risingwave_frontend/optimizer/plan_node/
stream_exchange.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19use risingwave_pb::stream_plan::{
20    DispatchStrategy, DispatcherType, ExchangeNode, PbDispatchOutputMapping,
21};
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, plan_node_name};
25use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{
28    Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
29};
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32/// `StreamExchange` imposes a particular distribution on its input
33/// without changing its content.
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct StreamExchange {
36    pub base: PlanBase<Stream>,
37    input: PlanRef,
38    no_shuffle: bool,
39}
40
41impl StreamExchange {
42    pub fn new(input: PlanRef, dist: Distribution) -> Self {
43        assert_matches!(
44            dist,
45            Distribution::HashShard(_) | Distribution::Single | Distribution::Broadcast,
46            "exchange can not be used to enforce such distribution"
47        );
48
49        let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
50            // If the input is a singleton, the monotonicity will be preserved during shuffle
51            // since we use ordered channel/buffer when exchanging data.
52            input.columns_monotonicity().clone()
53        } else {
54            MonotonicityMap::new()
55        };
56        assert!(!input.schema().is_empty());
57        let base = PlanBase::new_stream(
58            input.ctx(),
59            input.schema().clone(),
60            input.stream_key().map(|v| v.to_vec()),
61            input.functional_dependency().clone(),
62            dist,
63            input.stream_kind(), // stream kind property won't change
64            input.emit_on_window_close(),
65            input.watermark_columns().clone(),
66            columns_monotonicity,
67        );
68        StreamExchange {
69            base,
70            input,
71            no_shuffle: false,
72        }
73    }
74
75    pub fn new_no_shuffle(input: PlanRef) -> Self {
76        let ctx = input.ctx();
77        let base = PlanBase::new_stream(
78            ctx,
79            input.schema().clone(),
80            input.stream_key().map(|v| v.to_vec()),
81            input.functional_dependency().clone(),
82            input.distribution().clone(),
83            input.stream_kind(), // stream kind property won't change
84            input.emit_on_window_close(),
85            input.watermark_columns().clone(),
86            input.columns_monotonicity().clone(),
87        );
88        StreamExchange {
89            base,
90            input,
91            no_shuffle: true,
92        }
93    }
94
95    pub fn no_shuffle(&self) -> bool {
96        self.no_shuffle
97    }
98}
99
100impl Distill for StreamExchange {
101    fn distill<'a>(&self) -> XmlNode<'a> {
102        let distribution_display = DistributionDisplay {
103            distribution: self.base.distribution(),
104            input_schema: self.input.schema(),
105        };
106        childless_record(
107            plan_node_name!(
108                "StreamExchange",
109                { "no_shuffle", self.no_shuffle },
110            ),
111            vec![("dist", Pretty::display(&distribution_display))],
112        )
113    }
114}
115
116impl PlanTreeNodeUnary<Stream> for StreamExchange {
117    fn input(&self) -> PlanRef {
118        self.input.clone()
119    }
120
121    fn clone_with_input(&self, input: PlanRef) -> Self {
122        if self.no_shuffle {
123            Self::new_no_shuffle(input)
124        } else {
125            Self::new(input, self.distribution().clone())
126        }
127    }
128}
129impl_plan_tree_node_for_unary! { Stream, StreamExchange}
130
131impl StreamNode for StreamExchange {
132    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
133        let output_mapping = PbDispatchOutputMapping::identical(self.schema().len()).into();
134
135        NodeBody::Exchange(Box::new(ExchangeNode {
136            strategy: if self.no_shuffle {
137                Some(DispatchStrategy {
138                    r#type: DispatcherType::NoShuffle as i32,
139                    dist_key_indices: vec![],
140                    output_mapping,
141                })
142            } else {
143                Some(DispatchStrategy {
144                    r#type: match &self.base.distribution() {
145                        Distribution::HashShard(_) => DispatcherType::Hash,
146                        Distribution::Single => DispatcherType::Simple,
147                        Distribution::Broadcast => DispatcherType::Broadcast,
148                        _ => panic!("Do not allow Any or AnyShard in serialization process"),
149                    } as i32,
150                    dist_key_indices: match &self.base.distribution() {
151                        Distribution::HashShard(keys) => {
152                            keys.iter().map(|num| *num as u32).collect()
153                        }
154                        _ => vec![],
155                    },
156                    output_mapping,
157                })
158            },
159        }))
160    }
161}
162
163impl ExprRewritable<Stream> for StreamExchange {}
164
165impl ExprVisitable for StreamExchange {}