risingwave_frontend/optimizer/rule/
index_delta_join_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17use risingwave_pb::stream_plan::StreamScanType;
18
19use super::super::plan_node::*;
20use super::{BoxedRule, Rule};
21
/// Optimizer rule that rewrites a supported stream hash join into a delta join whose inputs are
/// index scans (or scans of the primary table when it can serve as the index).
pub struct IndexDeltaJoinRule {}
24
25impl Rule for IndexDeltaJoinRule {
26    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27        let join = plan.as_stream_hash_join()?;
28        if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
29            return Some(plan);
30        }
31
32        /// FIXME: Exchanges still may exist after table scan, because table scan's distribution
33        /// follows upstream materialize node(e.g. subset of pk), whereas join distributes by join
34        /// key.
35        fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
36            if let Some(exchange) = plan.as_stream_exchange() {
37                match_through_exchange(exchange.input())
38            } else if plan.as_stream_table_scan().is_some() {
39                Some(plan)
40            } else {
41                None
42            }
43        }
44
45        let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
46        let input_left = input_left_dyn.as_stream_table_scan()?;
47        let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
48        let input_right = input_right_dyn.as_stream_table_scan()?;
49        let left_indices = join.eq_join_predicate().left_eq_indexes();
50        let right_indices = join.eq_join_predicate().right_eq_indexes();
51
52        fn match_indexes(
53            join_indices: &[usize],
54            table_scan: &StreamTableScan,
55            stream_scan_type: StreamScanType,
56        ) -> Option<PlanRef> {
57            for index in &table_scan.core().indexes {
58                // Only full covering index can be used in delta join
59                if !index.full_covering() {
60                    continue;
61                }
62
63                let p2s_mapping = index.primary_to_secondary_mapping();
64
65                // 1. Check if distribution keys are the same.
66                // We don't assume the hash function we are using satisfies commutativity
67                // `Hash(A, B) == Hash(B, A)`, so we consider order of each item in distribution
68                // keys here.
69                let join_indices_ref_to_index_table = join_indices
70                    .iter()
71                    .map(|&i| table_scan.core().output_col_idx[i])
72                    .map(|x| *p2s_mapping.get(&x).unwrap())
73                    .collect_vec();
74
75                if index.index_table.distribution_key != join_indices_ref_to_index_table {
76                    continue;
77                }
78
79                // 2. Check join key is prefix of index order key
80                let index_order_key_prefix = index
81                    .index_table
82                    .pk
83                    .iter()
84                    .map(|x| x.column_index)
85                    .take(index.index_table.distribution_key.len())
86                    .collect_vec();
87
88                if index_order_key_prefix != join_indices_ref_to_index_table {
89                    continue;
90                }
91
92                return Some(
93                    table_scan
94                        .to_index_scan(
95                            index.index_table.name.as_str(),
96                            index.index_table.clone(),
97                            p2s_mapping,
98                            index.function_mapping(),
99                            stream_scan_type,
100                        )
101                        .into(),
102                );
103            }
104
105            // Primary table is also an index.
106            let primary_table = table_scan.core();
107            if let Some(primary_table_distribution_key) = primary_table.distribution_key()
108                && primary_table_distribution_key == join_indices
109            {
110                // Check join key is prefix of primary table order key
111                let primary_table_order_key_prefix = primary_table
112                    .table_desc
113                    .pk
114                    .iter()
115                    .map(|x| x.column_index)
116                    .take(primary_table_distribution_key.len())
117                    .collect_vec();
118
119                if primary_table_order_key_prefix != join_indices {
120                    return None;
121                }
122
123                if stream_scan_type != table_scan.stream_scan_type() {
124                    Some(
125                        StreamTableScan::new_with_stream_scan_type(
126                            table_scan.core().clone(),
127                            stream_scan_type,
128                        )
129                        .into(),
130                    )
131                } else {
132                    Some(table_scan.clone().into())
133                }
134            } else {
135                None
136            }
137        }
138
139        // Delta join only needs to backfill one stream flow and others should be upstream only
140        // chain. Here we choose the left one to backfill and right one to upstream only
141        // chain.
142        if let Some(left) = match_indexes(&left_indices, input_left, StreamScanType::Backfill) {
143            if let Some(right) =
144                match_indexes(&right_indices, input_right, StreamScanType::UpstreamOnly)
145            {
146                // We already ensured that index and join use the same distribution, so we directly
147                // replace the children with stream index scan without inserting any exchanges.
148                Some(
149                    join.clone()
150                        .into_delta_join()
151                        .clone_with_left_right(left, right)
152                        .into(),
153                )
154            } else {
155                Some(plan)
156            }
157        } else {
158            Some(plan)
159        }
160    }
161}
162
163impl IndexDeltaJoinRule {
164    pub fn create() -> BoxedRule {
165        Box::new(Self {})
166    }
167}