risingwave_frontend/optimizer/plan_node/
stream_exchange.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, plan_node_name};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{
24    Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
25};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
/// `StreamExchange` imposes a particular distribution on its input
/// without changing its content.
///
/// Instances are created through [`StreamExchange::new`] (a regular exchange
/// towards a given distribution) or [`StreamExchange::new_no_shuffle`] (an
/// exchange that keeps the distribution of its input).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamExchange {
    /// Plan properties of this node, built with `PlanBase::new_stream` from the
    /// properties of `input` plus the output distribution.
    pub base: PlanBase<Stream>,
    /// The single child plan whose output is being exchanged.
    input: PlanRef,
    /// `true` when built by `new_no_shuffle`; such a node is serialized with the
    /// `NoShuffle` dispatcher type instead of one derived from the distribution.
    no_shuffle: bool,
}
36
37impl StreamExchange {
38    pub fn new(input: PlanRef, dist: Distribution) -> Self {
39        let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
40            // If the input is a singleton, the monotonicity will be preserved during shuffle
41            // since we use ordered channel/buffer when exchanging data.
42            input.columns_monotonicity().clone()
43        } else {
44            MonotonicityMap::new()
45        };
46        assert!(!input.schema().is_empty());
47        let base = PlanBase::new_stream(
48            input.ctx(),
49            input.schema().clone(),
50            input.stream_key().map(|v| v.to_vec()),
51            input.functional_dependency().clone(),
52            dist,
53            input.append_only(), // append-only property won't change
54            input.emit_on_window_close(),
55            input.watermark_columns().clone(),
56            columns_monotonicity,
57        );
58        StreamExchange {
59            base,
60            input,
61            no_shuffle: false,
62        }
63    }
64
65    pub fn new_no_shuffle(input: PlanRef) -> Self {
66        let ctx = input.ctx();
67        let base = PlanBase::new_stream(
68            ctx,
69            input.schema().clone(),
70            input.stream_key().map(|v| v.to_vec()),
71            input.functional_dependency().clone(),
72            input.distribution().clone(),
73            input.append_only(), // append-only property won't change
74            input.emit_on_window_close(),
75            input.watermark_columns().clone(),
76            input.columns_monotonicity().clone(),
77        );
78        StreamExchange {
79            base,
80            input,
81            no_shuffle: true,
82        }
83    }
84
85    pub fn no_shuffle(&self) -> bool {
86        self.no_shuffle
87    }
88}
89
90impl Distill for StreamExchange {
91    fn distill<'a>(&self) -> XmlNode<'a> {
92        let distribution_display = DistributionDisplay {
93            distribution: self.base.distribution(),
94            input_schema: self.input.schema(),
95        };
96        childless_record(
97            plan_node_name!(
98                "StreamExchange",
99                { "no_shuffle", self.no_shuffle },
100            ),
101            vec![("dist", Pretty::display(&distribution_display))],
102        )
103    }
104}
105
106impl PlanTreeNodeUnary for StreamExchange {
107    fn input(&self) -> PlanRef {
108        self.input.clone()
109    }
110
111    fn clone_with_input(&self, input: PlanRef) -> Self {
112        if self.no_shuffle {
113            Self::new_no_shuffle(input)
114        } else {
115            Self::new(input, self.distribution().clone())
116        }
117    }
118}
119impl_plan_tree_node_for_unary! {StreamExchange}
120
121impl StreamNode for StreamExchange {
122    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
123        NodeBody::Exchange(Box::new(ExchangeNode {
124            strategy: if self.no_shuffle {
125                Some(DispatchStrategy {
126                    r#type: DispatcherType::NoShuffle as i32,
127                    dist_key_indices: vec![],
128                    output_indices: (0..self.schema().len() as u32).collect(),
129                })
130            } else {
131                Some(DispatchStrategy {
132                    r#type: match &self.base.distribution() {
133                        Distribution::HashShard(_) => DispatcherType::Hash,
134                        Distribution::Single => DispatcherType::Simple,
135                        Distribution::Broadcast => DispatcherType::Broadcast,
136                        _ => panic!("Do not allow Any or AnyShard in serialization process"),
137                    } as i32,
138                    dist_key_indices: match &self.base.distribution() {
139                        Distribution::HashShard(keys) => {
140                            keys.iter().map(|num| *num as u32).collect()
141                        }
142                        _ => vec![],
143                    },
144                    output_indices: (0..self.schema().len() as u32).collect(),
145                })
146            },
147        }))
148    }
149}
150
// Empty impl: no trait method is overridden, so `StreamExchange` uses the default
// implementations supplied by `ExprRewritable`.
// NOTE(review): presumably because this node holds no expressions of its own — confirm
// against the trait definition.
impl ExprRewritable for StreamExchange {}
152
// Empty impl: no trait method is overridden, so `StreamExchange` uses the default
// implementations supplied by `ExprVisitable`.
// NOTE(review): presumably because this node holds no expressions of its own — confirm
// against the trait definition.
impl ExprVisitable for StreamExchange {}