risingwave_frontend/optimizer/rule/stream/
separate_consecutive_join.rs1use crate::PlanRef;
16use crate::optimizer::plan_node::{
17 LogicalJoin, PlanTreeNodeBinary, StreamExchange, StreamHashJoin,
18};
19use crate::optimizer::rule::{BoxedRule, Rule};
20
21pub struct SeparateConsecutiveJoinRule {}
23
24impl Rule for SeparateConsecutiveJoinRule {
25 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
26 let join = plan.as_stream_hash_join()?;
27 let left_input = join.left();
28 let right_input = join.right();
29
30 let new_left = if left_input.as_stream_hash_join().is_some() {
31 StreamExchange::new_no_shuffle(left_input).into()
32 } else {
33 left_input
34 };
35
36 let new_right = if right_input.as_stream_hash_join().is_some() {
37 StreamExchange::new_no_shuffle(right_input).into()
38 } else {
39 right_input
40 };
41
42 let new_logical_join = LogicalJoin::new(
43 new_left,
44 new_right,
45 join.join_type(),
46 join.eq_join_predicate().all_cond(),
47 );
48 Some(
49 StreamHashJoin::new(
50 new_logical_join.core().clone(),
51 join.eq_join_predicate().clone(),
52 )
53 .into(),
54 )
55 }
56}
57
58impl SeparateConsecutiveJoinRule {
59 pub fn create() -> BoxedRule {
60 Box::new(SeparateConsecutiveJoinRule {})
61 }
62}