risingwave_frontend/optimizer/plan_node/
stream_exchange.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::stream_node::NodeBody;
17use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, plan_node_name};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{
24 Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
25};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamExchange {
32 pub base: PlanBase<Stream>,
33 input: PlanRef,
34 no_shuffle: bool,
35}
36
37impl StreamExchange {
38 pub fn new(input: PlanRef, dist: Distribution) -> Self {
39 let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
40 input.columns_monotonicity().clone()
43 } else {
44 MonotonicityMap::new()
45 };
46 assert!(!input.schema().is_empty());
47 let base = PlanBase::new_stream(
48 input.ctx(),
49 input.schema().clone(),
50 input.stream_key().map(|v| v.to_vec()),
51 input.functional_dependency().clone(),
52 dist,
53 input.append_only(), input.emit_on_window_close(),
55 input.watermark_columns().clone(),
56 columns_monotonicity,
57 );
58 StreamExchange {
59 base,
60 input,
61 no_shuffle: false,
62 }
63 }
64
65 pub fn new_no_shuffle(input: PlanRef) -> Self {
66 let ctx = input.ctx();
67 let base = PlanBase::new_stream(
68 ctx,
69 input.schema().clone(),
70 input.stream_key().map(|v| v.to_vec()),
71 input.functional_dependency().clone(),
72 input.distribution().clone(),
73 input.append_only(), input.emit_on_window_close(),
75 input.watermark_columns().clone(),
76 input.columns_monotonicity().clone(),
77 );
78 StreamExchange {
79 base,
80 input,
81 no_shuffle: true,
82 }
83 }
84
85 pub fn no_shuffle(&self) -> bool {
86 self.no_shuffle
87 }
88}
89
90impl Distill for StreamExchange {
91 fn distill<'a>(&self) -> XmlNode<'a> {
92 let distribution_display = DistributionDisplay {
93 distribution: self.base.distribution(),
94 input_schema: self.input.schema(),
95 };
96 childless_record(
97 plan_node_name!(
98 "StreamExchange",
99 { "no_shuffle", self.no_shuffle },
100 ),
101 vec![("dist", Pretty::display(&distribution_display))],
102 )
103 }
104}
105
106impl PlanTreeNodeUnary for StreamExchange {
107 fn input(&self) -> PlanRef {
108 self.input.clone()
109 }
110
111 fn clone_with_input(&self, input: PlanRef) -> Self {
112 if self.no_shuffle {
113 Self::new_no_shuffle(input)
114 } else {
115 Self::new(input, self.distribution().clone())
116 }
117 }
118}
119impl_plan_tree_node_for_unary! {StreamExchange}
120
121impl StreamNode for StreamExchange {
122 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
123 NodeBody::Exchange(Box::new(ExchangeNode {
124 strategy: if self.no_shuffle {
125 Some(DispatchStrategy {
126 r#type: DispatcherType::NoShuffle as i32,
127 dist_key_indices: vec![],
128 output_indices: (0..self.schema().len() as u32).collect(),
129 })
130 } else {
131 Some(DispatchStrategy {
132 r#type: match &self.base.distribution() {
133 Distribution::HashShard(_) => DispatcherType::Hash,
134 Distribution::Single => DispatcherType::Simple,
135 Distribution::Broadcast => DispatcherType::Broadcast,
136 _ => panic!("Do not allow Any or AnyShard in serialization process"),
137 } as i32,
138 dist_key_indices: match &self.base.distribution() {
139 Distribution::HashShard(keys) => {
140 keys.iter().map(|num| *num as u32).collect()
141 }
142 _ => vec![],
143 },
144 output_indices: (0..self.schema().len() as u32).collect(),
145 })
146 },
147 }))
148 }
149}
150
151impl ExprRewritable for StreamExchange {}
152
153impl ExprVisitable for StreamExchange {}