risingwave_frontend/optimizer/rule/
index_delta_join_rule.rs1use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17
18use crate::optimizer::plan_node::{Stream, StreamPlanRef as PlanRef, *};
19use crate::optimizer::rule::{BoxedRule, Rule};
20
/// Optimizer rule that attempts to rewrite an eligible stream hash join into a delta join.
///
/// The struct carries no state; all of the logic lives in its `Rule<Stream>` implementation.
pub struct IndexDeltaJoinRule {}
23
24impl Rule<Stream> for IndexDeltaJoinRule {
25 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
26 let join = plan.as_stream_hash_join()?;
27 if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
28 return Some(plan);
29 }
30
31 fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
35 if let Some(exchange) = plan.as_stream_exchange() {
36 match_through_exchange(exchange.input())
37 } else if plan.as_stream_table_scan().is_some() {
38 Some(plan)
39 } else {
40 None
41 }
42 }
43
44 let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
45 let input_left = input_left_dyn.as_stream_table_scan()?;
46 let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
47 let input_right = input_right_dyn.as_stream_table_scan()?;
48 let left_indices = join.eq_join_predicate().left_eq_indexes();
49 let right_indices = join.eq_join_predicate().right_eq_indexes();
50 let left_backfill_type = input_left.backfill_type();
51 if left_backfill_type != BackfillType::ArrangementBackfill {
52 plan.ctx().session_ctx().notice_to_user(format!(
53 "Delta join requires ArrangementBackfill on the backfilled side, but got {:?}. Fall back to regular join.",
54 left_backfill_type
55 ));
56 return None;
57 }
58
59 fn match_indexes(
60 join_indices: &[usize],
61 table_scan: &StreamTableScan,
62 backfill_type: BackfillType,
63 ) -> Option<PlanRef> {
64 if table_scan.core().cross_database()
65 && matches!(backfill_type, BackfillType::Replicated)
66 {
67 return None;
69 }
70
71 for index in &table_scan.core().table_indexes {
72 if !index.full_covering() {
74 continue;
75 }
76
77 let p2s_mapping = index.primary_to_secondary_mapping();
78
79 let join_indices_ref_to_index_table = join_indices
84 .iter()
85 .map(|&i| table_scan.core().output_col_idx[i])
86 .map(|x| *p2s_mapping.get(&x).unwrap())
87 .collect_vec();
88
89 if index.index_table.distribution_key != join_indices_ref_to_index_table {
90 continue;
91 }
92
93 let index_order_key_prefix = index
95 .index_table
96 .pk
97 .iter()
98 .map(|x| x.column_index)
99 .take(index.index_table.distribution_key.len())
100 .collect_vec();
101
102 if index_order_key_prefix != join_indices_ref_to_index_table {
103 continue;
104 }
105
106 return Some(
107 table_scan
108 .to_index_scan(
109 index.index_table.clone(),
110 p2s_mapping,
111 index.function_mapping(),
112 backfill_type,
113 )
114 .into(),
115 );
116 }
117
118 let primary_table = table_scan.core();
120 if let Some(primary_table_distribution_key) = primary_table.distribution_key()
121 && primary_table_distribution_key == join_indices
122 {
123 let primary_table_order_key_prefix = primary_table
125 .table_catalog
126 .pk
127 .iter()
128 .map(|x| x.column_index)
129 .take(primary_table_distribution_key.len())
130 .collect_vec();
131
132 if primary_table_order_key_prefix != join_indices {
133 return None;
134 }
135
136 if backfill_type != table_scan.backfill_type() {
137 Some(
138 StreamTableScan::new_with_backfill_type(
139 table_scan.core().clone(),
140 backfill_type,
141 )
142 .into(),
143 )
144 } else {
145 Some(table_scan.clone().into())
146 }
147 } else {
148 None
149 }
150 }
151
152 if let Some(left) = match_indexes(&left_indices, input_left, left_backfill_type) {
156 if let Some(right) =
157 match_indexes(&right_indices, input_right, BackfillType::Replicated)
158 {
159 Some(
162 join.clone()
163 .into_delta_join()
164 .clone_with_left_right(left, right)
165 .into(),
166 )
167 } else {
168 Some(plan)
169 }
170 } else {
171 Some(plan)
172 }
173 }
174}
175
176impl IndexDeltaJoinRule {
177 pub fn create() -> BoxedRule<Stream> {
178 Box::new(Self {})
179 }
180}