risingwave_frontend/optimizer/rule/
index_delta_join_rule.rs1use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17use risingwave_pb::stream_plan::StreamScanType;
18
19use crate::optimizer::plan_node::{Stream, StreamPlanRef as PlanRef, *};
20use crate::optimizer::rule::{BoxedRule, Rule};
21
/// Stream optimizer rule that tries to turn a hash join over two table scans
/// into a delta join whose inputs are index (or primary-table) scans.
///
/// The rule carries no state; build it with `IndexDeltaJoinRule::create`.
pub struct IndexDeltaJoinRule {}
24
25impl Rule<Stream> for IndexDeltaJoinRule {
26 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27 let join = plan.as_stream_hash_join()?;
28 if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
29 return Some(plan);
30 }
31
32 fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
36 if let Some(exchange) = plan.as_stream_exchange() {
37 match_through_exchange(exchange.input())
38 } else if plan.as_stream_table_scan().is_some() {
39 Some(plan)
40 } else {
41 None
42 }
43 }
44
45 let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
46 let input_left = input_left_dyn.as_stream_table_scan()?;
47 let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
48 let input_right = input_right_dyn.as_stream_table_scan()?;
49 let left_indices = join.eq_join_predicate().left_eq_indexes();
50 let right_indices = join.eq_join_predicate().right_eq_indexes();
51
52 fn match_indexes(
53 join_indices: &[usize],
54 table_scan: &StreamTableScan,
55 stream_scan_type: StreamScanType,
56 ) -> Option<PlanRef> {
57 if table_scan.core().cross_database()
58 && stream_scan_type == StreamScanType::UpstreamOnly
59 {
60 return None;
62 }
63
64 for index in &table_scan.core().table_indexes {
65 if !index.full_covering() {
67 continue;
68 }
69
70 let p2s_mapping = index.primary_to_secondary_mapping();
71
72 let join_indices_ref_to_index_table = join_indices
77 .iter()
78 .map(|&i| table_scan.core().output_col_idx[i])
79 .map(|x| *p2s_mapping.get(&x).unwrap())
80 .collect_vec();
81
82 if index.index_table.distribution_key != join_indices_ref_to_index_table {
83 continue;
84 }
85
86 let index_order_key_prefix = index
88 .index_table
89 .pk
90 .iter()
91 .map(|x| x.column_index)
92 .take(index.index_table.distribution_key.len())
93 .collect_vec();
94
95 if index_order_key_prefix != join_indices_ref_to_index_table {
96 continue;
97 }
98
99 return Some(
100 table_scan
101 .to_index_scan(
102 index.index_table.clone(),
103 p2s_mapping,
104 index.function_mapping(),
105 stream_scan_type,
106 )
107 .into(),
108 );
109 }
110
111 let primary_table = table_scan.core();
113 if let Some(primary_table_distribution_key) = primary_table.distribution_key()
114 && primary_table_distribution_key == join_indices
115 {
116 let primary_table_order_key_prefix = primary_table
118 .table_catalog
119 .pk
120 .iter()
121 .map(|x| x.column_index)
122 .take(primary_table_distribution_key.len())
123 .collect_vec();
124
125 if primary_table_order_key_prefix != join_indices {
126 return None;
127 }
128
129 if stream_scan_type != table_scan.stream_scan_type() {
130 Some(
131 StreamTableScan::new_with_stream_scan_type(
132 table_scan.core().clone(),
133 stream_scan_type,
134 )
135 .into(),
136 )
137 } else {
138 Some(table_scan.clone().into())
139 }
140 } else {
141 None
142 }
143 }
144
145 if let Some(left) = match_indexes(&left_indices, input_left, StreamScanType::Backfill) {
149 if let Some(right) =
150 match_indexes(&right_indices, input_right, StreamScanType::UpstreamOnly)
151 {
152 Some(
155 join.clone()
156 .into_delta_join()
157 .clone_with_left_right(left, right)
158 .into(),
159 )
160 } else {
161 Some(plan)
162 }
163 } else {
164 Some(plan)
165 }
166 }
167}
168
169impl IndexDeltaJoinRule {
170 pub fn create() -> BoxedRule<Stream> {
171 Box::new(Self {})
172 }
173}