risingwave_frontend/optimizer/plan_node/
stream_exchange.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19use risingwave_pb::stream_plan::{
20    DispatchStrategy, DispatcherType, ExchangeNode, PbDispatchOutputMapping,
21};
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, plan_node_name};
25use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
26use crate::Explain as _;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::generic::LOCAL_PHASE_VNODE_COLUMN_NAME;
29use crate::optimizer::property::{
30    Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
31};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34/// `StreamExchange` imposes a particular distribution on its input
35/// without changing its content.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamExchange {
38    pub base: PlanBase<Stream>,
39    input: PlanRef,
40    no_shuffle: bool,
41}
42
43impl StreamExchange {
44    pub fn new(input: PlanRef, dist: Distribution) -> Self {
45        assert_matches!(
46            dist,
47            Distribution::HashShard(_) | Distribution::Single | Distribution::Broadcast,
48            "exchange can not be used to enforce such distribution"
49        );
50
51        // For non-append-only distributed stream, check that distribution key is a subset of stream key
52        // for input plan.
53        //
54        // Otherwise, the changes on the same stream key might already be on different parallelism, and
55        // merging them with this exchange could break the correct ordering, leading to inconsistent
56        // stream or data loss.
57        //
58        // An exception is two-phase aggregation or top-n, where we used the `rw_vnode(dist_key)` as the
59        // group key (thus stream key). In this case, changes on the same stream key (then vnode value)
60        // must be on the same parallelism based on our scheduling algorithm, thus it's safe to shuffle.
61        if !input.append_only()
62            && input.distribution() != &Distribution::Single
63            && let Some(input_dist_key) = input.distribution().dist_column_indices_opt()
64            && let Some(input_stream_key) = input.stream_key()
65            && input_stream_key
66                .iter()
67                .all(|k| input.schema()[*k].name != LOCAL_PHASE_VNODE_COLUMN_NAME)
68        {
69            assert!(
70                input_dist_key
71                    .iter()
72                    .all(|idx| input_stream_key.contains(idx)),
73                "distribution key must be a subset of stream key before shuffle to a different distribution\n\
74                 - dist_key: {input_dist_key:?}\n\
75                 - stream_key: {input_stream_key:?}\n\
76                 - schema: {}\n\
77                 - plan:\n{}",
78                input.schema().formatted_col_names(),
79                input.explain_to_string()
80            );
81        }
82
83        let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
84            // If the input is a singleton, the monotonicity will be preserved during shuffle
85            // since we use ordered channel/buffer when exchanging data.
86            input.columns_monotonicity().clone()
87        } else {
88            MonotonicityMap::new()
89        };
90        assert!(!input.schema().is_empty());
91        let base = PlanBase::new_stream(
92            input.ctx(),
93            input.schema().clone(),
94            input.stream_key().map(|v| v.to_vec()),
95            input.functional_dependency().clone(),
96            dist,
97            input.stream_kind(), // stream kind property won't change
98            input.emit_on_window_close(),
99            input.watermark_columns().clone(),
100            columns_monotonicity,
101        );
102        StreamExchange {
103            base,
104            input,
105            no_shuffle: false,
106        }
107    }
108
109    pub fn new_no_shuffle(input: PlanRef) -> Self {
110        let ctx = input.ctx();
111        let base = PlanBase::new_stream(
112            ctx,
113            input.schema().clone(),
114            input.stream_key().map(|v| v.to_vec()),
115            input.functional_dependency().clone(),
116            input.distribution().clone(),
117            input.stream_kind(), // stream kind property won't change
118            input.emit_on_window_close(),
119            input.watermark_columns().clone(),
120            input.columns_monotonicity().clone(),
121        );
122        StreamExchange {
123            base,
124            input,
125            no_shuffle: true,
126        }
127    }
128
129    pub fn no_shuffle(&self) -> bool {
130        self.no_shuffle
131    }
132}
133
134impl Distill for StreamExchange {
135    fn distill<'a>(&self) -> XmlNode<'a> {
136        let distribution_display = DistributionDisplay {
137            distribution: self.base.distribution(),
138            input_schema: self.input.schema(),
139        };
140        childless_record(
141            plan_node_name!(
142                "StreamExchange",
143                { "no_shuffle", self.no_shuffle },
144            ),
145            vec![("dist", Pretty::display(&distribution_display))],
146        )
147    }
148}
149
150impl PlanTreeNodeUnary<Stream> for StreamExchange {
151    fn input(&self) -> PlanRef {
152        self.input.clone()
153    }
154
155    fn clone_with_input(&self, input: PlanRef) -> Self {
156        if self.no_shuffle {
157            Self::new_no_shuffle(input)
158        } else {
159            Self::new(input, self.distribution().clone())
160        }
161    }
162}
163impl_plan_tree_node_for_unary! { Stream, StreamExchange}
164
165impl StreamNode for StreamExchange {
166    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
167        let output_mapping = PbDispatchOutputMapping::identical(self.schema().len()).into();
168
169        NodeBody::Exchange(Box::new(ExchangeNode {
170            strategy: if self.no_shuffle {
171                Some(DispatchStrategy {
172                    r#type: DispatcherType::NoShuffle as i32,
173                    dist_key_indices: vec![],
174                    output_mapping,
175                })
176            } else {
177                Some(DispatchStrategy {
178                    r#type: match &self.base.distribution() {
179                        Distribution::HashShard(_) => DispatcherType::Hash,
180                        Distribution::Single => DispatcherType::Simple,
181                        Distribution::Broadcast => DispatcherType::Broadcast,
182                        _ => panic!("Do not allow Any or AnyShard in serialization process"),
183                    } as i32,
184                    dist_key_indices: match &self.base.distribution() {
185                        Distribution::HashShard(keys) => {
186                            keys.iter().map(|num| *num as u32).collect()
187                        }
188                        _ => vec![],
189                    },
190                    output_mapping,
191                })
192            },
193        }))
194    }
195}
196
// Empty impl: every `ExprRewritable` method falls back to the trait's default.
// The struct declares only `base`, `input` and `no_shuffle`, none of which is
// visibly an expression.
impl ExprRewritable<Stream> for StreamExchange {}
198
// Empty impl: every `ExprVisitable` method falls back to the trait's default.
// The struct declares only `base`, `input` and `no_shuffle`, none of which is
// visibly an expression.
impl ExprVisitable for StreamExchange {}