risingwave_frontend/optimizer/rule/index_delta_join_rule.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17use risingwave_pb::stream_plan::StreamScanType;
18
19use super::super::plan_node::*;
20use super::{BoxedRule, Rule};
21
/// Optimizer rule that rewrites an eligible stream hash join into a delta join.
///
/// The rewrite is attempted for inner joins without non-equal conditions whose two inputs are
/// stream table scans; each input is replaced by a scan over a matching index (or over the
/// primary table when it already serves as one).
pub struct IndexDeltaJoinRule {}
24
25impl Rule for IndexDeltaJoinRule {
26 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27 let join = plan.as_stream_hash_join()?;
28 if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
29 return Some(plan);
30 }
31
32 /// FIXME: Exchanges still may exist after table scan, because table scan's distribution
33 /// follows upstream materialize node(e.g. subset of pk), whereas join distributes by join
34 /// key.
35 fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
36 if let Some(exchange) = plan.as_stream_exchange() {
37 match_through_exchange(exchange.input())
38 } else if plan.as_stream_table_scan().is_some() {
39 Some(plan)
40 } else {
41 None
42 }
43 }
44
45 let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
46 let input_left = input_left_dyn.as_stream_table_scan()?;
47 let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
48 let input_right = input_right_dyn.as_stream_table_scan()?;
49 let left_indices = join.eq_join_predicate().left_eq_indexes();
50 let right_indices = join.eq_join_predicate().right_eq_indexes();
51
52 fn match_indexes(
53 join_indices: &[usize],
54 table_scan: &StreamTableScan,
55 stream_scan_type: StreamScanType,
56 ) -> Option<PlanRef> {
57 for index in &table_scan.core().indexes {
58 // Only full covering index can be used in delta join
59 if !index.full_covering() {
60 continue;
61 }
62
63 let p2s_mapping = index.primary_to_secondary_mapping();
64
65 // 1. Check if distribution keys are the same.
66 // We don't assume the hash function we are using satisfies commutativity
67 // `Hash(A, B) == Hash(B, A)`, so we consider order of each item in distribution
68 // keys here.
69 let join_indices_ref_to_index_table = join_indices
70 .iter()
71 .map(|&i| table_scan.core().output_col_idx[i])
72 .map(|x| *p2s_mapping.get(&x).unwrap())
73 .collect_vec();
74
75 if index.index_table.distribution_key != join_indices_ref_to_index_table {
76 continue;
77 }
78
79 // 2. Check join key is prefix of index order key
80 let index_order_key_prefix = index
81 .index_table
82 .pk
83 .iter()
84 .map(|x| x.column_index)
85 .take(index.index_table.distribution_key.len())
86 .collect_vec();
87
88 if index_order_key_prefix != join_indices_ref_to_index_table {
89 continue;
90 }
91
92 return Some(
93 table_scan
94 .to_index_scan(
95 index.index_table.name.as_str(),
96 index.index_table.clone(),
97 p2s_mapping,
98 index.function_mapping(),
99 stream_scan_type,
100 )
101 .into(),
102 );
103 }
104
105 // Primary table is also an index.
106 let primary_table = table_scan.core();
107 if let Some(primary_table_distribution_key) = primary_table.distribution_key()
108 && primary_table_distribution_key == join_indices
109 {
110 // Check join key is prefix of primary table order key
111 let primary_table_order_key_prefix = primary_table
112 .table_desc
113 .pk
114 .iter()
115 .map(|x| x.column_index)
116 .take(primary_table_distribution_key.len())
117 .collect_vec();
118
119 if primary_table_order_key_prefix != join_indices {
120 return None;
121 }
122
123 if stream_scan_type != table_scan.stream_scan_type() {
124 Some(
125 StreamTableScan::new_with_stream_scan_type(
126 table_scan.core().clone(),
127 stream_scan_type,
128 )
129 .into(),
130 )
131 } else {
132 Some(table_scan.clone().into())
133 }
134 } else {
135 None
136 }
137 }
138
139 // Delta join only needs to backfill one stream flow and others should be upstream only
140 // chain. Here we choose the left one to backfill and right one to upstream only
141 // chain.
142 if let Some(left) = match_indexes(&left_indices, input_left, StreamScanType::Backfill) {
143 if let Some(right) =
144 match_indexes(&right_indices, input_right, StreamScanType::UpstreamOnly)
145 {
146 // We already ensured that index and join use the same distribution, so we directly
147 // replace the children with stream index scan without inserting any exchanges.
148 Some(
149 join.clone()
150 .into_delta_join()
151 .clone_with_left_right(left, right)
152 .into(),
153 )
154 } else {
155 Some(plan)
156 }
157 } else {
158 Some(plan)
159 }
160 }
161}
162
163impl IndexDeltaJoinRule {
164 pub fn create() -> BoxedRule {
165 Box::new(Self {})
166 }
167}