risingwave_frontend/optimizer/plan_node/generic/
join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::{EitherOrBoth, Itertools};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::JoinType;
20
21use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
22use crate::TableCatalog;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::optimizer_context::OptimizerContextRef;
25use crate::optimizer::plan_node::StreamPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata as _;
27use crate::optimizer::plan_node::stream::prelude::*;
28use crate::optimizer::plan_node::utils::TableCatalogBuilder;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
31
32/// [`Join`] combines two relations according to some condition.
33///
34/// Each output row has fields from the left and right inputs. The set of output rows is a subset
35/// of the cartesian product of the two inputs; precisely which subset depends on the join
36/// condition. In addition, the output columns are a subset of the columns of the left and
37/// right columns, dependent on the output indices provided. A repeat output index is illegal.
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct Join<PlanRef> {
40    pub left: PlanRef,
41    pub right: PlanRef,
42    pub on: Condition,
43    pub join_type: JoinType,
44    pub output_indices: Vec<usize>,
45}
46
47pub(crate) fn has_repeated_element(slice: &[usize]) -> bool {
48    (1..slice.len()).any(|i| slice[i..].contains(&slice[i - 1]))
49}
50
51impl<PlanRef: GenericPlanRef> Join<PlanRef> {
52    pub(crate) fn clone_with_inputs<OtherPlanRef>(
53        &self,
54        left: OtherPlanRef,
55        right: OtherPlanRef,
56    ) -> Join<OtherPlanRef> {
57        Join {
58            left,
59            right,
60            on: self.on.clone(),
61            join_type: self.join_type,
62            output_indices: self.output_indices.clone(),
63        }
64    }
65
66    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
67        self.on = self.on.clone().rewrite_expr(r);
68    }
69
70    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
71        self.on.visit_expr(v);
72    }
73
74    pub fn eq_indexes(&self) -> Vec<(usize, usize)> {
75        let left_len = self.left.schema().len();
76        let right_len = self.right.schema().len();
77        let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
78        eq_predicate.eq_indexes()
79    }
80
81    pub fn new(
82        left: PlanRef,
83        right: PlanRef,
84        on: Condition,
85        join_type: JoinType,
86        output_indices: Vec<usize>,
87    ) -> Self {
88        // We cannot deal with repeated output indices in join
89        debug_assert!(!has_repeated_element(&output_indices));
90        Self {
91            left,
92            right,
93            on,
94            join_type,
95            output_indices,
96        }
97    }
98}
99
100impl Join<StreamPlanRef> {
101    pub fn stream_kind(&self) -> Result<StreamKind> {
102        let left_kind = reject_upsert_input!(self.left, "Join");
103        let right_kind = reject_upsert_input!(self.right, "Join");
104
105        // Inner join won't change the append-only behavior of the stream. The rest might.
106        if let JoinType::Inner | JoinType::AsofInner = self.join_type
107            && let StreamKind::AppendOnly = left_kind
108            && let StreamKind::AppendOnly = right_kind
109        {
110            Ok(StreamKind::AppendOnly)
111        } else {
112            Ok(StreamKind::Retract)
113        }
114    }
115
116    /// Return stream hash join internal table catalog and degree table catalog.
117    pub fn infer_internal_and_degree_table_catalog(
118        input: StreamPlanRef,
119        join_key_indices: Vec<usize>,
120        dk_indices_in_jk: Vec<usize>,
121    ) -> (TableCatalog, TableCatalog, Vec<usize>) {
122        let schema = input.schema();
123
124        let internal_table_dist_keys = dk_indices_in_jk
125            .iter()
126            .map(|idx| join_key_indices[*idx])
127            .collect_vec();
128
129        let degree_table_dist_keys = dk_indices_in_jk.clone();
130
131        // The pk of hash join internal and degree table should be join_key + input_pk.
132        let join_key_len = join_key_indices.len();
133        let mut pk_indices = join_key_indices;
134
135        // dedup the pk in dist key..
136        let mut deduped_input_pk_indices = vec![];
137        for input_pk_idx in input.stream_key().unwrap() {
138            if !pk_indices.contains(input_pk_idx)
139                && !deduped_input_pk_indices.contains(input_pk_idx)
140            {
141                deduped_input_pk_indices.push(*input_pk_idx);
142            }
143        }
144
145        pk_indices.extend(deduped_input_pk_indices.clone());
146
147        // Build internal table
148        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
149        let internal_columns_fields = schema.fields().to_vec();
150
151        internal_columns_fields.iter().for_each(|field| {
152            internal_table_catalog_builder.add_column(field);
153        });
154        pk_indices.iter().for_each(|idx| {
155            internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
156        });
157
158        // Build degree table.
159        let mut degree_table_catalog_builder = TableCatalogBuilder::default();
160
161        let degree_column_field = Field::with_name(DataType::Int64, "_degree");
162
163        pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
164            degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
165            degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
166        });
167        degree_table_catalog_builder.add_column(&degree_column_field);
168        degree_table_catalog_builder
169            .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);
170
171        internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
172        degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
173
174        (
175            internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
176            degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
177            deduped_input_pk_indices,
178        )
179    }
180}
181
182impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
183    fn schema(&self) -> Schema {
184        let left_schema = self.left.schema();
185        let right_schema = self.right.schema();
186        let i2l = self.i2l_col_mapping();
187        let i2r = self.i2r_col_mapping();
188        let fields = self
189            .output_indices
190            .iter()
191            .map(|&i| match (i2l.try_map(i), i2r.try_map(i)) {
192                (Some(l_i), None) => left_schema.fields()[l_i].clone(),
193                (None, Some(r_i)) => right_schema.fields()[r_i].clone(),
194                _ => panic!(
195                    "left len {}, right len {}, i {}, lmap {:?}, rmap {:?}",
196                    left_schema.len(),
197                    right_schema.len(),
198                    i,
199                    i2l,
200                    i2r
201                ),
202            })
203            .collect();
204        Schema { fields }
205    }
206
207    fn stream_key(&self) -> Option<Vec<usize>> {
208        let eq_indexes = self.eq_indexes();
209        let left_pk = self.left.stream_key()?;
210        let right_pk = self.right.stream_key()?;
211        let l2i = self.l2i_col_mapping();
212        let r2i = self.r2i_col_mapping();
213        let full_out_col_num = self.internal_column_num();
214        let i2o = ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num);
215
216        let mut pk_indices = left_pk
217            .iter()
218            .map(|index| l2i.try_map(*index))
219            .chain(right_pk.iter().map(|index| r2i.try_map(*index)))
220            .flatten()
221            .map(|index| i2o.try_map(index))
222            .collect::<Option<Vec<_>>>()?;
223
224        // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
225        // key.
226        let l2i = self.l2i_col_mapping();
227        let r2i = self.r2i_col_mapping();
228        let full_out_col_num = self.internal_column_num();
229        let i2o = ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num);
230
231        let either_or_both = self.add_which_join_key_to_pk();
232
233        for (lk, rk) in eq_indexes {
234            match either_or_both {
235                EitherOrBoth::Left(_) => {
236                    // Remove right-side join-key column it from pk_indices.
237                    // This may happen when right-side join-key is included in right-side PK.
238                    // e.g. select a, b where a.bid = b.id
239                    // Here the pk_indices should be [a.id, a.bid] instead of [a.id, b.id, a.bid],
240                    // because b.id = a.bid, so either of them would be enough.
241                    if let Some(rk) = r2i.try_map(rk)
242                        && let Some(out_k) = i2o.try_map(rk)
243                    {
244                        pk_indices.retain(|&x| x != out_k);
245                    }
246                    // Add left-side join-key column in pk_indices
247                    if let Some(lk) = l2i.try_map(lk) {
248                        let out_k = i2o.try_map(lk)?;
249                        if !pk_indices.contains(&out_k) {
250                            pk_indices.push(out_k);
251                        }
252                    }
253                }
254                EitherOrBoth::Right(_) => {
255                    // Remove left-side join-key column it from pk_indices
256                    // See the example above
257                    if let Some(lk) = l2i.try_map(lk)
258                        && let Some(out_k) = i2o.try_map(lk)
259                    {
260                        pk_indices.retain(|&x| x != out_k);
261                    }
262                    // Add right-side join-key column in pk_indices
263                    if let Some(rk) = r2i.try_map(rk) {
264                        let out_k = i2o.try_map(rk)?;
265                        if !pk_indices.contains(&out_k) {
266                            pk_indices.push(out_k);
267                        }
268                    }
269                }
270                EitherOrBoth::Both(_, _) => {
271                    if let Some(lk) = l2i.try_map(lk) {
272                        let out_k = i2o.try_map(lk)?;
273                        if !pk_indices.contains(&out_k) {
274                            pk_indices.push(out_k);
275                        }
276                    }
277                    if let Some(rk) = r2i.try_map(rk) {
278                        let out_k = i2o.try_map(rk)?;
279                        if !pk_indices.contains(&out_k) {
280                            pk_indices.push(out_k);
281                        }
282                    }
283                }
284            };
285        }
286        Some(pk_indices)
287    }
288
289    fn ctx(&self) -> OptimizerContextRef {
290        self.left.ctx()
291    }
292
293    fn functional_dependency(&self) -> FunctionalDependencySet {
294        let left_len = self.left.schema().len();
295        let right_len = self.right.schema().len();
296        let left_fd_set = self.left.functional_dependency().clone();
297        let right_fd_set = self.right.functional_dependency().clone();
298
299        let full_out_col_num = self.internal_column_num();
300
301        let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| {
302            ColIndexMapping::with_shift_offset(left_len, 0)
303                .composite(&ColIndexMapping::identity(full_out_col_num))
304                .rewrite_functional_dependency_set(left_fd_set)
305        };
306        let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| {
307            ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
308                .rewrite_functional_dependency_set(right_fd_set)
309        };
310        let fd_set: FunctionalDependencySet = match self.join_type {
311            JoinType::Inner | JoinType::AsofInner => {
312                let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
313                for i in &self.on.conjunctions {
314                    if let Some((col, _)) = i.as_eq_const() {
315                        fd_set.add_constant_columns(&[col.index()])
316                    } else if let Some((left, right)) = i.as_eq_cond() {
317                        fd_set.add_functional_dependency_by_column_indices(
318                            &[left.index()],
319                            &[right.index()],
320                        );
321                        fd_set.add_functional_dependency_by_column_indices(
322                            &[right.index()],
323                            &[left.index()],
324                        );
325                    }
326                }
327                get_new_left_fd_set(left_fd_set)
328                    .into_dependencies()
329                    .into_iter()
330                    .chain(get_new_right_fd_set(right_fd_set).into_dependencies())
331                    .for_each(|fd| fd_set.add_functional_dependency(fd));
332                fd_set
333            }
334            JoinType::LeftOuter | JoinType::AsofLeftOuter => get_new_left_fd_set(left_fd_set),
335            JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
336            JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
337            JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
338            JoinType::RightSemi | JoinType::RightAnti => right_fd_set,
339            JoinType::Unspecified => unreachable!(),
340        };
341        ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num)
342            .rewrite_functional_dependency_set(fd_set)
343    }
344}
345
346impl<PlanRef> Join<PlanRef> {
347    pub fn decompose(self) -> (PlanRef, PlanRef, Condition, JoinType, Vec<usize>) {
348        (
349            self.left,
350            self.right,
351            self.on,
352            self.join_type,
353            self.output_indices,
354        )
355    }
356}
357
358impl<PlanRef: GenericPlanRef> Join<PlanRef> {
359    pub fn full_out_col_num(left_len: usize, right_len: usize, join_type: JoinType) -> usize {
360        match join_type {
361            JoinType::Inner
362            | JoinType::LeftOuter
363            | JoinType::RightOuter
364            | JoinType::FullOuter
365            | JoinType::AsofInner
366            | JoinType::AsofLeftOuter => left_len + right_len,
367            JoinType::LeftSemi | JoinType::LeftAnti => left_len,
368            JoinType::RightSemi | JoinType::RightAnti => right_len,
369            JoinType::Unspecified => unreachable!(),
370        }
371    }
372
373    pub fn with_full_output(
374        left: PlanRef,
375        right: PlanRef,
376        join_type: JoinType,
377        on: Condition,
378    ) -> Self {
379        let out_column_num =
380            Self::full_out_col_num(left.schema().len(), right.schema().len(), join_type);
381        Self {
382            left,
383            right,
384            join_type,
385            on,
386            output_indices: (0..out_column_num).collect(),
387        }
388    }
389
390    pub fn internal_column_num(&self) -> usize {
391        Self::full_out_col_num(
392            self.left.schema().len(),
393            self.right.schema().len(),
394            self.join_type,
395        )
396    }
397
398    pub fn is_full_out(&self) -> bool {
399        self.output_indices.len() == self.internal_column_num()
400    }
401
402    /// Get the Mapping of columnIndex from internal column index to left column index.
403    pub fn i2l_col_mapping(&self) -> ColIndexMapping {
404        let left_len = self.left.schema().len();
405        let right_len = self.right.schema().len();
406
407        match self.join_type {
408            JoinType::Inner
409            | JoinType::LeftOuter
410            | JoinType::RightOuter
411            | JoinType::FullOuter
412            | JoinType::AsofInner
413            | JoinType::AsofLeftOuter => {
414                ColIndexMapping::identity_or_none(left_len + right_len, left_len)
415            }
416
417            JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::identity(left_len),
418            JoinType::RightSemi | JoinType::RightAnti => {
419                ColIndexMapping::empty(right_len, left_len)
420            }
421            JoinType::Unspecified => unreachable!(),
422        }
423    }
424
425    /// Get the Mapping of columnIndex from internal column index to right column index.
426    pub fn i2r_col_mapping(&self) -> ColIndexMapping {
427        let left_len = self.left.schema().len();
428        let right_len = self.right.schema().len();
429
430        match self.join_type {
431            JoinType::Inner
432            | JoinType::LeftOuter
433            | JoinType::RightOuter
434            | JoinType::FullOuter
435            | JoinType::AsofInner
436            | JoinType::AsofLeftOuter => {
437                ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
438            }
439            JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::empty(left_len, right_len),
440            JoinType::RightSemi | JoinType::RightAnti => ColIndexMapping::identity(right_len),
441            JoinType::Unspecified => unreachable!(),
442        }
443    }
444
445    /// TODO: This function may can be merged with `i2l_col_mapping` in future.
446    pub fn i2l_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
447        let left_len = self.left.schema().len();
448        let right_len = self.right.schema().len();
449
450        ColIndexMapping::identity_or_none(left_len + right_len, left_len)
451    }
452
453    /// TODO: This function may can be merged with `i2r_col_mapping` in future.
454    pub fn i2r_col_mapping_ignore_join_type(&self) -> ColIndexMapping {
455        let left_len = self.left.schema().len();
456        let right_len = self.right.schema().len();
457
458        ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
459    }
460
461    /// Get the Mapping of columnIndex from left column index to internal column index.
462    pub fn l2i_col_mapping(&self) -> ColIndexMapping {
463        self.i2l_col_mapping()
464            .inverse()
465            .expect("must be invertible")
466    }
467
468    /// Get the Mapping of columnIndex from right column index to internal column index.
469    pub fn r2i_col_mapping(&self) -> ColIndexMapping {
470        self.i2r_col_mapping()
471            .inverse()
472            .expect("must be invertible")
473    }
474
475    /// Get the Mapping of columnIndex from internal column index to output column index
476    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
477        ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
478    }
479
480    /// Get the Mapping of columnIndex from output column index to internal column index
481    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
482        // If output_indices = [0, 0, 1], we should use it as `o2i_col_mapping` directly.
483        // If we use `self.i2o_col_mapping().inverse()`, we will lose the first 0.
484        ColIndexMapping::new(
485            self.output_indices.iter().map(|x| Some(*x)).collect(),
486            self.internal_column_num(),
487        )
488    }
489
490    pub fn add_which_join_key_to_pk(&self) -> EitherOrBoth<(), ()> {
491        match self.join_type {
492            JoinType::Inner | JoinType::AsofInner => {
493                // Theoretically adding either side is ok, but the distribution key of the inner
494                // join derived based on the left side by default, so we choose the left side here
495                // to ensure the pk comprises the distribution key.
496                EitherOrBoth::Left(())
497            }
498            JoinType::LeftOuter
499            | JoinType::LeftSemi
500            | JoinType::LeftAnti
501            | JoinType::AsofLeftOuter => EitherOrBoth::Left(()),
502            JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
503                EitherOrBoth::Right(())
504            }
505            JoinType::FullOuter => EitherOrBoth::Both((), ()),
506            JoinType::Unspecified => unreachable!(),
507        }
508    }
509
510    pub fn concat_schema(&self) -> Schema {
511        Schema::new(
512            [
513                self.left.schema().fields.clone(),
514                self.right.schema().fields.clone(),
515            ]
516            .concat(),
517        )
518    }
519}
520
521/// Try to split and pushdown `predicate` into a into a join condition and into the inputs of the
522/// join. Returns the pushed predicates. The pushed part will be removed from the original
523/// predicate.
524///
525/// `InputRef`s in the right pushed condition are indexed by the right child's output schema.
526pub fn push_down_into_join(
527    predicate: &mut Condition,
528    left_col_num: usize,
529    right_col_num: usize,
530    ty: JoinType,
531    push_temporal_predicate: bool,
532) -> (Condition, Condition, Condition) {
533    let (left, right) = push_down_to_inputs(
534        predicate,
535        left_col_num,
536        right_col_num,
537        can_push_left_from_filter(ty),
538        can_push_right_from_filter(ty),
539        push_temporal_predicate,
540    );
541
542    let on = if can_push_on_from_filter(ty) {
543        let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
544
545        if push_temporal_predicate {
546            Condition { conjunctions }
547        } else {
548            // Do not push now on to the on, it will be pulled up into a filter instead.
549            let on = Condition {
550                conjunctions: conjunctions
551                    .extract_if(.., |expr| expr.count_nows() == 0)
552                    .collect(),
553            };
554            predicate.conjunctions = conjunctions;
555            on
556        }
557    } else {
558        Condition::true_cond()
559    };
560    (left, right, on)
561}
562
563/// Try to pushes parts of the join condition to its inputs. Returns the pushed predicates. The
564/// pushed part will be removed from the original join predicate.
565///
566/// `InputRef`s in the right pushed condition are indexed by the right child's output schema.
567pub fn push_down_join_condition(
568    on_condition: &mut Condition,
569    left_col_num: usize,
570    right_col_num: usize,
571    ty: JoinType,
572    push_temporal_predicate: bool,
573) -> (Condition, Condition) {
574    push_down_to_inputs(
575        on_condition,
576        left_col_num,
577        right_col_num,
578        can_push_left_from_on(ty),
579        can_push_right_from_on(ty),
580        push_temporal_predicate,
581    )
582}
583
584/// Try to split and pushdown `predicate` into a join's left/right child.
585/// Returns the pushed predicates. The pushed part will be removed from the original predicate.
586///
587/// `InputRef`s in the right `Condition` are shifted by `-left_col_num`.
588fn push_down_to_inputs(
589    predicate: &mut Condition,
590    left_col_num: usize,
591    right_col_num: usize,
592    push_left: bool,
593    push_right: bool,
594    push_temporal_predicate: bool,
595) -> (Condition, Condition) {
596    let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
597    let (mut left, right, mut others) = if push_temporal_predicate {
598        Condition { conjunctions }.split(left_col_num, right_col_num)
599    } else {
600        let temporal_filter_cons = conjunctions
601            .extract_if(.., |e| e.count_nows() != 0)
602            .collect_vec();
603        let (left, right, mut others) =
604            Condition { conjunctions }.split(left_col_num, right_col_num);
605
606        others.conjunctions.extend(temporal_filter_cons);
607        (left, right, others)
608    };
609
610    if !push_left {
611        others.conjunctions.extend(left);
612        left = Condition::true_cond();
613    };
614
615    let right = if push_right {
616        let mut mapping = ColIndexMapping::with_shift_offset(
617            left_col_num + right_col_num,
618            -(left_col_num as isize),
619        );
620        right.rewrite_expr(&mut mapping)
621    } else {
622        others.conjunctions.extend(right);
623        Condition::true_cond()
624    };
625
626    predicate.conjunctions = others.conjunctions;
627
628    (left, right)
629}
630
631pub fn can_push_left_from_filter(ty: JoinType) -> bool {
632    matches!(
633        ty,
634        JoinType::Inner
635            | JoinType::LeftOuter
636            | JoinType::LeftSemi
637            | JoinType::LeftAnti
638            | JoinType::AsofInner
639            | JoinType::AsofLeftOuter
640    )
641}
642
643pub fn can_push_right_from_filter(ty: JoinType) -> bool {
644    matches!(
645        ty,
646        JoinType::Inner
647            | JoinType::RightOuter
648            | JoinType::RightSemi
649            | JoinType::RightAnti
650            | JoinType::AsofInner
651    )
652}
653
654pub fn can_push_on_from_filter(ty: JoinType) -> bool {
655    matches!(
656        ty,
657        JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi
658    )
659}
660
661pub fn can_push_left_from_on(ty: JoinType) -> bool {
662    matches!(
663        ty,
664        JoinType::Inner
665            | JoinType::RightOuter
666            | JoinType::LeftSemi
667            | JoinType::AsofInner
668            | JoinType::AsofLeftOuter
669    )
670}
671
672pub fn can_push_right_from_on(ty: JoinType) -> bool {
673    matches!(
674        ty,
675        JoinType::Inner
676            | JoinType::LeftOuter
677            | JoinType::RightSemi
678            | JoinType::AsofInner
679            | JoinType::AsofLeftOuter
680    )
681}