risingwave_frontend/optimizer/rule/
index_delta_join_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17use risingwave_pb::stream_plan::StreamScanType;
18
19use crate::optimizer::plan_node::{Stream, StreamPlanRef as PlanRef, *};
20use crate::optimizer::rule::{BoxedRule, Rule};
21
/// Optimizer rule that turns an inner equi-join over two table scans into a delta join,
/// swapping each input for a scan over an arrangement (a full-covering index or the primary
/// table itself) that is distributed and ordered by the join key.
pub struct IndexDeltaJoinRule {}
24
25impl Rule<Stream> for IndexDeltaJoinRule {
26    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
27        let join = plan.as_stream_hash_join()?;
28        if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
29            return Some(plan);
30        }
31
32        /// FIXME: Exchanges still may exist after table scan, because table scan's distribution
33        /// follows upstream materialize node(e.g. subset of pk), whereas join distributes by join
34        /// key.
35        fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
36            if let Some(exchange) = plan.as_stream_exchange() {
37                match_through_exchange(exchange.input())
38            } else if plan.as_stream_table_scan().is_some() {
39                Some(plan)
40            } else {
41                None
42            }
43        }
44
45        let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
46        let input_left = input_left_dyn.as_stream_table_scan()?;
47        let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
48        let input_right = input_right_dyn.as_stream_table_scan()?;
49        let left_indices = join.eq_join_predicate().left_eq_indexes();
50        let right_indices = join.eq_join_predicate().right_eq_indexes();
51
52        fn match_indexes(
53            join_indices: &[usize],
54            table_scan: &StreamTableScan,
55            stream_scan_type: StreamScanType,
56        ) -> Option<PlanRef> {
57            if table_scan.core().cross_database()
58                && stream_scan_type == StreamScanType::UpstreamOnly
59            {
60                // We currently do not support cross database index scan in upstream only mode.
61                return None;
62            }
63
64            for index in &table_scan.core().table_indexes {
65                // Only full covering index can be used in delta join
66                if !index.full_covering() {
67                    continue;
68                }
69
70                let p2s_mapping = index.primary_to_secondary_mapping();
71
72                // 1. Check if distribution keys are the same.
73                // We don't assume the hash function we are using satisfies commutativity
74                // `Hash(A, B) == Hash(B, A)`, so we consider order of each item in distribution
75                // keys here.
76                let join_indices_ref_to_index_table = join_indices
77                    .iter()
78                    .map(|&i| table_scan.core().output_col_idx[i])
79                    .map(|x| *p2s_mapping.get(&x).unwrap())
80                    .collect_vec();
81
82                if index.index_table.distribution_key != join_indices_ref_to_index_table {
83                    continue;
84                }
85
86                // 2. Check join key is prefix of index order key
87                let index_order_key_prefix = index
88                    .index_table
89                    .pk
90                    .iter()
91                    .map(|x| x.column_index)
92                    .take(index.index_table.distribution_key.len())
93                    .collect_vec();
94
95                if index_order_key_prefix != join_indices_ref_to_index_table {
96                    continue;
97                }
98
99                return Some(
100                    table_scan
101                        .to_index_scan(
102                            index.index_table.clone(),
103                            p2s_mapping,
104                            index.function_mapping(),
105                            stream_scan_type,
106                        )
107                        .into(),
108                );
109            }
110
111            // Primary table is also an index.
112            let primary_table = table_scan.core();
113            if let Some(primary_table_distribution_key) = primary_table.distribution_key()
114                && primary_table_distribution_key == join_indices
115            {
116                // Check join key is prefix of primary table order key
117                let primary_table_order_key_prefix = primary_table
118                    .table_catalog
119                    .pk
120                    .iter()
121                    .map(|x| x.column_index)
122                    .take(primary_table_distribution_key.len())
123                    .collect_vec();
124
125                if primary_table_order_key_prefix != join_indices {
126                    return None;
127                }
128
129                if stream_scan_type != table_scan.stream_scan_type() {
130                    Some(
131                        StreamTableScan::new_with_stream_scan_type(
132                            table_scan.core().clone(),
133                            stream_scan_type,
134                        )
135                        .into(),
136                    )
137                } else {
138                    Some(table_scan.clone().into())
139                }
140            } else {
141                None
142            }
143        }
144
145        // Delta join only needs to backfill one stream flow and others should be upstream only
146        // chain. Here we choose the left one to backfill and right one to upstream only
147        // chain.
148        if let Some(left) = match_indexes(&left_indices, input_left, StreamScanType::Backfill) {
149            if let Some(right) =
150                match_indexes(&right_indices, input_right, StreamScanType::UpstreamOnly)
151            {
152                // We already ensured that index and join use the same distribution, so we directly
153                // replace the children with stream index scan without inserting any exchanges.
154                Some(
155                    join.clone()
156                        .into_delta_join()
157                        .clone_with_left_right(left, right)
158                        .into(),
159                )
160            } else {
161                Some(plan)
162            }
163        } else {
164            Some(plan)
165        }
166    }
167}
168
169impl IndexDeltaJoinRule {
170    pub fn create() -> BoxedRule<Stream> {
171        Box::new(Self {})
172    }
173}