risingwave_frontend/optimizer/rule/stream/
separate_consecutive_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::PlanRef;
16use crate::optimizer::plan_node::{
17    LogicalJoin, PlanTreeNodeBinary, StreamExchange, StreamHashJoin,
18};
19use crate::optimizer::rule::{BoxedRule, Rule};
20
21/// Separate consecutive stream hash joins by no-shuffle exchange
22pub struct SeparateConsecutiveJoinRule {}
23
24impl Rule for SeparateConsecutiveJoinRule {
25    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
26        let join = plan.as_stream_hash_join()?;
27        let left_input = join.left();
28        let right_input = join.right();
29
30        let new_left = if left_input.as_stream_hash_join().is_some() {
31            StreamExchange::new_no_shuffle(left_input).into()
32        } else {
33            left_input
34        };
35
36        let new_right = if right_input.as_stream_hash_join().is_some() {
37            StreamExchange::new_no_shuffle(right_input).into()
38        } else {
39            right_input
40        };
41
42        let new_logical_join = LogicalJoin::new(
43            new_left,
44            new_right,
45            join.join_type(),
46            join.eq_join_predicate().all_cond(),
47        );
48        Some(
49            StreamHashJoin::new(
50                new_logical_join.core().clone(),
51                join.eq_join_predicate().clone(),
52            )
53            .into(),
54        )
55    }
56}
57
58impl SeparateConsecutiveJoinRule {
59    pub fn create() -> BoxedRule {
60        Box::new(SeparateConsecutiveJoinRule {})
61    }
62}