Skip to main content

risingwave_frontend/optimizer/rule/
index_delta_join_rule.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17
18use crate::optimizer::plan_node::{Stream, StreamPlanRef as PlanRef, *};
19use crate::optimizer::rule::{BoxedRule, Rule};
20
/// Use index scan and delta joins for supported queries.
///
/// The rule only rewrites inner stream hash joins whose condition consists solely of equality
/// predicates and whose two inputs are table scans (possibly sitting below exchanges).
pub struct IndexDeltaJoinRule {}
23
24impl Rule<Stream> for IndexDeltaJoinRule {
25    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
26        let join = plan.as_stream_hash_join()?;
27        if join.eq_join_predicate().has_non_eq() || join.join_type() != JoinType::Inner {
28            return Some(plan);
29        }
30
31        /// FIXME: Exchanges still may exist after table scan, because table scan's distribution
32        /// follows upstream materialize node(e.g. subset of pk), whereas join distributes by join
33        /// key.
34        fn match_through_exchange(plan: PlanRef) -> Option<PlanRef> {
35            if let Some(exchange) = plan.as_stream_exchange() {
36                match_through_exchange(exchange.input())
37            } else if plan.as_stream_table_scan().is_some() {
38                Some(plan)
39            } else {
40                None
41            }
42        }
43
44        let input_left_dyn = match_through_exchange(join.inputs()[0].clone())?;
45        let input_left = input_left_dyn.as_stream_table_scan()?;
46        let input_right_dyn = match_through_exchange(join.inputs()[1].clone())?;
47        let input_right = input_right_dyn.as_stream_table_scan()?;
48        let left_indices = join.eq_join_predicate().left_eq_indexes();
49        let right_indices = join.eq_join_predicate().right_eq_indexes();
50        let left_backfill_type = input_left.backfill_type();
51        if left_backfill_type != BackfillType::ArrangementBackfill {
52            plan.ctx().session_ctx().notice_to_user(format!(
53                "Delta join requires ArrangementBackfill on the backfilled side, but got {:?}. Fall back to regular join.",
54                left_backfill_type
55            ));
56            return None;
57        }
58
59        fn match_indexes(
60            join_indices: &[usize],
61            table_scan: &StreamTableScan,
62            backfill_type: BackfillType,
63        ) -> Option<PlanRef> {
64            if table_scan.core().cross_database()
65                && matches!(backfill_type, BackfillType::Replicated)
66            {
67                // We currently do not support cross database index scan in upstream only mode.
68                return None;
69            }
70
71            for index in &table_scan.core().table_indexes {
72                // Only full covering index can be used in delta join
73                if !index.full_covering() {
74                    continue;
75                }
76
77                let p2s_mapping = index.primary_to_secondary_mapping();
78
79                // 1. Check if distribution keys are the same.
80                // We don't assume the hash function we are using satisfies commutativity
81                // `Hash(A, B) == Hash(B, A)`, so we consider order of each item in distribution
82                // keys here.
83                let join_indices_ref_to_index_table = join_indices
84                    .iter()
85                    .map(|&i| table_scan.core().output_col_idx[i])
86                    .map(|x| *p2s_mapping.get(&x).unwrap())
87                    .collect_vec();
88
89                if index.index_table.distribution_key != join_indices_ref_to_index_table {
90                    continue;
91                }
92
93                // 2. Check join key is prefix of index order key
94                let index_order_key_prefix = index
95                    .index_table
96                    .pk
97                    .iter()
98                    .map(|x| x.column_index)
99                    .take(index.index_table.distribution_key.len())
100                    .collect_vec();
101
102                if index_order_key_prefix != join_indices_ref_to_index_table {
103                    continue;
104                }
105
106                return Some(
107                    table_scan
108                        .to_index_scan(
109                            index.index_table.clone(),
110                            p2s_mapping,
111                            index.function_mapping(),
112                            backfill_type,
113                        )
114                        .into(),
115                );
116            }
117
118            // Primary table is also an index.
119            let primary_table = table_scan.core();
120            if let Some(primary_table_distribution_key) = primary_table.distribution_key()
121                && primary_table_distribution_key == join_indices
122            {
123                // Check join key is prefix of primary table order key
124                let primary_table_order_key_prefix = primary_table
125                    .table_catalog
126                    .pk
127                    .iter()
128                    .map(|x| x.column_index)
129                    .take(primary_table_distribution_key.len())
130                    .collect_vec();
131
132                if primary_table_order_key_prefix != join_indices {
133                    return None;
134                }
135
136                if backfill_type != table_scan.backfill_type() {
137                    Some(
138                        StreamTableScan::new_with_backfill_type(
139                            table_scan.core().clone(),
140                            backfill_type,
141                        )
142                        .into(),
143                    )
144                } else {
145                    Some(table_scan.clone().into())
146                }
147            } else {
148                None
149            }
150        }
151
152        // Delta join only needs to backfill one stream flow and others should be upstream only
153        // chain. Here we choose the left one to backfill and right one to upstream only
154        // chain.
155        if let Some(left) = match_indexes(&left_indices, input_left, left_backfill_type) {
156            if let Some(right) =
157                match_indexes(&right_indices, input_right, BackfillType::Replicated)
158            {
159                // We already ensured that index and join use the same distribution, so we directly
160                // replace the children with stream index scan without inserting any exchanges.
161                Some(
162                    join.clone()
163                        .into_delta_join()
164                        .clone_with_left_right(left, right)
165                        .into(),
166                )
167            } else {
168                Some(plan)
169            }
170        } else {
171            Some(plan)
172        }
173    }
174}
175
176impl IndexDeltaJoinRule {
177    pub fn create() -> BoxedRule<Stream> {
178        Box::new(Self {})
179    }
180}