1use std::rc::Rc;
16
17use itertools::Itertools;
18use risingwave_pb::plan_common::PbField;
19use risingwave_pb::stream_plan::lookup_node::ArrangementTableId;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_pb::stream_plan::{
22 DispatchStrategy, DispatcherType, ExchangeNode, LookupNode, LookupUnionNode,
23 PbDispatchOutputMapping, StreamNode,
24};
25
26use super::super::{BuildFragmentGraphState, StreamFragment, StreamFragmentEdge};
27use crate::error::ErrorCode::NotSupported;
28use crate::error::Result;
29use crate::stream_fragmenter::build_and_add_fragment;
30
31fn build_no_shuffle_exchange_for_delta_join(
32 state: &mut BuildFragmentGraphState,
33 upstream: &StreamNode,
34) -> StreamNode {
35 StreamNode {
36 operator_id: state.gen_operator_id() as u64,
37 identity: "NO SHUFFLE Exchange (Lookup and Merge)".into(),
38 fields: upstream.fields.clone(),
39 stream_key: upstream.stream_key.clone(),
40 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
41 strategy: Some(dispatch_no_shuffle(
42 (0..(upstream.fields.len() as u32)).collect(),
43 )),
44 }))),
45 input: vec![],
46 append_only: upstream.append_only,
47 }
48}
49
50fn build_consistent_hash_shuffle_exchange_for_delta_join(
51 state: &mut BuildFragmentGraphState,
52 upstream: &StreamNode,
53 dist_key_indices: Vec<u32>,
54) -> StreamNode {
55 StreamNode {
56 operator_id: state.gen_operator_id() as u64,
57 identity: "HASH Exchange (Lookup and Merge)".into(),
58 fields: upstream.fields.clone(),
59 stream_key: upstream.stream_key.clone(),
60 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
61 strategy: Some(dispatch_consistent_hash_shuffle(
62 dist_key_indices,
63 (0..(upstream.fields.len() as u32)).collect(),
64 )),
65 }))),
66 input: vec![],
67 append_only: upstream.append_only,
68 }
69}
70
71fn dispatch_no_shuffle(output_indices: Vec<u32>) -> DispatchStrategy {
72 DispatchStrategy {
73 r#type: DispatcherType::NoShuffle.into(),
74 dist_key_indices: vec![],
75 output_mapping: PbDispatchOutputMapping::simple(output_indices).into(),
76 }
77}
78
79fn dispatch_consistent_hash_shuffle(
80 dist_key_indices: Vec<u32>,
81 output_indices: Vec<u32>,
82) -> DispatchStrategy {
83 DispatchStrategy {
85 r#type: DispatcherType::Hash.into(),
86 dist_key_indices,
87 output_mapping: PbDispatchOutputMapping::simple(output_indices).into(),
88 }
89}
90
91fn build_lookup_for_delta_join(
92 state: &mut BuildFragmentGraphState,
93 (exchange_node_arrangement, exchange_node_stream): (&StreamNode, &StreamNode),
94 (output_fields, output_stream_key): (Vec<PbField>, Vec<u32>),
95 lookup_node: LookupNode,
96) -> StreamNode {
97 StreamNode {
98 operator_id: state.gen_operator_id() as u64,
99 identity: "Lookup".into(),
100 fields: output_fields,
101 stream_key: output_stream_key,
102 node_body: Some(NodeBody::Lookup(Box::new(lookup_node))),
103 input: vec![
104 exchange_node_arrangement.clone(),
105 exchange_node_stream.clone(),
106 ],
107 append_only: exchange_node_stream.append_only,
108 }
109}
110
111fn build_delta_join_inner(
112 state: &mut BuildFragmentGraphState,
113 current_fragment: &StreamFragment,
114 arrange_0_frag: Rc<StreamFragment>,
115 arrange_1_frag: Rc<StreamFragment>,
116 node: &StreamNode,
117 is_local_table_id: bool,
118) -> Result<StreamNode> {
119 let delta_join_node = match &node.node_body {
120 Some(NodeBody::DeltaIndexJoin(node)) => node,
121 _ => unreachable!(),
122 };
123 let output_indices = &delta_join_node.output_indices;
124
125 let arrange_0 = arrange_0_frag.node.as_ref().unwrap();
126 let arrange_1 = arrange_1_frag.node.as_ref().unwrap();
127 let exchange_a0l0 = build_no_shuffle_exchange_for_delta_join(state, arrange_0);
128 let exchange_a0l1 = build_consistent_hash_shuffle_exchange_for_delta_join(
129 state,
130 arrange_0,
131 delta_join_node
132 .left_key
133 .iter()
134 .map(|x| *x as u32)
135 .collect_vec(),
136 );
137 let exchange_a1l0 = build_consistent_hash_shuffle_exchange_for_delta_join(
138 state,
139 arrange_1,
140 delta_join_node
141 .right_key
142 .iter()
143 .map(|x| *x as u32)
144 .collect_vec(),
145 );
146 let exchange_a1l1 = build_no_shuffle_exchange_for_delta_join(state, arrange_1);
147
148 let i0_length = arrange_0.fields.len();
149 let i1_length = arrange_1.fields.len();
150
151 let i0_output_indices = (0..i0_length as u32).collect_vec();
152 let i1_output_indices = (0..i1_length as u32).collect_vec();
153
154 let lookup_0_column_reordering = {
155 let tmp: Vec<i32> = (i1_length..i1_length + i0_length)
156 .chain(0..i1_length)
157 .map(|x| x as _)
158 .collect_vec();
159 output_indices
160 .iter()
161 .map(|&x| tmp[x as usize])
162 .collect_vec()
163 };
164 let lookup_0 = build_lookup_for_delta_join(
166 state,
167 (&exchange_a1l0, &exchange_a0l0),
168 (node.fields.clone(), node.stream_key.clone()),
169 LookupNode {
170 stream_key: delta_join_node.right_key.clone(),
171 arrange_key: delta_join_node.left_key.clone(),
172 use_current_epoch: false,
173 arrangement_table_id: if is_local_table_id {
175 Some(ArrangementTableId::TableId(delta_join_node.left_table_id))
176 } else {
177 Some(ArrangementTableId::IndexId(delta_join_node.left_table_id))
178 },
179 column_mapping: lookup_0_column_reordering,
180 arrangement_table_info: delta_join_node.left_info.clone(),
181 },
182 );
183 let lookup_1_column_reordering = {
184 let tmp: Vec<i32> = (0..i0_length + i1_length)
185 .chain(0..i1_length)
186 .map(|x| x as _)
187 .collect_vec();
188 output_indices
189 .iter()
190 .map(|&x| tmp[x as usize])
191 .collect_vec()
192 };
193 let lookup_1 = build_lookup_for_delta_join(
195 state,
196 (&exchange_a0l1, &exchange_a1l1),
197 (node.fields.clone(), node.stream_key.clone()),
198 LookupNode {
199 stream_key: delta_join_node.left_key.clone(),
200 arrange_key: delta_join_node.right_key.clone(),
201 use_current_epoch: true,
202 arrangement_table_id: if is_local_table_id {
204 Some(ArrangementTableId::TableId(delta_join_node.right_table_id))
205 } else {
206 Some(ArrangementTableId::IndexId(delta_join_node.right_table_id))
207 },
208 column_mapping: lookup_1_column_reordering,
209 arrangement_table_info: delta_join_node.right_info.clone(),
210 },
211 );
212
213 let lookup_0_frag = build_and_add_fragment(state, lookup_0)?;
214 let lookup_1_frag = build_and_add_fragment(state, lookup_1)?;
215
216 state.fragment_graph.add_edge(
219 arrange_0_frag.fragment_id,
220 lookup_0_frag.fragment_id,
221 StreamFragmentEdge {
222 dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()),
223 link_id: exchange_a0l0.operator_id,
224 },
225 );
226
227 state.fragment_graph.add_edge(
230 arrange_0_frag.fragment_id,
231 lookup_1_frag.fragment_id,
232 StreamFragmentEdge {
233 dispatch_strategy: dispatch_consistent_hash_shuffle(
234 delta_join_node
235 .left_key
236 .iter()
237 .map(|x| *x as u32)
238 .collect_vec(),
239 i0_output_indices,
240 ),
241 link_id: exchange_a0l1.operator_id,
242 },
243 );
244
245 state.fragment_graph.add_edge(
248 arrange_1_frag.fragment_id,
249 lookup_0_frag.fragment_id,
250 StreamFragmentEdge {
251 dispatch_strategy: dispatch_consistent_hash_shuffle(
252 delta_join_node
253 .right_key
254 .iter()
255 .map(|x| *x as u32)
256 .collect_vec(),
257 i1_output_indices.clone(),
258 ),
259 link_id: exchange_a1l0.operator_id,
260 },
261 );
262
263 state.fragment_graph.add_edge(
266 arrange_1_frag.fragment_id,
267 lookup_1_frag.fragment_id,
268 StreamFragmentEdge {
269 dispatch_strategy: dispatch_no_shuffle(i1_output_indices),
270 link_id: exchange_a1l1.operator_id,
271 },
272 );
273
274 let exchange_l0m =
275 build_consistent_hash_shuffle_exchange_for_delta_join(state, node, node.stream_key.clone());
276 let exchange_l1m =
277 build_consistent_hash_shuffle_exchange_for_delta_join(state, node, node.stream_key.clone());
278
279 let union = StreamNode {
282 operator_id: state.gen_operator_id() as u64,
283 identity: "Union".into(),
284 fields: node.fields.clone(),
285 stream_key: node.stream_key.clone(),
286 node_body: Some(NodeBody::LookupUnion(Box::new(LookupUnionNode {
287 order: vec![1, 0],
288 }))),
289 input: vec![exchange_l0m.clone(), exchange_l1m.clone()],
290 append_only: node.append_only,
291 };
292
293 state.fragment_graph.add_edge(
294 lookup_0_frag.fragment_id,
295 current_fragment.fragment_id,
296 StreamFragmentEdge {
297 dispatch_strategy: dispatch_consistent_hash_shuffle(
298 node.stream_key.clone(),
299 (0..node.fields.len() as u32).collect(),
300 ),
301 link_id: exchange_l0m.operator_id,
302 },
303 );
304
305 state.fragment_graph.add_edge(
306 lookup_1_frag.fragment_id,
307 current_fragment.fragment_id,
308 StreamFragmentEdge {
309 dispatch_strategy: dispatch_consistent_hash_shuffle(
310 node.stream_key.clone(),
311 (0..node.fields.len() as u32).collect(),
312 ),
313 link_id: exchange_l1m.operator_id,
314 },
315 );
316
317 Ok(union)
318}
319
320pub(crate) fn build_delta_join_without_arrange(
321 state: &mut BuildFragmentGraphState,
322 current_fragment: &StreamFragment,
323 mut node: StreamNode,
324) -> Result<StreamNode> {
325 match &node.node_body {
326 Some(NodeBody::DeltaIndexJoin(node)) => node,
327 _ => unreachable!(),
328 };
329
330 let [arrange_0, arrange_1]: [_; 2] = std::mem::take(&mut node.input).try_into().unwrap();
331
332 fn pass_through_exchange(mut node: StreamNode) -> StreamNode {
335 if let Some(NodeBody::Exchange(exchange)) = node.node_body {
336 if let DispatcherType::NoShuffle =
337 exchange.strategy.as_ref().unwrap().get_type().unwrap()
338 {
339 return node.input.remove(0);
340 }
341 panic!("exchange other than no_shuffle not allowed between delta join and arrange");
342 } else {
343 node
345 }
346 }
347
348 let arrange_0 = pass_through_exchange(arrange_0);
349 let arrange_1 = pass_through_exchange(arrange_1);
350
351 let arrange_0_frag = build_and_add_fragment(state, arrange_0)?;
352 let arrange_1_frag = build_and_add_fragment(state, arrange_1)?;
353
354 let union = build_delta_join_inner(
355 state,
356 current_fragment,
357 arrange_0_frag,
358 arrange_1_frag,
359 &node,
360 false,
361 )?;
362
363 if state.has_snapshot_backfill {
364 return Err(NotSupported(
365 "Delta join with snapshot backfill is not supported".to_owned(),
366 "Please use a different join strategy or disable snapshot backfill by `SET streaming_use_snapshot_backfill = false`.".to_owned(),
367 ).into());
368 }
369
370 Ok(union)
371}