risingwave_frontend/optimizer/plan_node/
stream_hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::DataType;
20use risingwave_common::util::functional::SameOrElseExt;
21use risingwave_common::util::sort_util::OrderType;
22use risingwave_pb::plan_common::JoinType;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use risingwave_pb::stream_plan::{
25    HashJoinNode, HashJoinWatermarkHandleDesc, InequalityPairV2 as PbInequalityPairV2,
26    InequalityType as PbInequalityType, JoinKeyWatermarkIndex, PbJoinEncodingType,
27};
28
29use super::generic::GenericPlanNode;
30use super::stream::prelude::*;
31use super::stream_join_common::StreamJoinCommon;
32use super::utils::{
33    Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
34};
35use super::{
36    ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamDeltaJoin, StreamPlanRef as PlanRef,
37    TryToStreamPb, generic,
38};
39use crate::TableCatalog;
40use crate::expr::{Expr, ExprDisplay, ExprRewriter, ExprType, ExprVisitor, InequalityInputPair};
41use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
42use crate::optimizer::plan_node::utils::IndicesDisplay;
43use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
44use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
45use crate::scheduler::SchedulerResult;
46use crate::stream_fragmenter::BuildFragmentGraphState;
47
48/// [`StreamHashJoin`] implements [`super::LogicalJoin`] with hash table. It builds a hash table
49/// from inner (right-side) relation and probes with data from outer (left-side) relation to
50/// get output rows.
51#[derive(Debug, Clone, PartialEq, Eq, Hash)]
52pub struct StreamHashJoin {
53    pub base: PlanBase<Stream>,
54    core: generic::Join<PlanRef>,
55
56    /// Join-key positions and cleaning flags for watermark handling.
57    watermark_indices_in_jk: Vec<(usize, bool)>,
58
59    /// `(conjunction_idx, clean_left_state, clean_right_state, InequalityInputPair)`.
60    /// Each entry represents an inequality condition like `left_col <op> right_col`.
61    /// The `conjunction_idx` is the index in `eq_join_predicate.other_cond().conjunctions`.
62    inequality_pairs: Vec<(usize, bool, bool, InequalityInputPair)>,
63
64    /// Whether can optimize for append-only stream.
65    /// It is true if input of both side is append-only
66    is_append_only: bool,
67}
68
69/// Result of watermark derivation for hash join.
70struct WatermarkDeriveResult {
71    /// Output watermark columns.
72    watermark_columns: WatermarkColumns,
73    /// Join-key positions and cleaning flags for watermark handling.
74    watermark_indices_in_jk: Vec<(usize, bool)>,
75    /// `(conjunction_idx, clean_left_state, clean_right_state, InequalityInputPair)`.
76    inequality_pairs: Vec<(usize, bool, bool, InequalityInputPair)>,
77}
78
79/// Derives watermark columns and state cleaning info for hash join.
80///
81/// This function analyzes equal and inequality conditions to determine:
82/// 1. Which join keys have watermarks on both sides (enabling state cleanup via equal conditions)
83/// 2. Which inequality conditions can be used for state cleanup
84/// 3. Which columns can emit watermarks downstream
85///
86/// **Important**: Only ONE column per table is allowed to do state cleaning.
87/// Priority: join key (leftmost in PK) > inequality pairs.
88fn derive_watermark_for_hash_join(
89    core: &generic::Join<PlanRef>,
90    eq_join_predicate: &EqJoinPredicate,
91) -> WatermarkDeriveResult {
92    let ctx = core.ctx();
93    let l2i = core.l2i_col_mapping();
94    let r2i = core.r2i_col_mapping();
95
96    let mut watermark_indices_in_jk = vec![];
97    let mut inequality_pairs = vec![];
98
99    // Track if we've already found a column for state cleaning.
100    // Join key state cleaning cleans both sides, so we track it separately.
101    let mut found_jk_state_clean = false;
102    let mut found_ineq_clean_left = false;
103    let mut found_ineq_clean_right = false;
104
105    // Process equal conditions: check if both sides have watermarks on join keys.
106    // Only the FIRST (leftmost) join key with watermarks on both sides can do state cleaning.
107    // This prioritizes the leftmost join key in PK order (since we reorder join keys
108    // to put watermark columns first).
109    let mut watermark_columns = WatermarkColumns::new();
110    for (idx, (left_key, right_key)) in eq_join_predicate.eq_indexes().iter().enumerate() {
111        if let Some(l_wtmk_group) = core.left.watermark_columns().get_group(*left_key)
112            && let Some(r_wtmk_group) = core.right.watermark_columns().get_group(*right_key)
113        {
114            // Only the first join key with watermarks on both sides can do state cleaning
115            let do_state_cleaning = !found_jk_state_clean;
116            if do_state_cleaning {
117                found_jk_state_clean = true;
118            }
119            watermark_indices_in_jk.push((idx, do_state_cleaning));
120
121            if let Some(internal) = l2i.try_map(*left_key) {
122                watermark_columns.insert(
123                    internal,
124                    l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
125                );
126            }
127            if let Some(internal) = r2i.try_map(*right_key) {
128                watermark_columns.insert(
129                    internal,
130                    l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
131                );
132            }
133        }
134    }
135
136    // Process inequality pairs using the V2 format.
137    // Inequality pairs can only clean state if no join key is doing state cleaning.
138    let original_inequality_pairs = eq_join_predicate.inequality_pairs_v2();
139    for (conjunction_idx, pair) in original_inequality_pairs {
140        let InequalityInputPair {
141            left_idx,
142            right_idx,
143            op,
144        } = pair;
145
146        // Check if both upstream sides have watermarks on the inequality columns
147        let both_upstream_has_watermark = core.left.watermark_columns().contains(left_idx)
148            && core.right.watermark_columns().contains(right_idx);
149        if !both_upstream_has_watermark {
150            continue;
151        }
152
153        // Determine which side's state can be cleaned based on the operator.
154        // State cleanup applies to the side with LARGER values.
155        // For `left < right` or `left <= right`: RIGHT is larger → clean RIGHT state
156        // For `left > right` or `left >= right`: LEFT is larger → clean LEFT state
157        let left_is_larger = matches!(op, ExprType::GreaterThan | ExprType::GreaterThanOrEqual);
158
159        // Inequality pairs can only clean state if:
160        // 1. No join key is doing state cleaning (join key has priority)
161        // 2. No other inequality pair has claimed this side yet
162        let (clean_left, clean_right) = if left_is_larger {
163            let do_clean = !found_jk_state_clean && !found_ineq_clean_left;
164            if do_clean {
165                found_ineq_clean_left = true;
166            }
167            (do_clean, false)
168        } else {
169            let do_clean = !found_jk_state_clean && !found_ineq_clean_right;
170            if do_clean {
171                found_ineq_clean_right = true;
172            }
173            (false, do_clean)
174        };
175
176        let mut is_valuable_inequality = clean_left || clean_right;
177
178        // Add watermark columns for the inequality.
179        // We can only yield watermark from the LARGER side downstream.
180        // For `left >= right`: left is larger, yield left watermark
181        // For `left <= right`: right is larger, yield right watermark
182        if left_is_larger {
183            if let Some(internal) = l2i.try_map(left_idx)
184                && !watermark_columns.contains(internal)
185            {
186                watermark_columns.insert(internal, ctx.next_watermark_group_id());
187                is_valuable_inequality = true;
188            }
189        } else if let Some(internal) = r2i.try_map(right_idx)
190            && !watermark_columns.contains(internal)
191        {
192            watermark_columns.insert(internal, ctx.next_watermark_group_id());
193            is_valuable_inequality = true;
194        }
195
196        if is_valuable_inequality {
197            inequality_pairs.push((
198                conjunction_idx,
199                clean_left,
200                clean_right,
201                InequalityInputPair::new(left_idx, right_idx, op),
202            ));
203        }
204    }
205
206    let watermark_columns = watermark_columns.map_clone(&core.i2o_col_mapping());
207
208    WatermarkDeriveResult {
209        watermark_columns,
210        watermark_indices_in_jk,
211        inequality_pairs,
212    }
213}
214
215impl StreamHashJoin {
216    pub fn new(mut core: generic::Join<PlanRef>) -> Result<Self> {
217        let stream_kind = core.stream_kind()?;
218
219        // Reorder `eq_join_predicate` by placing the watermark column at the beginning.
220        let eq_join_predicate = {
221            let eq_join_predicate = core
222                .on
223                .as_eq_predicate_ref()
224                .expect("StreamHashJoin requires JoinOn::EqPredicate in core")
225                .clone();
226            let mut reorder_idx = vec![];
227            for (i, (left_key, right_key)) in eq_join_predicate.eq_indexes().iter().enumerate() {
228                if core.left.watermark_columns().contains(*left_key)
229                    && core.right.watermark_columns().contains(*right_key)
230                {
231                    reorder_idx.push(i);
232                }
233            }
234            eq_join_predicate.reorder(&reorder_idx)
235        };
236        core.on = generic::JoinOn::EqPredicate(eq_join_predicate.clone());
237
238        let dist = StreamJoinCommon::derive_dist(
239            core.left.distribution(),
240            core.right.distribution(),
241            &core,
242        );
243
244        // Derive watermark columns and state cleaning info
245        let WatermarkDeriveResult {
246            watermark_columns,
247            watermark_indices_in_jk,
248            inequality_pairs,
249        } = derive_watermark_for_hash_join(&core, &eq_join_predicate);
250
251        // TODO: derive from input
252        let base = PlanBase::new_stream_with_core(
253            &core,
254            dist,
255            stream_kind,
256            false, // TODO(rc): derive EOWC property from input
257            watermark_columns,
258            MonotonicityMap::new(), // TODO: derive monotonicity
259        );
260
261        Ok(Self {
262            base,
263            core,
264            watermark_indices_in_jk,
265            inequality_pairs,
266            is_append_only: stream_kind.is_append_only(),
267        })
268    }
269
270    /// Get join type
271    pub fn join_type(&self) -> JoinType {
272        self.core.join_type
273    }
274
275    /// Get a reference to the hash join's eq join predicate.
276    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
277        self.core
278            .on
279            .as_eq_predicate_ref()
280            .expect("StreamHashJoin should store predicate as EqJoinPredicate")
281    }
282
283    /// Convert this hash join to a delta join plan
284    pub fn into_delta_join(self) -> StreamDeltaJoin {
285        StreamDeltaJoin::new(self.core).unwrap()
286    }
287
288    pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
289        let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
290        let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
291
292        StreamJoinCommon::get_dist_key_in_join_key(
293            &left_dk_indices,
294            &right_dk_indices,
295            self.eq_join_predicate(),
296        )
297    }
298
299    pub fn inequality_pairs(&self) -> &Vec<(usize, bool, bool, InequalityInputPair)> {
300        &self.inequality_pairs
301    }
302
303    /// Returns the conjunction index of the first inequality that cleans left state, if any.
304    fn clean_left_state_conjunction_idx(&self) -> Option<usize> {
305        self.inequality_pairs
306            .iter()
307            .find(|(_, clean_left, _, _)| *clean_left)
308            .map(|(idx, _, _, _)| *idx)
309    }
310
311    /// Returns the conjunction index of the first inequality that cleans right state, if any.
312    fn clean_right_state_conjunction_idx(&self) -> Option<usize> {
313        self.inequality_pairs
314            .iter()
315            .find(|(_, _, clean_right, _)| *clean_right)
316            .map(|(idx, _, _, _)| *idx)
317    }
318
319    /// Infer internal table catalog and degree table catalog for hash join.
320    ///
321    /// This method also infers `clean_watermark_indices` based on:
322    /// 1. Equal conditions: if a join key has watermarks on both sides, the corresponding
323    ///    column can be used for state cleaning.
324    /// 2. Inequality conditions: if `clean_left_state` or `clean_right_state` is true,
325    ///    the corresponding column can be used for state cleaning.
326    ///
327    /// # Arguments
328    /// * `input` - The input plan (left or right side of the join)
329    /// * `join_key_indices` - The indices of join keys in the input schema
330    /// * `dk_indices_in_jk` - The indices of distribution keys in join keys
331    /// * `is_left` - Whether this is for the left side of the join
332    ///
333    /// # Returns
334    /// A tuple of (`internal_table`, `degree_table`, `deduped_input_pk_indices`)
335    fn infer_internal_and_degree_table_catalog(
336        &self,
337        input: PlanRef,
338        join_key_indices: Vec<usize>,
339        dk_indices_in_jk: Vec<usize>,
340        is_left: bool,
341    ) -> Result<(TableCatalog, TableCatalog, Vec<usize>)> {
342        let schema = input.schema();
343
344        let internal_table_dist_keys = dk_indices_in_jk
345            .iter()
346            .map(|idx| join_key_indices[*idx])
347            .collect_vec();
348
349        let degree_table_dist_keys = dk_indices_in_jk.clone();
350
351        // The pk of hash join internal and degree table should be join_key + input_pk.
352        let join_key_len = join_key_indices.len();
353        let mut pk_indices = join_key_indices.clone();
354
355        // dedup the pk in dist key..
356        let mut deduped_input_pk_indices = vec![];
357        for input_pk_idx in input.stream_key().unwrap() {
358            if !pk_indices.contains(input_pk_idx)
359                && !deduped_input_pk_indices.contains(input_pk_idx)
360            {
361                deduped_input_pk_indices.push(*input_pk_idx);
362            }
363        }
364
365        pk_indices.extend(deduped_input_pk_indices.clone());
366
367        // Infer clean_watermark_indices for internal tablestate cleaning
368        let (
369            clean_watermark_indices,
370            eq_join_key_clean_watermark_indices,
371            inequal_clean_watermark_indices,
372        ) = self.infer_clean_watermark_indices(&join_key_indices, is_left)?;
373
374        // Build internal table
375        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
376        let internal_columns_fields = schema.fields().to_vec();
377
378        internal_columns_fields.iter().for_each(|field| {
379            internal_table_catalog_builder.add_column(field);
380        });
381        pk_indices.iter().for_each(|idx| {
382            internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
383        });
384
385        internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
386        internal_table_catalog_builder.set_clean_watermark_indices(clean_watermark_indices);
387
388        // Build degree table.
389        let mut degree_table_catalog_builder = TableCatalogBuilder::default();
390
391        let degree_column_field = Field::with_name(DataType::Int64, "_degree");
392
393        pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
394            degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
395            degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
396        });
397
398        // Add degree column
399        degree_table_catalog_builder.add_column(&degree_column_field);
400        let degree_col_idx = degree_table_catalog_builder.columns().len() - 1;
401
402        // Add inequality column after _degree if this side has inequality-based cleaning
403        let degree_inequality_col_idx = inequal_clean_watermark_indices
404            .iter()
405            .at_most_one()
406            .unwrap()
407            .map(|idx| {
408                degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
409                degree_table_catalog_builder.columns().len() - 1
410            });
411
412        // Set value indices: always include _degree, optionally include inequality column
413        let mut value_indices = vec![degree_col_idx];
414        if let Some(idx) = degree_inequality_col_idx {
415            value_indices.push(idx);
416        }
417        degree_table_catalog_builder.set_value_indices(value_indices);
418
419        degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
420
421        // Set clean watermark indices: use inequality column if present, otherwise use join key.
422        let degree_clean_watermark_indices = if let Some(idx) = degree_inequality_col_idx {
423            vec![idx]
424        } else {
425            // Note: eq_join_key_clean_watermark_indices are input-schema indices, but degree table
426            // columns are remapped via pk_indices (join_key first, then deduped pk).
427            eq_join_key_clean_watermark_indices
428                .iter()
429                .map(|input_col_idx| {
430                    join_key_indices
431                        .iter()
432                        .position(|jk_idx| jk_idx == input_col_idx)
433                        .expect("eq join key clean watermark index must exist in join_key_indices")
434                })
435                .collect()
436        };
437        degree_table_catalog_builder.set_clean_watermark_indices(degree_clean_watermark_indices);
438
439        Ok((
440            internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
441            degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
442            deduped_input_pk_indices,
443        ))
444    }
445
446    /// Infer which columns can be used for state cleaning based on watermark information.
447    ///
448    /// For the left/right internal table:
449    /// 1. From equal conditions: if `watermark_indices_in_jk[i].1` is true (`do_state_cleaning`),
450    ///    then the join key at position i can be used for cleaning both sides.
451    /// 2. From inequality conditions:
452    ///    - If `clean_left_state` is true, the left column (`left_idx`) can be used for cleaning left state.
453    ///    - If `clean_right_state` is true, the right column (`right_idx`) can be used for cleaning right state.
454    fn infer_clean_watermark_indices(
455        &self,
456        join_key_indices: &[usize],
457        is_left: bool,
458    ) -> Result<(Vec<usize>, Vec<usize>, Vec<usize>)> {
459        let mut clean_watermark_indices = vec![];
460
461        // From equal conditions: if do_state_cleaning is true, the join key column can clean state
462        let eq_join_key_clean_watermark_indices =
463            self.infer_eq_join_key_clean_watermark_indices(join_key_indices);
464        clean_watermark_indices.extend(eq_join_key_clean_watermark_indices.clone());
465        let mut inequal_clean_watermark_indices = vec![];
466        // From inequality conditions: check clean_left_state or clean_right_state
467        for (_conjunction_idx, clean_left, clean_right, pair) in &self.inequality_pairs {
468            if is_left && *clean_left {
469                let col_idx = pair.left_idx;
470                if !clean_watermark_indices.contains(&col_idx) {
471                    inequal_clean_watermark_indices.push(col_idx);
472                }
473            } else if !is_left && *clean_right {
474                let col_idx = pair.right_idx;
475                if !clean_watermark_indices.contains(&col_idx) {
476                    inequal_clean_watermark_indices.push(col_idx);
477                }
478            }
479        }
480
481        clean_watermark_indices.extend(inequal_clean_watermark_indices.clone());
482
483        // Verify: only 1 column per table is allowed to do state cleaning.
484        // This invariant is enforced by `derive_watermark_for_hash_join`.
485        if clean_watermark_indices.len() > 1 {
486            bail!(
487                "Expected at most 1 clean_watermark_index per table, got {:?}",
488                clean_watermark_indices
489            )
490        }
491
492        Ok((
493            clean_watermark_indices,
494            eq_join_key_clean_watermark_indices,
495            inequal_clean_watermark_indices,
496        ))
497    }
498
499    /// Infer which join keys can be used for state cleaning based on equal conditions.
500    fn infer_eq_join_key_clean_watermark_indices(&self, join_key_indices: &[usize]) -> Vec<usize> {
501        let mut clean_indices = vec![];
502        for (idx_in_jk, do_state_cleaning) in &self.watermark_indices_in_jk {
503            if *do_state_cleaning {
504                let col_idx = join_key_indices[*idx_in_jk];
505                if !clean_indices.contains(&col_idx) {
506                    clean_indices.push(col_idx);
507                }
508            }
509        }
510        clean_indices
511    }
512}
513
514impl Distill for StreamHashJoin {
515    fn distill<'a>(&self) -> XmlNode<'a> {
516        let (ljk, rjk) = self
517            .eq_join_predicate()
518            .eq_indexes()
519            .first()
520            .cloned()
521            .expect("first join key");
522
523        let clean_left_state_conjunction_idx = self.clean_left_state_conjunction_idx();
524        let clean_right_state_conjunction_idx = self.clean_right_state_conjunction_idx();
525        let clean_state_in_jk_indices: Vec<usize> = self
526            .watermark_indices_in_jk
527            .iter()
528            .filter_map(
529                |(idx, do_state_cleaning)| if *do_state_cleaning { Some(*idx) } else { None },
530            )
531            .collect();
532
533        let name = plan_node_name!("StreamHashJoin",
534            { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
535            { "interval", clean_left_state_conjunction_idx.is_some() && clean_right_state_conjunction_idx.is_some() },
536            { "append_only", self.is_append_only },
537        );
538        let verbose = self.base.ctx().is_explain_verbose();
539        let mut vec = Vec::with_capacity(6);
540        vec.push(("type", Pretty::debug(&self.core.join_type)));
541
542        let concat_schema = self.core.concat_schema();
543        vec.push((
544            "predicate",
545            Pretty::debug(&EqJoinPredicateDisplay {
546                eq_join_predicate: self.eq_join_predicate(),
547                input_schema: &concat_schema,
548            }),
549        ));
550
551        let get_other_cond = |conjunction_idx| {
552            Pretty::debug(&ExprDisplay {
553                expr: &self.eq_join_predicate().other_cond().conjunctions[conjunction_idx],
554                input_schema: &concat_schema,
555            })
556        };
557        let get_eq_cond = |conjunction_idx| {
558            Pretty::debug(&ExprDisplay {
559                expr: &self.eq_join_predicate().eq_cond().conjunctions[conjunction_idx],
560                input_schema: &concat_schema,
561            })
562        };
563        if !clean_state_in_jk_indices.is_empty() {
564            vec.push((
565                "conditions_to_clean_state_in_join_key",
566                Pretty::Array(
567                    clean_state_in_jk_indices
568                        .iter()
569                        .map(|idx| get_eq_cond(*idx))
570                        .collect::<Vec<_>>(),
571                ),
572            ));
573        }
574        if let Some(i) = clean_left_state_conjunction_idx {
575            vec.push(("conditions_to_clean_left_state_table", get_other_cond(i)));
576        }
577        if let Some(i) = clean_right_state_conjunction_idx {
578            vec.push(("conditions_to_clean_right_state_table", get_other_cond(i)));
579        }
580        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
581            vec.push(("output_watermarks", ow));
582        }
583
584        if verbose {
585            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
586            vec.push(("output", data));
587        }
588
589        childless_record(name, vec)
590    }
591}
592
593impl PlanTreeNodeBinary<Stream> for StreamHashJoin {
594    fn left(&self) -> PlanRef {
595        self.core.left.clone()
596    }
597
598    fn right(&self) -> PlanRef {
599        self.core.right.clone()
600    }
601
602    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
603        let mut core = self.core.clone();
604        core.left = left;
605        core.right = right;
606        Self::new(core).unwrap()
607    }
608}
609
610impl_plan_tree_node_for_binary! { Stream, StreamHashJoin }
611
612impl TryToStreamPb for StreamHashJoin {
613    fn try_to_stream_prost_body(
614        &self,
615        state: &mut BuildFragmentGraphState,
616    ) -> SchedulerResult<NodeBody> {
617        let left_jk_indices = self.eq_join_predicate().left_eq_indexes();
618        let right_jk_indices = self.eq_join_predicate().right_eq_indexes();
619        let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
620        let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
621
622        let retract =
623            self.left().stream_kind().is_retract() || self.right().stream_kind().is_retract();
624
625        let dk_indices_in_jk = self.derive_dist_key_in_join_key();
626
627        let (left_table, left_degree_table, left_deduped_input_pk_indices) = self
628            .infer_internal_and_degree_table_catalog(
629                self.left(),
630                left_jk_indices,
631                dk_indices_in_jk.clone(),
632                true, // is_left
633            )?;
634        let (right_table, right_degree_table, right_deduped_input_pk_indices) = self
635            .infer_internal_and_degree_table_catalog(
636                self.right(),
637                right_jk_indices,
638                dk_indices_in_jk,
639                false, // is_left
640            )?;
641
642        let left_deduped_input_pk_indices = left_deduped_input_pk_indices
643            .iter()
644            .map(|idx| *idx as u32)
645            .collect_vec();
646
647        let right_deduped_input_pk_indices = right_deduped_input_pk_indices
648            .iter()
649            .map(|idx| *idx as u32)
650            .collect_vec();
651
652        let (left_table, left_degree_table) = (
653            left_table.with_id(state.gen_table_id_wrapped()),
654            left_degree_table.with_id(state.gen_table_id_wrapped()),
655        );
656        let (right_table, right_degree_table) = (
657            right_table.with_id(state.gen_table_id_wrapped()),
658            right_degree_table.with_id(state.gen_table_id_wrapped()),
659        );
660
661        let null_safe_prost = self.eq_join_predicate().null_safes().into_iter().collect();
662
663        let condition = self
664            .eq_join_predicate()
665            .other_cond()
666            .as_expr_unless_true()
667            .map(|expr| expr.to_expr_proto_checked_pure(retract, "JOIN condition"))
668            .transpose()?;
669
670        // Helper function to convert ExprType to PbInequalityType
671        fn expr_type_to_pb_inequality_type(op: ExprType) -> i32 {
672            match op {
673                ExprType::LessThan => PbInequalityType::LessThan as i32,
674                ExprType::LessThanOrEqual => PbInequalityType::LessThanOrEqual as i32,
675                ExprType::GreaterThan => PbInequalityType::GreaterThan as i32,
676                ExprType::GreaterThanOrEqual => PbInequalityType::GreaterThanOrEqual as i32,
677                _ => PbInequalityType::Unspecified as i32,
678            }
679        }
680
681        Ok(NodeBody::HashJoin(Box::new(HashJoinNode {
682            join_type: self.core.join_type as i32,
683            left_key: left_jk_indices_prost,
684            right_key: right_jk_indices_prost,
685            null_safe: null_safe_prost,
686            condition,
687            watermark_handle_desc: Some(HashJoinWatermarkHandleDesc {
688                watermark_indices_in_jk: self
689                    .watermark_indices_in_jk
690                    .iter()
691                    .map(|(idx, do_clean)| JoinKeyWatermarkIndex {
692                        index: *idx as u32,
693                        do_state_cleaning: *do_clean,
694                    })
695                    .collect(),
696                inequality_pairs: self
697                    .inequality_pairs
698                    .iter()
699                    .map(
700                        |(_conjunction_idx, clean_left, clean_right, pair)| PbInequalityPairV2 {
701                            left_idx: pair.left_idx as u32,
702                            right_idx: pair.right_idx as u32,
703                            clean_left_state: *clean_left,
704                            clean_right_state: *clean_right,
705                            op: expr_type_to_pb_inequality_type(pair.op),
706                        },
707                    )
708                    .collect(),
709            }),
710            left_table: Some(left_table.to_internal_table_prost()),
711            right_table: Some(right_table.to_internal_table_prost()),
712            left_degree_table: Some(left_degree_table.to_internal_table_prost()),
713            right_degree_table: Some(right_degree_table.to_internal_table_prost()),
714            left_deduped_input_pk_indices,
715            right_deduped_input_pk_indices,
716            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
717            is_append_only: self.is_append_only,
718            // Join encoding type should now be read from per-job config override.
719            #[allow(deprecated)]
720            join_encoding_type: PbJoinEncodingType::Unspecified as _,
721        })))
722    }
723}
724
725impl ExprRewritable<Stream> for StreamHashJoin {
726    fn has_rewritable_expr(&self) -> bool {
727        true
728    }
729
730    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
731        let mut core = self.core.clone();
732        core.rewrite_exprs(r);
733        Self::new(core).unwrap().into()
734    }
735}
736
737impl ExprVisitable for StreamHashJoin {
738    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
739        self.core.visit_exprs(v);
740    }
741}