risingwave_frontend/optimizer/rule/stream/
separate_consecutive_join_rule.rs1use super::prelude::*;
16use crate::optimizer::plan_node::generic::Join;
17use crate::optimizer::plan_node::{PlanTreeNodeBinary, StreamExchange, StreamHashJoin};
18
19pub struct SeparateConsecutiveJoinRule {}
21
22impl Rule<Stream> for SeparateConsecutiveJoinRule {
23 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
24 let join = plan.as_stream_hash_join()?;
25 let left_input = join.left();
26 let right_input = join.right();
27
28 let new_left = if left_input.as_stream_hash_join().is_some() {
29 StreamExchange::new_no_shuffle(left_input).into()
30 } else {
31 left_input
32 };
33
34 let new_right = if right_input.as_stream_hash_join().is_some() {
35 StreamExchange::new_no_shuffle(right_input).into()
36 } else {
37 right_input
38 };
39
40 let core = Join::with_full_output(
41 new_left,
42 new_right,
43 join.join_type(),
44 join.eq_join_predicate().all_cond(),
45 );
46 Some(
47 StreamHashJoin::new(core, join.eq_join_predicate().clone())
48 .unwrap()
49 .into(),
50 )
51 }
52}
53
54impl SeparateConsecutiveJoinRule {
55 pub fn create() -> BoxedRule {
56 Box::new(SeparateConsecutiveJoinRule {})
57 }
58}