risingwave_frontend/optimizer/plan_node/
stream_exchange.rs1use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::stream_plan::stream_node::NodeBody;
19use risingwave_pb::stream_plan::{
20 DispatchStrategy, DispatcherType, ExchangeNode, PbDispatchOutputMapping,
21};
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, plan_node_name};
25use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{
28 Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
29};
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct StreamExchange {
36 pub base: PlanBase<Stream>,
37 input: PlanRef,
38 no_shuffle: bool,
39}
40
41impl StreamExchange {
42 pub fn new(input: PlanRef, dist: Distribution) -> Self {
43 assert_matches!(
44 dist,
45 Distribution::HashShard(_) | Distribution::Single | Distribution::Broadcast,
46 "exchange can not be used to enforce such distribution"
47 );
48
49 let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
50 input.columns_monotonicity().clone()
53 } else {
54 MonotonicityMap::new()
55 };
56 assert!(!input.schema().is_empty());
57 let base = PlanBase::new_stream(
58 input.ctx(),
59 input.schema().clone(),
60 input.stream_key().map(|v| v.to_vec()),
61 input.functional_dependency().clone(),
62 dist,
63 input.stream_kind(), input.emit_on_window_close(),
65 input.watermark_columns().clone(),
66 columns_monotonicity,
67 );
68 StreamExchange {
69 base,
70 input,
71 no_shuffle: false,
72 }
73 }
74
75 pub fn new_no_shuffle(input: PlanRef) -> Self {
76 let ctx = input.ctx();
77 let base = PlanBase::new_stream(
78 ctx,
79 input.schema().clone(),
80 input.stream_key().map(|v| v.to_vec()),
81 input.functional_dependency().clone(),
82 input.distribution().clone(),
83 input.stream_kind(), input.emit_on_window_close(),
85 input.watermark_columns().clone(),
86 input.columns_monotonicity().clone(),
87 );
88 StreamExchange {
89 base,
90 input,
91 no_shuffle: true,
92 }
93 }
94
95 pub fn no_shuffle(&self) -> bool {
96 self.no_shuffle
97 }
98}
99
100impl Distill for StreamExchange {
101 fn distill<'a>(&self) -> XmlNode<'a> {
102 let distribution_display = DistributionDisplay {
103 distribution: self.base.distribution(),
104 input_schema: self.input.schema(),
105 };
106 childless_record(
107 plan_node_name!(
108 "StreamExchange",
109 { "no_shuffle", self.no_shuffle },
110 ),
111 vec![("dist", Pretty::display(&distribution_display))],
112 )
113 }
114}
115
116impl PlanTreeNodeUnary<Stream> for StreamExchange {
117 fn input(&self) -> PlanRef {
118 self.input.clone()
119 }
120
121 fn clone_with_input(&self, input: PlanRef) -> Self {
122 if self.no_shuffle {
123 Self::new_no_shuffle(input)
124 } else {
125 Self::new(input, self.distribution().clone())
126 }
127 }
128}
129impl_plan_tree_node_for_unary! { Stream, StreamExchange}
130
131impl StreamNode for StreamExchange {
132 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
133 let output_mapping = PbDispatchOutputMapping::identical(self.schema().len()).into();
134
135 NodeBody::Exchange(Box::new(ExchangeNode {
136 strategy: if self.no_shuffle {
137 Some(DispatchStrategy {
138 r#type: DispatcherType::NoShuffle as i32,
139 dist_key_indices: vec![],
140 output_mapping,
141 })
142 } else {
143 Some(DispatchStrategy {
144 r#type: match &self.base.distribution() {
145 Distribution::HashShard(_) => DispatcherType::Hash,
146 Distribution::Single => DispatcherType::Simple,
147 Distribution::Broadcast => DispatcherType::Broadcast,
148 _ => panic!("Do not allow Any or AnyShard in serialization process"),
149 } as i32,
150 dist_key_indices: match &self.base.distribution() {
151 Distribution::HashShard(keys) => {
152 keys.iter().map(|num| *num as u32).collect()
153 }
154 _ => vec![],
155 },
156 output_mapping,
157 })
158 },
159 }))
160 }
161}
162
163impl ExprRewritable<Stream> for StreamExchange {}
164
165impl ExprVisitable for StreamExchange {}