risingwave_frontend/optimizer/rule/stream/
separate_consecutive_join_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::prelude::*;
16use crate::optimizer::plan_node::{PlanTreeNodeBinary, StreamExchange};
17
18/// Separate consecutive stream hash joins by no-shuffle exchange
19pub struct SeparateConsecutiveJoinRule {}
20
21impl Rule<Stream> for SeparateConsecutiveJoinRule {
22    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
23        let join = plan.as_stream_hash_join()?;
24        let left_input = join.left();
25        let right_input = join.right();
26
27        let left_needs_separation = left_input.as_stream_hash_join().is_some();
28        let right_needs_separation = right_input.as_stream_hash_join().is_some();
29
30        if !left_needs_separation && !right_needs_separation {
31            return None;
32        }
33
34        let new_left = if left_needs_separation {
35            StreamExchange::new_no_shuffle(left_input).into()
36        } else {
37            left_input
38        };
39        let new_right = if right_needs_separation {
40            StreamExchange::new_no_shuffle(right_input).into()
41        } else {
42            right_input
43        };
44
45        Some(join.clone_with_left_right(new_left, new_right).into())
46    }
47}
48
49impl SeparateConsecutiveJoinRule {
50    pub fn create() -> BoxedRule {
51        Box::new(SeparateConsecutiveJoinRule {})
52    }
53}