risingwave_frontend/optimizer/plan_node/
stream_exchange.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17use risingwave_pb::stream_plan::{
18 DispatchStrategy, DispatcherType, ExchangeNode, PbDispatchOutputMapping,
19};
20
21use super::stream::prelude::*;
22use super::utils::{Distill, childless_record, plan_node_name};
23use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{
26 Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamExchange {
34 pub base: PlanBase<Stream>,
35 input: PlanRef,
36 no_shuffle: bool,
37}
38
39impl StreamExchange {
40 pub fn new(input: PlanRef, dist: Distribution) -> Self {
41 let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
42 input.columns_monotonicity().clone()
45 } else {
46 MonotonicityMap::new()
47 };
48 assert!(!input.schema().is_empty());
49 let base = PlanBase::new_stream(
50 input.ctx(),
51 input.schema().clone(),
52 input.stream_key().map(|v| v.to_vec()),
53 input.functional_dependency().clone(),
54 dist,
55 input.append_only(), input.emit_on_window_close(),
57 input.watermark_columns().clone(),
58 columns_monotonicity,
59 );
60 StreamExchange {
61 base,
62 input,
63 no_shuffle: false,
64 }
65 }
66
67 pub fn new_no_shuffle(input: PlanRef) -> Self {
68 let ctx = input.ctx();
69 let base = PlanBase::new_stream(
70 ctx,
71 input.schema().clone(),
72 input.stream_key().map(|v| v.to_vec()),
73 input.functional_dependency().clone(),
74 input.distribution().clone(),
75 input.append_only(), input.emit_on_window_close(),
77 input.watermark_columns().clone(),
78 input.columns_monotonicity().clone(),
79 );
80 StreamExchange {
81 base,
82 input,
83 no_shuffle: true,
84 }
85 }
86
87 pub fn no_shuffle(&self) -> bool {
88 self.no_shuffle
89 }
90}
91
92impl Distill for StreamExchange {
93 fn distill<'a>(&self) -> XmlNode<'a> {
94 let distribution_display = DistributionDisplay {
95 distribution: self.base.distribution(),
96 input_schema: self.input.schema(),
97 };
98 childless_record(
99 plan_node_name!(
100 "StreamExchange",
101 { "no_shuffle", self.no_shuffle },
102 ),
103 vec![("dist", Pretty::display(&distribution_display))],
104 )
105 }
106}
107
108impl PlanTreeNodeUnary for StreamExchange {
109 fn input(&self) -> PlanRef {
110 self.input.clone()
111 }
112
113 fn clone_with_input(&self, input: PlanRef) -> Self {
114 if self.no_shuffle {
115 Self::new_no_shuffle(input)
116 } else {
117 Self::new(input, self.distribution().clone())
118 }
119 }
120}
121impl_plan_tree_node_for_unary! {StreamExchange}
122
123impl StreamNode for StreamExchange {
124 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
125 let output_mapping = PbDispatchOutputMapping::identical(self.schema().len()).into();
126
127 NodeBody::Exchange(Box::new(ExchangeNode {
128 strategy: if self.no_shuffle {
129 Some(DispatchStrategy {
130 r#type: DispatcherType::NoShuffle as i32,
131 dist_key_indices: vec![],
132 output_mapping,
133 })
134 } else {
135 Some(DispatchStrategy {
136 r#type: match &self.base.distribution() {
137 Distribution::HashShard(_) => DispatcherType::Hash,
138 Distribution::Single => DispatcherType::Simple,
139 Distribution::Broadcast => DispatcherType::Broadcast,
140 _ => panic!("Do not allow Any or AnyShard in serialization process"),
141 } as i32,
142 dist_key_indices: match &self.base.distribution() {
143 Distribution::HashShard(keys) => {
144 keys.iter().map(|num| *num as u32).collect()
145 }
146 _ => vec![],
147 },
148 output_mapping,
149 })
150 },
151 }))
152 }
153}
154
155impl ExprRewritable for StreamExchange {}
156
157impl ExprVisitable for StreamExchange {}