risingwave_frontend/optimizer/rule/
index_delta_join_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17use risingwave_pb::stream_plan::StreamScanType;
18
19use crate::optimizer::plan_node::{Stream, StreamPlanRef as PlanRef, *};
20use crate::optimizer::rule::{BoxedRule, Rule};
21
/// Optimizer rule that replaces an eligible stream hash join with a delta join
/// whose inputs are served by index scans (or by the primary table itself when
/// its keys already line up with the join key).
///
/// The rule carries no state; obtain a boxed instance via `create`.
pub struct IndexDeltaJoinRule {}
24
25impl Rule<Stream> for IndexDeltaJoinRule {
26    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27        let join = plan.as_stream_hash_join()?;
28        if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
29            return Some(plan);
30        }
31
32        /// FIXME: Exchanges still may exist after table scan, because table scan's distribution
33        /// follows upstream materialize node(e.g. subset of pk), whereas join distributes by join
34        /// key.
35        fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
36            if let Some(exchange) = plan.as_stream_exchange() {
37                match_through_exchange(exchange.input())
38            } else if plan.as_stream_table_scan().is_some() {
39                Some(plan)
40            } else {
41                None
42            }
43        }
44
45        let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
46        let input_left = input_left_dyn.as_stream_table_scan()?;
47        let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
48        let input_right = input_right_dyn.as_stream_table_scan()?;
49        let left_indices = join.eq_join_predicate().left_eq_indexes();
50        let right_indices = join.eq_join_predicate().right_eq_indexes();
51
52        fn match_indexes(
53            join_indices: &[usize],
54            table_scan: &StreamTableScan,
55            stream_scan_type: StreamScanType,
56        ) -> Option<PlanRef> {
57            for index in &table_scan.core().table_indexes {
58                // Only full covering index can be used in delta join
59                if !index.full_covering() {
60                    continue;
61                }
62
63                let p2s_mapping = index.primary_to_secondary_mapping();
64
65                // 1. Check if distribution keys are the same.
66                // We don't assume the hash function we are using satisfies commutativity
67                // `Hash(A, B) == Hash(B, A)`, so we consider order of each item in distribution
68                // keys here.
69                let join_indices_ref_to_index_table = join_indices
70                    .iter()
71                    .map(|&i| table_scan.core().output_col_idx[i])
72                    .map(|x| *p2s_mapping.get(&x).unwrap())
73                    .collect_vec();
74
75                if index.index_table.distribution_key != join_indices_ref_to_index_table {
76                    continue;
77                }
78
79                // 2. Check join key is prefix of index order key
80                let index_order_key_prefix = index
81                    .index_table
82                    .pk
83                    .iter()
84                    .map(|x| x.column_index)
85                    .take(index.index_table.distribution_key.len())
86                    .collect_vec();
87
88                if index_order_key_prefix != join_indices_ref_to_index_table {
89                    continue;
90                }
91
92                return Some(
93                    table_scan
94                        .to_index_scan(
95                            index.index_table.clone(),
96                            p2s_mapping,
97                            index.function_mapping(),
98                            stream_scan_type,
99                        )
100                        .into(),
101                );
102            }
103
104            // Primary table is also an index.
105            let primary_table = table_scan.core();
106            if let Some(primary_table_distribution_key) = primary_table.distribution_key()
107                && primary_table_distribution_key == join_indices
108            {
109                // Check join key is prefix of primary table order key
110                let primary_table_order_key_prefix = primary_table
111                    .table_catalog
112                    .pk
113                    .iter()
114                    .map(|x| x.column_index)
115                    .take(primary_table_distribution_key.len())
116                    .collect_vec();
117
118                if primary_table_order_key_prefix != join_indices {
119                    return None;
120                }
121
122                if stream_scan_type != table_scan.stream_scan_type() {
123                    Some(
124                        StreamTableScan::new_with_stream_scan_type(
125                            table_scan.core().clone(),
126                            stream_scan_type,
127                        )
128                        .into(),
129                    )
130                } else {
131                    Some(table_scan.clone().into())
132                }
133            } else {
134                None
135            }
136        }
137
138        // Delta join only needs to backfill one stream flow and others should be upstream only
139        // chain. Here we choose the left one to backfill and right one to upstream only
140        // chain.
141        if let Some(left) = match_indexes(&left_indices, input_left, StreamScanType::Backfill) {
142            if let Some(right) =
143                match_indexes(&right_indices, input_right, StreamScanType::UpstreamOnly)
144            {
145                // We already ensured that index and join use the same distribution, so we directly
146                // replace the children with stream index scan without inserting any exchanges.
147                Some(
148                    join.clone()
149                        .into_delta_join()
150                        .clone_with_left_right(left, right)
151                        .into(),
152                )
153            } else {
154                Some(plan)
155            }
156        } else {
157            Some(plan)
158        }
159    }
160}
161
162impl IndexDeltaJoinRule {
163    pub fn create() -> BoxedRule<Stream> {
164        Box::new(Self {})
165    }
166}