risingwave_frontend/optimizer/plan_node/generic/
join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::{EitherOrBoth, Itertools};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::JoinType;
20
21use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
22use crate::TableCatalog;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::StreamPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata as _;
27use crate::optimizer::plan_node::stream::prelude::*;
28use crate::optimizer::plan_node::utils::TableCatalogBuilder;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
31
32/// [`Join`] combines two relations according to some condition.
33///
34/// Each output row has fields from the left and right inputs. The set of output rows is a subset
35/// of the cartesian product of the two inputs; precisely which subset depends on the join
36/// condition. In addition, the output columns are a subset of the columns of the left and
37/// right columns, dependent on the output indices provided. A repeat output index is illegal.
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct Join<PlanRef> {
40    pub left: PlanRef,
41    pub right: PlanRef,
42    pub on: Condition,
43    pub join_type: JoinType,
44    pub output_indices: Vec<usize>,
45}
46
47pub(crate) fn has_repeated_element(slice: &[usize]) -> bool {
48    (1..slice.len()).any(|i| slice[i..].contains(&slice[i - 1]))
49}
50
51impl<PlanRef: GenericPlanRef> Join<PlanRef> {
52    pub(crate) fn clone_with_inputs<OtherPlanRef>(
53        &self,
54        left: OtherPlanRef,
55        right: OtherPlanRef,
56    ) -> Join<OtherPlanRef> {
57        Join {
58            left,
59            right,
60            on: self.on.clone(),
61            join_type: self.join_type,
62            output_indices: self.output_indices.clone(),
63        }
64    }
65
66    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
67        self.on = self.on.clone().rewrite_expr(r);
68    }
69
70    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
71        self.on.visit_expr(v);
72    }
73
74    pub fn eq_indexes(&self) -> Vec<(usize, usize)> {
75        let left_len = self.left.schema().len();
76        let right_len = self.right.schema().len();
77        let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
78        eq_predicate.eq_indexes()
79    }
80
81    pub fn new(
82        left: PlanRef,
83        right: PlanRef,
84        on: Condition,
85        join_type: JoinType,
86        output_indices: Vec<usize>,
87    ) -> Self {
88        // We cannot deal with repeated output indices in join
89        debug_assert!(!has_repeated_element(&output_indices));
90        Self {
91            left,
92            right,
93            on,
94            join_type,
95            output_indices,
96        }
97    }
98}
99
100impl Join<StreamPlanRef> {
101    pub fn stream_kind(&self) -> Result<StreamKind> {
102        let left_kind = reject_upsert_input!(self.left, "Join");
103        let right_kind = reject_upsert_input!(self.right, "Join");
104
105        // Inner join won't change the append-only behavior of the stream. The rest might.
106        if let JoinType::Inner | JoinType::AsofInner = self.join_type
107            && let StreamKind::AppendOnly = left_kind
108            && let StreamKind::AppendOnly = right_kind
109        {
110            Ok(StreamKind::AppendOnly)
111        } else {
112            Ok(StreamKind::Retract)
113        }
114    }
115
116    /// Return stream hash join internal table catalog and degree table catalog.
117    pub fn infer_internal_and_degree_table_catalog(
118        input: StreamPlanRef,
119        join_key_indices: Vec<usize>,
120        dk_indices_in_jk: Vec<usize>,
121    ) -> (TableCatalog, TableCatalog, Vec<usize>) {
122        let schema = input.schema();
123
124        let internal_table_dist_keys = dk_indices_in_jk
125            .iter()
126            .map(|idx| join_key_indices[*idx])
127            .collect_vec();
128
129        let degree_table_dist_keys = dk_indices_in_jk.clone();
130
131        // The pk of hash join internal and degree table should be join_key + input_pk.
132        let join_key_len = join_key_indices.len();
133        let mut pk_indices = join_key_indices;
134
135        // dedup the pk in dist key..
136        let mut deduped_input_pk_indices = vec![];
137        for input_pk_idx in input.stream_key().unwrap() {
138            if !pk_indices.contains(input_pk_idx)
139                && !deduped_input_pk_indices.contains(input_pk_idx)
140            {
141                deduped_input_pk_indices.push(*input_pk_idx);
142            }
143        }
144
145        pk_indices.extend(deduped_input_pk_indices.clone());
146
147        // Build internal table
148        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
149        let internal_columns_fields = schema.fields().to_vec();
150
151        internal_columns_fields.iter().for_each(|field| {
152            internal_table_catalog_builder.add_column(field);
153        });
154        pk_indices.iter().for_each(|idx| {
155            internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
156        });
157
158        // Build degree table.
159        let mut degree_table_catalog_builder = TableCatalogBuilder::default();
160
161        let degree_column_field = Field::with_name(DataType::Int64, "_degree");
162
163        pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
164            degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
165            degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
166        });
167        degree_table_catalog_builder.add_column(&degree_column_field);
168        degree_table_catalog_builder
169            .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);
170
171        internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
172        degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
173
174        (
175            internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
176            degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
177            deduped_input_pk_indices,
178        )
179    }
180}
181
182impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
183    fn schema(&self) -> Schema {
184        let left_schema = self.left.schema();
185        let right_schema = self.right.schema();
186        let i2l = self.i2l_col_mapping();
187        let i2r = self.i2r_col_mapping();
188        let fields = self
189            .output_indices
190            .iter()
191            .map(|&i| match (i2l.try_map(i), i2r.try_map(i)) {
192                (Some(l_i), None) => left_schema.fields()[l_i].clone(),
193                (None, Some(r_i)) => right_schema.fields()[r_i].clone(),
194                _ => panic!(
195                    "left len {}, right len {}, i {}, lmap {:?}, rmap {:?}",
196                    left_schema.len(),
197                    right_schema.len(),
198                    i,
199                    i2l,
200                    i2r
201                ),
202            })
203            .collect();
204        Schema { fields }
205    }
206
207    fn stream_key(&self) -> Option<Vec<usize>> {
208        let eq_indexes = self.eq_indexes();
209        let left_pk = self.left.stream_key()?;
210        let right_pk = self.right.stream_key()?;
211        let l2i = self.l2i_col_mapping();
212        let r2i = self.r2i_col_mapping();
213        let full_out_col_num = self.internal_column_num();
214        let i2o = ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num);
215
216        // Collect PKs in internal column space (without applying i2o mapping yet)
217        let mut pk_indices_internal = left_pk
218            .iter()
219            .map(|index| l2i.try_map(*index))
220            .chain(right_pk.iter().map(|index| r2i.try_map(*index)))
221            .flatten()
222            .collect::<Vec<_>>();
223
224        let either_or_both = self.add_which_join_key_to_pk();
225
226        for (lk, rk) in eq_indexes {
227            match either_or_both {
228                EitherOrBoth::Left(_) => {
229                    // Remove right-side join-key column from pk_indices_internal.
230                    // This may happen when right-side join-key is included in right-side PK.
231                    // e.g. select a, b where a.bid = b.id
232                    // Here the pk_indices should be [a.id, a.bid] instead of [a.id, b.id, a.bid],
233                    // because b.id = a.bid, so either of them would be enough.
234                    if let Some(rk_internal) = r2i.try_map(rk) {
235                        pk_indices_internal.retain(|&x| x != rk_internal);
236                    }
237                    // Add left-side join-key column in pk_indices_internal
238                    if let Some(lk_internal) = l2i.try_map(lk)
239                        && !pk_indices_internal.contains(&lk_internal)
240                    {
241                        pk_indices_internal.push(lk_internal);
242                    }
243                }
244                EitherOrBoth::Right(_) => {
245                    // Remove left-side join-key column from pk_indices_internal
246                    // See the example above
247                    if let Some(lk_internal) = l2i.try_map(lk) {
248                        pk_indices_internal.retain(|&x| x != lk_internal);
249                    }
250                    // Add right-side join-key column in pk_indices_internal
251                    if let Some(rk_internal) = r2i.try_map(rk)
252                        && !pk_indices_internal.contains(&rk_internal)
253                    {
254                        pk_indices_internal.push(rk_internal);
255                    }
256                }
257                EitherOrBoth::Both(_, _) => {
258                    if let Some(lk_internal) = l2i.try_map(lk)
259                        && !pk_indices_internal.contains(&lk_internal)
260                    {
261                        pk_indices_internal.push(lk_internal);
262                    }
263                    if let Some(rk_internal) = r2i.try_map(rk)
264                        && !pk_indices_internal.contains(&rk_internal)
265                    {
266                        pk_indices_internal.push(rk_internal);
267                    }
268                }
269            };
270        }
271
272        // Now apply i2o mapping to get output indices
273        let pk_indices = pk_indices_internal
274            .iter()
275            .map(|&index| i2o.try_map(index))
276            .collect::<Option<Vec<_>>>()?;
277
278        Some(pk_indices)
279    }
280
281    fn ctx(&self) -> OptimizerContextRef {
282        self.left.ctx()
283    }
284
285    fn functional_dependency(&self) -> FunctionalDependencySet {
286        let left_len = self.left.schema().len();
287        let right_len = self.right.schema().len();
288        let left_fd_set = self.left.functional_dependency().clone();
289        let right_fd_set = self.right.functional_dependency().clone();
290
291        let full_out_col_num = self.internal_column_num();
292
293        let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| {
294            ColIndexMapping::with_shift_offset(left_len, 0)
295                .composite(&ColIndexMapping::identity(full_out_col_num))
296                .rewrite_functional_dependency_set(left_fd_set)
297        };
298        let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| {
299            ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
300                .rewrite_functional_dependency_set(right_fd_set)
301        };
302        let fd_set: FunctionalDependencySet = match self.join_type {
303            JoinType::Inner | JoinType::AsofInner => {
304                let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
305                for i in &self.on.conjunctions {
306                    if let Some((col, _)) = i.as_eq_const() {
307                        fd_set.add_constant_columns(&[col.index()])
308                    } else if let Some((left, right)) = i.as_eq_cond() {
309                        fd_set.add_functional_dependency_by_column_indices(
310                            &[left.index()],
311                            &[right.index()],
312                        );
313                        fd_set.add_functional_dependency_by_column_indices(
314                            &[right.index()],
315                            &[left.index()],
316                        );
317                    }
318                }
319                get_new_left_fd_set(left_fd_set)
320                    .into_dependencies()
321                    .into_iter()
322                    .chain(get_new_right_fd_set(right_fd_set).into_dependencies())
323                    .for_each(|fd| fd_set.add_functional_dependency(fd));
324                fd_set
325            }
326            JoinType::LeftOuter | JoinType::AsofLeftOuter => get_new_left_fd_set(left_fd_set),
327            JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
328            JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
329            JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
330            JoinType::RightSemi | JoinType::RightAnti => right_fd_set,
331            JoinType::Unspecified => unreachable!(),
332        };
333        ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num)
334            .rewrite_functional_dependency_set(fd_set)
335    }
336}
337
338impl<PlanRef> Join<PlanRef> {
339    pub fn decompose(self) -> (PlanRef, PlanRef, Condition, JoinType, Vec<usize>) {
340        (
341            self.left,
342            self.right,
343            self.on,
344            self.join_type,
345            self.output_indices,
346        )
347    }
348}
349
350impl<PlanRef: GenericPlanRef> Join<PlanRef> {
351    pub fn full_out_col_num(left_len: usize, right_len: usize, join_type: JoinType) -> usize {
352        match join_type {
353            JoinType::Inner
354            | JoinType::LeftOuter
355            | JoinType::RightOuter
356            | JoinType::FullOuter
357            | JoinType::AsofInner
358            | JoinType::AsofLeftOuter => left_len + right_len,
359            JoinType::LeftSemi | JoinType::LeftAnti => left_len,
360            JoinType::RightSemi | JoinType::RightAnti => right_len,
361            JoinType::Unspecified => unreachable!(),
362        }
363    }
364
365    pub fn with_full_output(
366        left: PlanRef,
367        right: PlanRef,
368        join_type: JoinType,
369        on: Condition,
370    ) -> Self {
371        let out_column_num =
372            Self::full_out_col_num(left.schema().len(), right.schema().len(), join_type);
373        Self {
374            left,
375            right,
376            join_type,
377            on,
378            output_indices: (0..out_column_num).collect(),
379        }
380    }
381
382    pub fn internal_column_num(&self) -> usize {
383        Self::full_out_col_num(
384            self.left.schema().len(),
385            self.right.schema().len(),
386            self.join_type,
387        )
388    }
389
390    pub fn is_full_out(&self) -> bool {
391        self.output_indices.len() == self.internal_column_num()
392    }
393
394    /// Get the Mapping of columnIndex from internal column index to left column index.
395    pub fn i2l_col_mapping(&self) -> ColIndexMapping {
396        let left_len = self.left.schema().len();
397        let right_len = self.right.schema().len();
398
399        match self.join_type {
400            JoinType::Inner
401            | JoinType::LeftOuter
402            | JoinType::RightOuter
403            | JoinType::FullOuter
404            | JoinType::AsofInner
405            | JoinType::AsofLeftOuter => {
406                ColIndexMapping::identity_or_none(left_len + right_len, left_len)
407            }
408
409            JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::identity(left_len),
410            JoinType::RightSemi | JoinType::RightAnti => {
411                ColIndexMapping::empty(right_len, left_len)
412            }
413            JoinType::Unspecified => unreachable!(),
414        }
415    }
416
417    /// Get the Mapping of columnIndex from internal column index to right column index.
418    pub fn i2r_col_mapping(&self) -> ColIndexMapping {
419        let left_len = self.left.schema().len();
420        let right_len = self.right.schema().len();
421
422        match self.join_type {
423            JoinType::Inner
424            | JoinType::LeftOuter
425            | JoinType::RightOuter
426            | JoinType::FullOuter
427            | JoinType::AsofInner
428            | JoinType::AsofLeftOuter => {
429                ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
430            }
431            JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::empty(left_len, right_len),
432            JoinType::RightSemi | JoinType::RightAnti => ColIndexMapping::identity(right_len),
433            JoinType::Unspecified => unreachable!(),
434        }
435    }
436
437    /// TODO: This function may can be merged with `i2l_col_mapping` in future.
438    pub fn i2l_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
439        let left_len = self.left.schema().len();
440        let right_len = self.right.schema().len();
441
442        ColIndexMapping::identity_or_none(left_len + right_len, left_len)
443    }
444
445    /// TODO: This function may can be merged with `i2r_col_mapping` in future.
446    pub fn i2r_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
447        let left_len = self.left.schema().len();
448        let right_len = self.right.schema().len();
449
450        ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
451    }
452
453    /// Get the Mapping of columnIndex from left column index to internal column index.
454    pub fn l2i_col_mapping(&self) -> ColIndexMapping {
455        self.i2l_col_mapping()
456            .inverse()
457            .expect("must be invertible")
458    }
459
460    /// Get the Mapping of columnIndex from right column index to internal column index.
461    pub fn r2i_col_mapping(&self) -> ColIndexMapping {
462        self.i2r_col_mapping()
463            .inverse()
464            .expect("must be invertible")
465    }
466
467    /// Get the Mapping of columnIndex from internal column index to output column index
468    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
469        ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
470    }
471
472    /// Get the Mapping of columnIndex from output column index to internal column index
473    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
474        // If output_indices = [0, 0, 1], we should use it as `o2i_col_mapping` directly.
475        // If we use `self.i2o_col_mapping().inverse()`, we will lose the first 0.
476        ColIndexMapping::new(
477            self.output_indices.iter().map(|x| Some(*x)).collect(),
478            self.internal_column_num(),
479        )
480    }
481
482    pub fn add_which_join_key_to_pk(&self) -> EitherOrBoth<(), ()> {
483        match self.join_type {
484            JoinType::Inner | JoinType::AsofInner => {
485                // Theoretically adding either side is ok, but the distribution key of the inner
486                // join derived based on the left side by default, so we choose the left side here
487                // to ensure the pk comprises the distribution key.
488                EitherOrBoth::Left(())
489            }
490            JoinType::LeftOuter
491            | JoinType::LeftSemi
492            | JoinType::LeftAnti
493            | JoinType::AsofLeftOuter => EitherOrBoth::Left(()),
494            JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
495                EitherOrBoth::Right(())
496            }
497            JoinType::FullOuter => EitherOrBoth::Both((), ()),
498            JoinType::Unspecified => unreachable!(),
499        }
500    }
501
502    pub fn concat_schema(&self) -> Schema {
503        Schema::new(
504            [
505                self.left.schema().fields.clone(),
506                self.right.schema().fields.clone(),
507            ]
508            .concat(),
509        )
510    }
511}
512
513/// Try to split and pushdown `predicate` into a into a join condition and into the inputs of the
514/// join. Returns the pushed predicates. The pushed part will be removed from the original
515/// predicate.
516///
517/// `InputRef`s in the right pushed condition are indexed by the right child's output schema.
518pub fn push_down_into_join(
519    predicate: &mut Condition,
520    left_col_num: usize,
521    right_col_num: usize,
522    ty: JoinType,
523    push_temporal_predicate: bool,
524) -> (Condition, Condition, Condition) {
525    let (left, right) = push_down_to_inputs(
526        predicate,
527        left_col_num,
528        right_col_num,
529        can_push_left_from_filter(ty),
530        can_push_right_from_filter(ty),
531        push_temporal_predicate,
532    );
533
534    let on = if can_push_on_from_filter(ty) {
535        let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
536
537        if push_temporal_predicate {
538            Condition { conjunctions }
539        } else {
540            // Do not push now on to the on, it will be pulled up into a filter instead.
541            let on = Condition {
542                conjunctions: conjunctions
543                    .extract_if(.., |expr| expr.count_nows() == 0)
544                    .collect(),
545            };
546            predicate.conjunctions = conjunctions;
547            on
548        }
549    } else {
550        Condition::true_cond()
551    };
552    (left, right, on)
553}
554
555/// Try to pushes parts of the join condition to its inputs. Returns the pushed predicates. The
556/// pushed part will be removed from the original join predicate.
557///
558/// `InputRef`s in the right pushed condition are indexed by the right child's output schema.
559pub fn push_down_join_condition(
560    on_condition: &mut Condition,
561    left_col_num: usize,
562    right_col_num: usize,
563    ty: JoinType,
564    push_temporal_predicate: bool,
565) -> (Condition, Condition) {
566    push_down_to_inputs(
567        on_condition,
568        left_col_num,
569        right_col_num,
570        can_push_left_from_on(ty),
571        can_push_right_from_on(ty),
572        push_temporal_predicate,
573    )
574}
575
576/// Try to split and pushdown `predicate` into a join's left/right child.
577/// Returns the pushed predicates. The pushed part will be removed from the original predicate.
578///
579/// `InputRef`s in the right `Condition` are shifted by `-left_col_num`.
580fn push_down_to_inputs(
581    predicate: &mut Condition,
582    left_col_num: usize,
583    right_col_num: usize,
584    push_left: bool,
585    push_right: bool,
586    push_temporal_predicate: bool,
587) -> (Condition, Condition) {
588    let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
589    let (mut left, right, mut others) = if push_temporal_predicate {
590        Condition { conjunctions }.split(left_col_num, right_col_num)
591    } else {
592        let temporal_filter_cons = conjunctions
593            .extract_if(.., |e| e.count_nows() != 0)
594            .collect_vec();
595        let (left, right, mut others) =
596            Condition { conjunctions }.split(left_col_num, right_col_num);
597
598        others.conjunctions.extend(temporal_filter_cons);
599        (left, right, others)
600    };
601
602    if !push_left {
603        others.conjunctions.extend(left);
604        left = Condition::true_cond();
605    };
606
607    let right = if push_right {
608        let mut mapping = ColIndexMapping::with_shift_offset(
609            left_col_num + right_col_num,
610            -(left_col_num as isize),
611        );
612        right.rewrite_expr(&mut mapping)
613    } else {
614        others.conjunctions.extend(right);
615        Condition::true_cond()
616    };
617
618    predicate.conjunctions = others.conjunctions;
619
620    (left, right)
621}
622
623pub fn can_push_left_from_filter(ty: JoinType) -> bool {
624    matches!(
625        ty,
626        JoinType::Inner
627            | JoinType::LeftOuter
628            | JoinType::LeftSemi
629            | JoinType::LeftAnti
630            | JoinType::AsofInner
631            | JoinType::AsofLeftOuter
632    )
633}
634
635pub fn can_push_right_from_filter(ty: JoinType) -> bool {
636    matches!(
637        ty,
638        JoinType::Inner
639            | JoinType::RightOuter
640            | JoinType::RightSemi
641            | JoinType::RightAnti
642            | JoinType::AsofInner
643    )
644}
645
646pub fn can_push_on_from_filter(ty: JoinType) -> bool {
647    matches!(
648        ty,
649        JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi
650    )
651}
652
653pub fn can_push_left_from_on(ty: JoinType) -> bool {
654    matches!(
655        ty,
656        JoinType::Inner
657            | JoinType::RightOuter
658            | JoinType::LeftSemi
659            | JoinType::AsofInner
660            | JoinType::AsofLeftOuter
661    )
662}
663
664pub fn can_push_right_from_on(ty: JoinType) -> bool {
665    matches!(
666        ty,
667        JoinType::Inner
668            | JoinType::LeftOuter
669            | JoinType::RightSemi
670            | JoinType::AsofInner
671            | JoinType::AsofLeftOuter
672    )
673}