risingwave_frontend/optimizer/rule/stream/
separate_consecutive_join_rule.rs1use super::prelude::*;
16use crate::optimizer::plan_node::{PlanTreeNodeBinary, StreamExchange};
17
/// Stream optimizer rule that targets a stream hash join whose left and/or
/// right input is itself a stream hash join.
///
/// The rule carries no state; the rewrite logic lives in its `Rule<Stream>`
/// implementation.
pub struct SeparateConsecutiveJoinRule {}
20
21impl Rule<Stream> for SeparateConsecutiveJoinRule {
22 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
23 let join = plan.as_stream_hash_join()?;
24 let left_input = join.left();
25 let right_input = join.right();
26
27 let left_needs_separation = left_input.as_stream_hash_join().is_some();
28 let right_needs_separation = right_input.as_stream_hash_join().is_some();
29
30 if !left_needs_separation && !right_needs_separation {
31 return None;
32 }
33
34 let new_left = if left_needs_separation {
35 StreamExchange::new_no_shuffle(left_input).into()
36 } else {
37 left_input
38 };
39 let new_right = if right_needs_separation {
40 StreamExchange::new_no_shuffle(right_input).into()
41 } else {
42 right_input
43 };
44
45 Some(join.clone_with_left_right(new_left, new_right).into())
46 }
47}
48
49impl SeparateConsecutiveJoinRule {
50 pub fn create() -> BoxedRule {
51 Box::new(SeparateConsecutiveJoinRule {})
52 }
53}