// risingwave_frontend/optimizer/rule/index_delta_join_rule.rs
use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17use risingwave_pb::stream_plan::StreamScanType;
18
19use crate::optimizer::plan_node::{Stream, StreamPlanRef as PlanRef, *};
20use crate::optimizer::rule::{BoxedRule, Rule};
21
/// Optimizer rule that tries to rewrite a stream hash join over two table
/// scans into a delta join backed by index scans or primary-table scans.
///
/// The type carries no state; all of its logic lives in its `Rule<Stream>`
/// implementation.
pub struct IndexDeltaJoinRule {}
24
25impl Rule<Stream> for IndexDeltaJoinRule {
26 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27 let join = plan.as_stream_hash_join()?;
28 if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
29 return Some(plan);
30 }
31
32 fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
36 if let Some(exchange) = plan.as_stream_exchange() {
37 match_through_exchange(exchange.input())
38 } else if plan.as_stream_table_scan().is_some() {
39 Some(plan)
40 } else {
41 None
42 }
43 }
44
45 let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
46 let input_left = input_left_dyn.as_stream_table_scan()?;
47 let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
48 let input_right = input_right_dyn.as_stream_table_scan()?;
49 let left_indices = join.eq_join_predicate().left_eq_indexes();
50 let right_indices = join.eq_join_predicate().right_eq_indexes();
51
52 fn match_indexes(
53 join_indices: &[usize],
54 table_scan: &StreamTableScan,
55 stream_scan_type: StreamScanType,
56 ) -> Option<PlanRef> {
57 for index in &table_scan.core().table_indexes {
58 if !index.full_covering() {
60 continue;
61 }
62
63 let p2s_mapping = index.primary_to_secondary_mapping();
64
65 let join_indices_ref_to_index_table = join_indices
70 .iter()
71 .map(|&i| table_scan.core().output_col_idx[i])
72 .map(|x| *p2s_mapping.get(&x).unwrap())
73 .collect_vec();
74
75 if index.index_table.distribution_key != join_indices_ref_to_index_table {
76 continue;
77 }
78
79 let index_order_key_prefix = index
81 .index_table
82 .pk
83 .iter()
84 .map(|x| x.column_index)
85 .take(index.index_table.distribution_key.len())
86 .collect_vec();
87
88 if index_order_key_prefix != join_indices_ref_to_index_table {
89 continue;
90 }
91
92 return Some(
93 table_scan
94 .to_index_scan(
95 index.index_table.clone(),
96 p2s_mapping,
97 index.function_mapping(),
98 stream_scan_type,
99 )
100 .into(),
101 );
102 }
103
104 let primary_table = table_scan.core();
106 if let Some(primary_table_distribution_key) = primary_table.distribution_key()
107 && primary_table_distribution_key == join_indices
108 {
109 let primary_table_order_key_prefix = primary_table
111 .table_catalog
112 .pk
113 .iter()
114 .map(|x| x.column_index)
115 .take(primary_table_distribution_key.len())
116 .collect_vec();
117
118 if primary_table_order_key_prefix != join_indices {
119 return None;
120 }
121
122 if stream_scan_type != table_scan.stream_scan_type() {
123 Some(
124 StreamTableScan::new_with_stream_scan_type(
125 table_scan.core().clone(),
126 stream_scan_type,
127 )
128 .into(),
129 )
130 } else {
131 Some(table_scan.clone().into())
132 }
133 } else {
134 None
135 }
136 }
137
138 if let Some(left) = match_indexes(&left_indices, input_left, StreamScanType::Backfill) {
142 if let Some(right) =
143 match_indexes(&right_indices, input_right, StreamScanType::UpstreamOnly)
144 {
145 Some(
148 join.clone()
149 .into_delta_join()
150 .clone_with_left_right(left, right)
151 .into(),
152 )
153 } else {
154 Some(plan)
155 }
156 } else {
157 Some(plan)
158 }
159 }
160}
161
162impl IndexDeltaJoinRule {
163 pub fn create() -> BoxedRule<Stream> {
164 Box::new(Self {})
165 }
166}