risingwave_frontend/optimizer/rule/stream/
separate_consecutive_join_rule.rs1use super::prelude::*;
16use crate::optimizer::plan_node::generic::Join;
17use crate::optimizer::plan_node::{PlanTreeNodeBinary, StreamExchange, StreamHashJoin};
18
/// Stream optimizer rule that puts a no-shuffle [`StreamExchange`] between a stream hash join
/// and any direct input that is itself a stream hash join.
///
/// The type carries no state; all behavior lives in its `Rule<Stream>` implementation.
pub struct SeparateConsecutiveJoinRule {}
21
22impl Rule<Stream> for SeparateConsecutiveJoinRule {
23 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
24 let join = plan.as_stream_hash_join()?;
25 let left_input = join.left();
26 let right_input = join.right();
27
28 let new_left = if left_input.as_stream_hash_join().is_some() {
29 StreamExchange::new_no_shuffle(left_input).into()
30 } else {
31 left_input
32 };
33
34 let new_right = if right_input.as_stream_hash_join().is_some() {
35 StreamExchange::new_no_shuffle(right_input).into()
36 } else {
37 right_input
38 };
39
40 let core = Join::with_full_output_eq_predicate(
41 new_left,
42 new_right,
43 join.join_type(),
44 join.eq_join_predicate().clone(),
45 );
46 Some(StreamHashJoin::new(core).unwrap().into())
47 }
48}
49
50impl SeparateConsecutiveJoinRule {
51 pub fn create() -> BoxedRule {
52 Box::new(SeparateConsecutiveJoinRule {})
53 }
54}