risingwave_frontend/optimizer/plan_node/
stream_exchange.rs1use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19use risingwave_pb::stream_plan::{
20 DispatchStrategy, DispatcherType, ExchangeNode, PbDispatchOutputMapping,
21};
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, plan_node_name};
25use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
26use crate::Explain as _;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::generic::LOCAL_PHASE_VNODE_COLUMN_NAME;
29use crate::optimizer::property::{
30 Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
31};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamExchange {
38 pub base: PlanBase<Stream>,
39 input: PlanRef,
40 no_shuffle: bool,
41}
42
43impl StreamExchange {
44 pub fn new(input: PlanRef, dist: Distribution) -> Self {
45 assert_matches!(
46 dist,
47 Distribution::HashShard(_) | Distribution::Single | Distribution::Broadcast,
48 "exchange can not be used to enforce such distribution"
49 );
50
51 if !input.append_only()
62 && input.distribution() != &Distribution::Single
63 && let Some(input_dist_key) = input.distribution().dist_column_indices_opt()
64 && let Some(input_stream_key) = input.stream_key()
65 && input_stream_key
66 .iter()
67 .all(|k| input.schema()[*k].name != LOCAL_PHASE_VNODE_COLUMN_NAME)
68 {
69 assert!(
70 input_dist_key
71 .iter()
72 .all(|idx| input_stream_key.contains(idx)),
73 "distribution key must be a subset of stream key before shuffle to a different distribution\n\
74 - dist_key: {input_dist_key:?}\n\
75 - stream_key: {input_stream_key:?}\n\
76 - schema: {}\n\
77 - plan:\n{}",
78 input.schema().formatted_col_names(),
79 input.explain_to_string()
80 );
81 }
82
83 let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
84 input.columns_monotonicity().clone()
87 } else {
88 MonotonicityMap::new()
89 };
90 assert!(!input.schema().is_empty());
91 let base = PlanBase::new_stream(
92 input.ctx(),
93 input.schema().clone(),
94 input.stream_key().map(|v| v.to_vec()),
95 input.functional_dependency().clone(),
96 dist,
97 input.stream_kind(), input.emit_on_window_close(),
99 input.watermark_columns().clone(),
100 columns_monotonicity,
101 );
102 StreamExchange {
103 base,
104 input,
105 no_shuffle: false,
106 }
107 }
108
109 pub fn new_no_shuffle(input: PlanRef) -> Self {
110 let ctx = input.ctx();
111 let base = PlanBase::new_stream(
112 ctx,
113 input.schema().clone(),
114 input.stream_key().map(|v| v.to_vec()),
115 input.functional_dependency().clone(),
116 input.distribution().clone(),
117 input.stream_kind(), input.emit_on_window_close(),
119 input.watermark_columns().clone(),
120 input.columns_monotonicity().clone(),
121 );
122 StreamExchange {
123 base,
124 input,
125 no_shuffle: true,
126 }
127 }
128
129 pub fn no_shuffle(&self) -> bool {
130 self.no_shuffle
131 }
132}
133
134impl Distill for StreamExchange {
135 fn distill<'a>(&self) -> XmlNode<'a> {
136 let distribution_display = DistributionDisplay {
137 distribution: self.base.distribution(),
138 input_schema: self.input.schema(),
139 };
140 childless_record(
141 plan_node_name!(
142 "StreamExchange",
143 { "no_shuffle", self.no_shuffle },
144 ),
145 vec![("dist", Pretty::display(&distribution_display))],
146 )
147 }
148}
149
150impl PlanTreeNodeUnary<Stream> for StreamExchange {
151 fn input(&self) -> PlanRef {
152 self.input.clone()
153 }
154
155 fn clone_with_input(&self, input: PlanRef) -> Self {
156 if self.no_shuffle {
157 Self::new_no_shuffle(input)
158 } else {
159 Self::new(input, self.distribution().clone())
160 }
161 }
162}
163impl_plan_tree_node_for_unary! { Stream, StreamExchange}
164
165impl StreamNode for StreamExchange {
166 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
167 let output_mapping = PbDispatchOutputMapping::identical(self.schema().len()).into();
168
169 NodeBody::Exchange(Box::new(ExchangeNode {
170 strategy: if self.no_shuffle {
171 Some(DispatchStrategy {
172 r#type: DispatcherType::NoShuffle as i32,
173 dist_key_indices: vec![],
174 output_mapping,
175 })
176 } else {
177 Some(DispatchStrategy {
178 r#type: match &self.base.distribution() {
179 Distribution::HashShard(_) => DispatcherType::Hash,
180 Distribution::Single => DispatcherType::Simple,
181 Distribution::Broadcast => DispatcherType::Broadcast,
182 _ => panic!("Do not allow Any or AnyShard in serialization process"),
183 } as i32,
184 dist_key_indices: match &self.base.distribution() {
185 Distribution::HashShard(keys) => {
186 keys.iter().map(|num| *num as u32).collect()
187 }
188 _ => vec![],
189 },
190 output_mapping,
191 })
192 },
193 }))
194 }
195}
196
197impl ExprRewritable<Stream> for StreamExchange {}
198
199impl ExprVisitable for StreamExchange {}