risingwave_frontend/stream_fragmenter/rewrite/
delta_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use risingwave_pb::plan_common::PbField;
19use risingwave_pb::stream_plan::lookup_node::ArrangementTableId;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_pb::stream_plan::{
22    DispatchStrategy, DispatcherType, ExchangeNode, LookupNode, LookupUnionNode,
23    PbDispatchOutputMapping, StreamNode,
24};
25
26use super::super::{BuildFragmentGraphState, StreamFragment, StreamFragmentEdge};
27use crate::error::ErrorCode::NotSupported;
28use crate::error::Result;
29use crate::stream_fragmenter::build_and_add_fragment;
30
31fn build_no_shuffle_exchange_for_delta_join(
32    state: &mut BuildFragmentGraphState,
33    upstream: &StreamNode,
34) -> StreamNode {
35    StreamNode {
36        operator_id: state.gen_operator_id() as u64,
37        identity: "NO SHUFFLE Exchange (Lookup and Merge)".into(),
38        fields: upstream.fields.clone(),
39        stream_key: upstream.stream_key.clone(),
40        node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
41            strategy: Some(dispatch_no_shuffle(
42                (0..(upstream.fields.len() as u32)).collect(),
43            )),
44        }))),
45        input: vec![],
46        append_only: upstream.append_only,
47    }
48}
49
50fn build_consistent_hash_shuffle_exchange_for_delta_join(
51    state: &mut BuildFragmentGraphState,
52    upstream: &StreamNode,
53    dist_key_indices: Vec<u32>,
54) -> StreamNode {
55    StreamNode {
56        operator_id: state.gen_operator_id() as u64,
57        identity: "HASH Exchange (Lookup and Merge)".into(),
58        fields: upstream.fields.clone(),
59        stream_key: upstream.stream_key.clone(),
60        node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
61            strategy: Some(dispatch_consistent_hash_shuffle(
62                dist_key_indices,
63                (0..(upstream.fields.len() as u32)).collect(),
64            )),
65        }))),
66        input: vec![],
67        append_only: upstream.append_only,
68    }
69}
70
71fn dispatch_no_shuffle(output_indices: Vec<u32>) -> DispatchStrategy {
72    DispatchStrategy {
73        r#type: DispatcherType::NoShuffle.into(),
74        dist_key_indices: vec![],
75        output_mapping: PbDispatchOutputMapping::simple(output_indices).into(),
76    }
77}
78
79fn dispatch_consistent_hash_shuffle(
80    dist_key_indices: Vec<u32>,
81    output_indices: Vec<u32>,
82) -> DispatchStrategy {
83    // Actually Hash shuffle is consistent hash shuffle now.
84    DispatchStrategy {
85        r#type: DispatcherType::Hash.into(),
86        dist_key_indices,
87        output_mapping: PbDispatchOutputMapping::simple(output_indices).into(),
88    }
89}
90
91fn build_lookup_for_delta_join(
92    state: &mut BuildFragmentGraphState,
93    (exchange_node_arrangement, exchange_node_stream): (&StreamNode, &StreamNode),
94    (output_fields, output_stream_key): (Vec<PbField>, Vec<u32>),
95    lookup_node: LookupNode,
96) -> StreamNode {
97    StreamNode {
98        operator_id: state.gen_operator_id() as u64,
99        identity: "Lookup".into(),
100        fields: output_fields,
101        stream_key: output_stream_key,
102        node_body: Some(NodeBody::Lookup(Box::new(lookup_node))),
103        input: vec![
104            exchange_node_arrangement.clone(),
105            exchange_node_stream.clone(),
106        ],
107        append_only: exchange_node_stream.append_only,
108    }
109}
110
111fn build_delta_join_inner(
112    state: &mut BuildFragmentGraphState,
113    current_fragment: &StreamFragment,
114    arrange_0_frag: Rc<StreamFragment>,
115    arrange_1_frag: Rc<StreamFragment>,
116    node: &StreamNode,
117    is_local_table_id: bool,
118) -> Result<StreamNode> {
119    let delta_join_node = match &node.node_body {
120        Some(NodeBody::DeltaIndexJoin(node)) => node,
121        _ => unreachable!(),
122    };
123    let output_indices = &delta_join_node.output_indices;
124
125    let arrange_0 = arrange_0_frag.node.as_ref().unwrap();
126    let arrange_1 = arrange_1_frag.node.as_ref().unwrap();
127    let exchange_a0l0 = build_no_shuffle_exchange_for_delta_join(state, arrange_0);
128    let exchange_a0l1 = build_consistent_hash_shuffle_exchange_for_delta_join(
129        state,
130        arrange_0,
131        delta_join_node
132            .left_key
133            .iter()
134            .map(|x| *x as u32)
135            .collect_vec(),
136    );
137    let exchange_a1l0 = build_consistent_hash_shuffle_exchange_for_delta_join(
138        state,
139        arrange_1,
140        delta_join_node
141            .right_key
142            .iter()
143            .map(|x| *x as u32)
144            .collect_vec(),
145    );
146    let exchange_a1l1 = build_no_shuffle_exchange_for_delta_join(state, arrange_1);
147
148    let i0_length = arrange_0.fields.len();
149    let i1_length = arrange_1.fields.len();
150
151    let i0_output_indices = (0..i0_length as u32).collect_vec();
152    let i1_output_indices = (0..i1_length as u32).collect_vec();
153
154    let lookup_0_column_reordering = {
155        let tmp: Vec<i32> = (i1_length..i1_length + i0_length)
156            .chain(0..i1_length)
157            .map(|x| x as _)
158            .collect_vec();
159        output_indices
160            .iter()
161            .map(|&x| tmp[x as usize])
162            .collect_vec()
163    };
164    // lookup left table by right stream
165    let lookup_0 = build_lookup_for_delta_join(
166        state,
167        (&exchange_a1l0, &exchange_a0l0),
168        (node.fields.clone(), node.stream_key.clone()),
169        LookupNode {
170            stream_key: delta_join_node.right_key.clone(),
171            arrange_key: delta_join_node.left_key.clone(),
172            use_current_epoch: false,
173            // will be updated later to a global id
174            arrangement_table_id: if is_local_table_id {
175                Some(ArrangementTableId::TableId(delta_join_node.left_table_id))
176            } else {
177                Some(ArrangementTableId::IndexId(delta_join_node.left_table_id))
178            },
179            column_mapping: lookup_0_column_reordering,
180            arrangement_table_info: delta_join_node.left_info.clone(),
181        },
182    );
183    let lookup_1_column_reordering = {
184        let tmp: Vec<i32> = (0..i0_length + i1_length)
185            .chain(0..i1_length)
186            .map(|x| x as _)
187            .collect_vec();
188        output_indices
189            .iter()
190            .map(|&x| tmp[x as usize])
191            .collect_vec()
192    };
193    // lookup right table by left stream
194    let lookup_1 = build_lookup_for_delta_join(
195        state,
196        (&exchange_a0l1, &exchange_a1l1),
197        (node.fields.clone(), node.stream_key.clone()),
198        LookupNode {
199            stream_key: delta_join_node.left_key.clone(),
200            arrange_key: delta_join_node.right_key.clone(),
201            use_current_epoch: true,
202            // will be updated later to a global id
203            arrangement_table_id: if is_local_table_id {
204                Some(ArrangementTableId::TableId(delta_join_node.right_table_id))
205            } else {
206                Some(ArrangementTableId::IndexId(delta_join_node.right_table_id))
207            },
208            column_mapping: lookup_1_column_reordering,
209            arrangement_table_info: delta_join_node.right_info.clone(),
210        },
211    );
212
213    let lookup_0_frag = build_and_add_fragment(state, lookup_0)?;
214    let lookup_1_frag = build_and_add_fragment(state, lookup_1)?;
215
216    // Place index(arrange) together with corresponding lookup operator, so that we can lookup on
217    // the same node.
218    state.fragment_graph.add_edge(
219        arrange_0_frag.fragment_id,
220        lookup_0_frag.fragment_id,
221        StreamFragmentEdge {
222            dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()),
223            link_id: exchange_a0l0.operator_id,
224        },
225    );
226
227    // Use consistent hash shuffle to distribute the index(arrange) to another lookup operator, so
228    // that we can find the correct node to lookup.
229    state.fragment_graph.add_edge(
230        arrange_0_frag.fragment_id,
231        lookup_1_frag.fragment_id,
232        StreamFragmentEdge {
233            dispatch_strategy: dispatch_consistent_hash_shuffle(
234                delta_join_node
235                    .left_key
236                    .iter()
237                    .map(|x| *x as u32)
238                    .collect_vec(),
239                i0_output_indices,
240            ),
241            link_id: exchange_a0l1.operator_id,
242        },
243    );
244
245    // Use consistent hash shuffle to distribute the index(arrange) to another lookup operator, so
246    // that we can find the correct node to lookup.
247    state.fragment_graph.add_edge(
248        arrange_1_frag.fragment_id,
249        lookup_0_frag.fragment_id,
250        StreamFragmentEdge {
251            dispatch_strategy: dispatch_consistent_hash_shuffle(
252                delta_join_node
253                    .right_key
254                    .iter()
255                    .map(|x| *x as u32)
256                    .collect_vec(),
257                i1_output_indices.clone(),
258            ),
259            link_id: exchange_a1l0.operator_id,
260        },
261    );
262
263    // Place index(arrange) together with corresponding lookup operator, so that we can lookup on
264    // the same node.
265    state.fragment_graph.add_edge(
266        arrange_1_frag.fragment_id,
267        lookup_1_frag.fragment_id,
268        StreamFragmentEdge {
269            dispatch_strategy: dispatch_no_shuffle(i1_output_indices),
270            link_id: exchange_a1l1.operator_id,
271        },
272    );
273
274    let exchange_l0m =
275        build_consistent_hash_shuffle_exchange_for_delta_join(state, node, node.stream_key.clone());
276    let exchange_l1m =
277        build_consistent_hash_shuffle_exchange_for_delta_join(state, node, node.stream_key.clone());
278
279    // LookupUnion's inputs might have different distribution and we need to unify them by using
280    // hash shuffle.
281    let union = StreamNode {
282        operator_id: state.gen_operator_id() as u64,
283        identity: "Union".into(),
284        fields: node.fields.clone(),
285        stream_key: node.stream_key.clone(),
286        node_body: Some(NodeBody::LookupUnion(Box::new(LookupUnionNode {
287            order: vec![1, 0],
288        }))),
289        input: vec![exchange_l0m.clone(), exchange_l1m.clone()],
290        append_only: node.append_only,
291    };
292
293    state.fragment_graph.add_edge(
294        lookup_0_frag.fragment_id,
295        current_fragment.fragment_id,
296        StreamFragmentEdge {
297            dispatch_strategy: dispatch_consistent_hash_shuffle(
298                node.stream_key.clone(),
299                (0..node.fields.len() as u32).collect(),
300            ),
301            link_id: exchange_l0m.operator_id,
302        },
303    );
304
305    state.fragment_graph.add_edge(
306        lookup_1_frag.fragment_id,
307        current_fragment.fragment_id,
308        StreamFragmentEdge {
309            dispatch_strategy: dispatch_consistent_hash_shuffle(
310                node.stream_key.clone(),
311                (0..node.fields.len() as u32).collect(),
312            ),
313            link_id: exchange_l1m.operator_id,
314        },
315    );
316
317    Ok(union)
318}
319
320pub(crate) fn build_delta_join_without_arrange(
321    state: &mut BuildFragmentGraphState,
322    current_fragment: &StreamFragment,
323    mut node: StreamNode,
324) -> Result<StreamNode> {
325    match &node.node_body {
326        Some(NodeBody::DeltaIndexJoin(node)) => node,
327        _ => unreachable!(),
328    };
329
330    let [arrange_0, arrange_1]: [_; 2] = std::mem::take(&mut node.input).try_into().unwrap();
331
332    // TODO: when distribution key is added to catalog, chain and delta join won't have any
333    // exchange in-between. Then we can safely remove this function.
334    fn pass_through_exchange(mut node: StreamNode) -> StreamNode {
335        if let Some(NodeBody::Exchange(exchange)) = node.node_body {
336            if let DispatcherType::NoShuffle =
337                exchange.strategy.as_ref().unwrap().get_type().unwrap()
338            {
339                return node.input.remove(0);
340            }
341            panic!("exchange other than no_shuffle not allowed between delta join and arrange");
342        } else {
343            // pass
344            node
345        }
346    }
347
348    let arrange_0 = pass_through_exchange(arrange_0);
349    let arrange_1 = pass_through_exchange(arrange_1);
350
351    let arrange_0_frag = build_and_add_fragment(state, arrange_0)?;
352    let arrange_1_frag = build_and_add_fragment(state, arrange_1)?;
353
354    let union = build_delta_join_inner(
355        state,
356        current_fragment,
357        arrange_0_frag,
358        arrange_1_frag,
359        &node,
360        false,
361    )?;
362
363    if state.has_snapshot_backfill {
364        return Err(NotSupported(
365            "Delta join with snapshot backfill is not supported".to_owned(),
366            "Please use a different join strategy or disable snapshot backfill by `SET streaming_use_snapshot_backfill = false`.".to_owned(),
367        ).into());
368    }
369
370    Ok(union)
371}