risingwave_frontend/optimizer/rule/
index_selection_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Index selection cost matrix
16//!
17//! |`column_idx`| 0   |  1 | 2  | 3  | 4  | remark |
18//! |-----------|-----|----|----|----|----|---|
19//! |Equal      | 1   | 1  | 1  | 1  | 1  | |
20//! |In         | 10  | 8  | 5  | 5  | 5  | take the minimum value with actual in number |
21//! |Range(Two) | 600 | 50 | 20 | 10 | 10 | `RangeTwoSideBound` like a between 1 and 2 |
22//! |Range(One) | 1400| 70 | 25 | 15 | 10 | `RangeOneSideBound` like a > 1, a >= 1, a < 1|
23//! |All        | 4000| 100| 30 | 20 | 10 | |
24//!
25//! ```text
26//! index cost = cost(match type of 0 idx)
27//! * cost(match type of 1 idx)
28//! * ... cost(match type of the last idx)
29//! ```
30//!
31//! ## Example
32//!
33//! Given index order key (a, b, c)
34//!
//! - For `a = 1 and b = 1 and c = 1`, its cost is Equal0 * Equal1 * Equal2 = 1
36//! - For `a in (xxx) and b = 1 and c = 1`, its cost is In0 * Equal1 * Equal2 = 10
//! - For `a = 1 and b in (xxx)`, its cost is Equal0 * In1 * All2 = 1 * 8 * 30 = 240
38//! - For `a between xxx and yyy`, its cost is Range(Two)0 = 600
39//! - For `a = 1 and b between xxx and yyy`, its cost is Equal0 * Range(Two)1 = 50
40//! - For `a = 1 and b > 1`, its cost is Equal0 * Range(One)1 = 70
//! - For `a = 1`, its cost is Equal0 * All1 = 100
42//! - For no condition, its cost is All0 = 4000
43//!
//! With the assumption that the most effective part of an index is its prefix,
//! cost decreases as `column_idx` increases.
46//!
47//! For index order key length > 5, we just ignore the rest.
48
49use std::cmp::min;
50use std::collections::hash_map::Entry::{Occupied, Vacant};
51use std::collections::{BTreeMap, HashMap};
52use std::rc::Rc;
53
54use itertools::Itertools;
55use risingwave_common::catalog::Schema;
56use risingwave_common::types::{
57    DataType, Date, Decimal, Int256, Interval, Serial, Time, Timestamp, Timestamptz,
58};
59use risingwave_common::util::iter_util::ZipEqFast;
60use risingwave_pb::plan_common::JoinType;
61use risingwave_sqlparser::ast::AsOf;
62
63use super::{BoxedRule, Rule};
64use crate::catalog::IndexCatalog;
65use crate::expr::{
66    Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef, to_conjunctions,
67    to_disjunctions,
68};
69use crate::optimizer::PlanRef;
70use crate::optimizer::optimizer_context::OptimizerContextRef;
71use crate::optimizer::plan_node::generic::GenericPlanRef;
72use crate::optimizer::plan_node::{
73    ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode, PlanTreeNodeBinary,
74    PredicatePushdown, PredicatePushdownContext, generic,
75};
76use crate::utils::Condition;
77
78const INDEX_MAX_LEN: usize = 5;
79const INDEX_COST_MATRIX: [[usize; INDEX_MAX_LEN]; 5] = [
80    [1, 1, 1, 1, 1],
81    [10, 8, 5, 5, 5],
82    [600, 50, 20, 10, 10],
83    [1400, 70, 25, 15, 10],
84    [4000, 100, 30, 20, 20],
85];
86const LOOKUP_COST_CONST: usize = 3;
87const MAX_COMBINATION_SIZE: usize = 3;
88const MAX_CONJUNCTION_SIZE: usize = 8;
89
/// Optimizer rule that tries to replace a `LogicalScan` over a table that has indexes with
/// a cheaper plan: a covering index scan, an index lookup join, or an index merge.
pub struct IndexSelectionRule {}
91
92impl Rule for IndexSelectionRule {
93    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
94        let logical_scan: &LogicalScan = plan.as_logical_scan()?;
95        let indexes = logical_scan.indexes();
96        if indexes.is_empty() {
97            return None;
98        }
99        let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan);
100        let primary_cost = min(
101            self.estimate_table_scan_cost(logical_scan, primary_table_row_size),
102            self.estimate_full_table_scan_cost(logical_scan, primary_table_row_size),
103        );
104
105        // If it is a primary lookup plan, avoid checking other indexes.
106        if primary_cost.primary_lookup {
107            return None;
108        }
109
110        let mut final_plan: PlanRef = logical_scan.clone().into();
111        #[expect(
112            clippy::redundant_clone,
113            reason = "false positive https://github.com/rust-lang/rust-clippy/issues/10545"
114        )]
115        let mut min_cost = primary_cost.clone();
116
117        for index in indexes {
118            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
119                let index_cost = self.estimate_table_scan_cost(
120                    &index_scan,
121                    TableScanIoEstimator::estimate_row_size(&index_scan),
122                );
123
124                if index_cost.le(&min_cost) {
125                    min_cost = index_cost;
126                    final_plan = index_scan.into();
127                }
128            } else {
129                // non-covering index selection
130                let (index_lookup, lookup_cost) = self.gen_index_lookup(logical_scan, index);
131                if lookup_cost.le(&min_cost) {
132                    min_cost = lookup_cost;
133                    final_plan = index_lookup;
134                }
135            }
136        }
137
138        if let Some((merge_index, merge_index_cost)) = self.index_merge_selection(logical_scan) {
139            if merge_index_cost.le(&min_cost) {
140                min_cost = merge_index_cost;
141                final_plan = merge_index;
142            }
143        }
144
145        if min_cost == primary_cost {
146            None
147        } else {
148            Some(final_plan)
149        }
150    }
151}
152
/// Expression rewriter that translates a predicate written against the primary table into
/// one written against an index table.
struct IndexPredicateRewriter<'a> {
    /// Maps a primary-table column index to the corresponding index-table column index.
    p2s_mapping: &'a BTreeMap<usize, usize>,
    /// Maps a function call to the index-table column that replaces it.
    function_mapping: &'a HashMap<FunctionCall, usize>,
    /// Added to the index of every input ref that has no entry in `p2s_mapping`.
    offset: usize,
    /// Starts as `true`; cleared as soon as an input ref without a mapping is rewritten.
    covered_by_index: bool,
}
159
160impl<'a> IndexPredicateRewriter<'a> {
161    fn new(
162        p2s_mapping: &'a BTreeMap<usize, usize>,
163        function_mapping: &'a HashMap<FunctionCall, usize>,
164        offset: usize,
165    ) -> Self {
166        Self {
167            p2s_mapping,
168            function_mapping,
169            offset,
170            covered_by_index: true,
171        }
172    }
173
174    fn covered_by_index(&self) -> bool {
175        self.covered_by_index
176    }
177}
178
179impl ExprRewriter for IndexPredicateRewriter<'_> {
180    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
181        // transform primary predicate to index predicate if it can
182        if self.p2s_mapping.contains_key(&input_ref.index) {
183            InputRef::new(
184                *self.p2s_mapping.get(&input_ref.index()).unwrap(),
185                input_ref.return_type(),
186            )
187            .into()
188        } else {
189            self.covered_by_index = false;
190            InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
191        }
192    }
193
194    fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
195        if let Some(index) = self.function_mapping.get(&func_call) {
196            return InputRef::new(*index, func_call.return_type()).into();
197        }
198
199        let (func_type, inputs, ret) = func_call.decompose();
200        let inputs = inputs
201            .into_iter()
202            .map(|expr| self.rewrite_expr(expr))
203            .collect();
204        FunctionCall::new_unchecked(func_type, inputs, ret).into()
205    }
206}
207
208impl IndexSelectionRule {
    /// Builds an index lookup plan for a non-covering `index`: an inner join between a scan
    /// of the index table and a scan of the primary table, joined on the primary key with
    /// null-safe equality, with the original scan predicate rewritten onto the join input.
    ///
    /// Returns the plan, pruned to the original scan's output columns, together with its
    /// cost. The cost is the estimated cost of the index-side scan (after predicate
    /// pushdown) multiplied by `LOOKUP_COST_CONST`.
    fn gen_index_lookup(
        &self,
        logical_scan: &LogicalScan,
        index: &IndexCatalog,
    ) -> (PlanRef, IndexCost) {
        // 1. logical_scan ->  logical_join
        //                      /        \
        //                index_scan   primary_table_scan
        let index_scan = LogicalScan::create(
            index.index_table.name.clone(),
            index.index_table.clone(),
            vec![],
            logical_scan.ctx(),
            logical_scan.as_of().clone(),
            index.index_table.cardinality,
        );
        // We use `schema.len` instead of `index_item.len` here,
        // because schema contains system columns like `_rw_timestamp` column which is not represented in the index item.
        let offset = index_scan.table_catalog().columns().len();

        let primary_table_scan = LogicalScan::create(
            index.primary_table.name.clone(),
            index.primary_table.clone(),
            vec![],
            logical_scan.ctx(),
            logical_scan.as_of().clone(),
            index.primary_table.cardinality,
        );

        // Rewrite the scan predicate: columns present in the index refer to the index side,
        // all other columns are shifted by `offset`.
        let predicate = logical_scan.predicate().clone();
        let mut rewriter = IndexPredicateRewriter::new(
            index.primary_to_secondary_mapping(),
            index.function_mapping(),
            offset,
        );
        let new_predicate = predicate.rewrite_expr(&mut rewriter);

        // Join condition: one null-safe equality per primary-key column (index-side column
        // vs. primary-side column shifted by `offset`), followed by the rewritten predicate.
        let conjunctions = index
            .primary_table_pk_ref_to_index_table()
            .iter()
            .zip_eq_fast(index.primary_table.pk.iter())
            .map(|(x, y)| {
                Self::create_null_safe_equal_expr(
                    x.column_index,
                    index.index_table.columns[x.column_index]
                        .data_type()
                        .clone(),
                    y.column_index + offset,
                    index.primary_table.columns[y.column_index]
                        .data_type()
                        .clone(),
                )
            })
            .chain(new_predicate)
            .collect_vec();
        let on = Condition { conjunctions };
        let join: PlanRef = LogicalJoin::new(
            index_scan.into(),
            primary_table_scan.into(),
            JoinType::Inner,
            on,
        )
        .into();

        // 2. push down predicate, so we can calculate the cost of index lookup
        let join_ref = join.predicate_pushdown(
            Condition::true_cond(),
            &mut PredicatePushdownContext::new(join.clone()),
        );

        let join_with_predicate_push_down =
            join_ref.as_logical_join().expect("must be a logical join");
        let new_join_left = join_with_predicate_push_down.left();
        let index_scan_with_predicate: &LogicalScan = new_join_left
            .as_logical_scan()
            .expect("must be a logical scan");

        // 3. calculate index cost, index lookup use primary table to estimate row size.
        let index_cost = self.estimate_table_scan_cost(
            index_scan_with_predicate,
            TableScanIoEstimator::estimate_row_size(logical_scan),
        );
        // lookup cost = index cost * LOOKUP_COST_CONST
        let lookup_cost = index_cost.mul(&IndexCost::new(LOOKUP_COST_CONST, false));

        // 4. keep the same schema with original logical_scan
        let scan_output_col_idx = logical_scan.output_col_idx();
        let lookup_join = join_ref.prune_col(
            &scan_output_col_idx
                .iter()
                .map(|&col_idx| col_idx + offset)
                .collect_vec(),
            &mut ColumnPruningContext::new(join_ref.clone()),
        );

        (lookup_join, lookup_cost)
    }
306
    /// Index Merge Selection
    /// Deal with predicate like a = 1 or b = 1
    /// Merge index scans from a table, currently merge is union semantic.
    ///
    /// Returns `None` when no index merge path can be generated for the scan's predicate.
    /// Otherwise returns an inner join between the chosen merge path and a scan of the
    /// primary table (joined on the primary key, pruned to the original scan's output
    /// columns), together with the path's cost multiplied by `LOOKUP_COST_CONST`.
    fn index_merge_selection(&self, logical_scan: &LogicalScan) -> Option<(PlanRef, IndexCost)> {
        let predicate = logical_scan.predicate().clone();
        // Index merge is kind of index lookup join so use primary table row size to estimate index
        // cost.
        let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan);
        // 1. choose lowest cost index merge path
        let paths = self.gen_paths(
            &predicate.conjunctions,
            logical_scan,
            primary_table_row_size,
        );
        let (index_access, index_access_cost) =
            self.choose_min_cost_path(&paths, primary_table_row_size)?;

        // 2. lookup primary table
        // the schema of index_access is the order key of primary table .
        let schema: &Schema = index_access.schema();
        let index_access_len = schema.len();

        // The primary table becomes the right join input, so its column references are
        // shifted past the columns of `index_access`.
        let mut shift_input_ref_rewriter = ShiftInputRefRewriter {
            offset: index_access_len,
        };
        let new_predicate = predicate.rewrite_expr(&mut shift_input_ref_rewriter);

        let primary_table_desc = logical_scan.table_desc();

        let primary_table_scan = LogicalScan::create(
            logical_scan.table_name().to_owned(),
            logical_scan.table_catalog(),
            vec![],
            logical_scan.ctx(),
            logical_scan.as_of().clone(),
            logical_scan.table_cardinality(),
        );

        // Join condition: the i-th column of `index_access` is null-safe-equal to the i-th
        // primary-key column of the primary table, followed by the shifted predicate.
        let conjunctions = primary_table_desc
            .pk
            .iter()
            .enumerate()
            .map(|(x, y)| {
                Self::create_null_safe_equal_expr(
                    x,
                    schema.fields[x].data_type.clone(),
                    y.column_index + index_access_len,
                    primary_table_desc.columns[y.column_index].data_type.clone(),
                )
            })
            .chain(new_predicate)
            .collect_vec();

        let on = Condition { conjunctions };
        let join: PlanRef =
            LogicalJoin::new(index_access, primary_table_scan.into(), JoinType::Inner, on).into();

        // 3 push down predicate
        let join_ref = join.predicate_pushdown(
            Condition::true_cond(),
            &mut PredicatePushdownContext::new(join.clone()),
        );

        // 4. keep the same schema with original logical_scan
        let scan_output_col_idx = logical_scan.output_col_idx();
        let lookup_join = join_ref.prune_col(
            &scan_output_col_idx
                .iter()
                .map(|&col_idx| col_idx + index_access_len)
                .collect_vec(),
            &mut ColumnPruningContext::new(join_ref.clone()),
        );

        Some((
            lookup_join,
            index_access_cost.mul(&IndexCost::new(LOOKUP_COST_CONST, false)),
        ))
    }
385
386    /// Generate possible paths that can be used to access.
387    /// The schema of output is the order key of primary table, so it can be used to lookup primary
388    /// table later.
389    /// Method `gen_paths` handles the complex condition recursively which may contains nested `AND`
390    /// and `OR`. However, Method `gen_index_path` handles one arm of an OR clause which is a
391    /// basic unit for index selection.
392    fn gen_paths(
393        &self,
394        conjunctions: &[ExprImpl],
395        logical_scan: &LogicalScan,
396        primary_table_row_size: usize,
397    ) -> Vec<PlanRef> {
398        let mut result = vec![];
399        for expr in conjunctions {
400            // it's OR clause!
401            if let ExprImpl::FunctionCall(function_call) = expr
402                && function_call.func_type() == ExprType::Or
403            {
404                let mut index_to_be_merged = vec![];
405
406                let disjunctions = to_disjunctions(expr.clone());
407                let (map, others) = self.clustering_disjunction(disjunctions);
408                let iter = map
409                    .into_iter()
410                    .map(|(column_index, expr)| (Some(column_index), expr))
411                    .chain(others.into_iter().map(|expr| (None, expr)));
412                for (column_index, expr) in iter {
413                    let mut index_paths = vec![];
414                    let conjunctions = to_conjunctions(expr);
415                    index_paths.extend(
416                        self.gen_index_path(column_index, &conjunctions, logical_scan)
417                            .into_iter(),
418                    );
419                    // complex condition, recursively gen paths
420                    if conjunctions.len() > 1 {
421                        index_paths.extend(
422                            self.gen_paths(&conjunctions, logical_scan, primary_table_row_size)
423                                .into_iter(),
424                        );
425                    }
426
427                    match self.choose_min_cost_path(&index_paths, primary_table_row_size) {
428                        None => {
429                            // One arm of OR clause can't use index, bail out
430                            index_to_be_merged.clear();
431                            break;
432                        }
433                        Some((path, _)) => index_to_be_merged.push(path),
434                    }
435                }
436
437                if let Some(path) = self.merge(index_to_be_merged) {
438                    result.push(path)
439                }
440            }
441        }
442
443        result
444    }
445
446    /// Clustering disjunction or expr by column index. If expr is complex, classify them as others.
447    ///
448    /// a = 1, b = 2, b = 3 -> map: [a, (a = 1)], [b, (b = 2 or b = 3)], others: []
449    ///
450    /// a = 1, (b = 2 and c = 3) -> map: [a, (a = 1)], others:
451    ///
452    /// (a > 1 and a < 8) or (c > 1 and c < 8)
453    /// -> map: [], others: [(a > 1 and a < 8), (c > 1 and c < 8)]
454    fn clustering_disjunction(
455        &self,
456        disjunctions: Vec<ExprImpl>,
457    ) -> (HashMap<usize, ExprImpl>, Vec<ExprImpl>) {
458        let mut map: HashMap<usize, ExprImpl> = HashMap::new();
459        let mut others = vec![];
460        for expr in disjunctions {
461            let idx = {
462                if let Some((input_ref, _const_expr)) = expr.as_eq_const() {
463                    Some(input_ref.index)
464                } else if let Some((input_ref, _in_const_list)) = expr.as_in_const_list() {
465                    Some(input_ref.index)
466                } else if let Some((input_ref, _op, _const_expr)) = expr.as_comparison_const() {
467                    Some(input_ref.index)
468                } else {
469                    None
470                }
471            };
472
473            if let Some(idx) = idx {
474                match map.entry(idx) {
475                    Occupied(mut entry) => {
476                        let expr2: ExprImpl = entry.get().to_owned();
477                        let or_expr = ExprImpl::FunctionCall(
478                            FunctionCall::new_unchecked(
479                                ExprType::Or,
480                                vec![expr, expr2],
481                                DataType::Boolean,
482                            )
483                            .into(),
484                        );
485                        entry.insert(or_expr);
486                    }
487                    Vacant(entry) => {
488                        entry.insert(expr);
489                    }
490                };
491            } else {
492                others.push(expr);
493                continue;
494            }
495        }
496
497        (map, others)
498    }
499
500    /// Given a conjunctions from one arm of an OR clause (basic unit to index selection), generate
501    /// all matching index path (including primary index) for the relation.
502    /// `column_index` (refers to primary table) is a hint can be used to prune index.
503    /// Steps:
504    /// 1. Take the combination of `conjunctions` to extract the potential clauses.
505    /// 2. For each potential clauses, generate index path if it can.
506    fn gen_index_path(
507        &self,
508        column_index: Option<usize>,
509        conjunctions: &[ExprImpl],
510        logical_scan: &LogicalScan,
511    ) -> Vec<PlanRef> {
512        // Assumption: use at most `MAX_COMBINATION_SIZE` clauses, we can determine which is the
513        // best index.
514        let mut combinations = vec![];
515        for i in 1..min(conjunctions.len(), MAX_COMBINATION_SIZE) + 1 {
516            combinations.extend(
517                conjunctions
518                    .iter()
519                    .take(min(conjunctions.len(), MAX_CONJUNCTION_SIZE))
520                    .combinations(i),
521            );
522        }
523
524        let mut result = vec![];
525
526        for index in logical_scan.indexes() {
527            if column_index.is_some() {
528                assert_eq!(conjunctions.len(), 1);
529                let p2s_mapping = index.primary_to_secondary_mapping();
530                match p2s_mapping.get(column_index.as_ref().unwrap()) {
531                    None => continue, // not found, prune this index
532                    Some(&idx) => {
533                        if index.index_table.pk()[0].column_index != idx {
534                            // not match, prune this index
535                            continue;
536                        }
537                    }
538                }
539            }
540
541            // try secondary index
542            for conj in &combinations {
543                let condition = Condition {
544                    conjunctions: conj.iter().map(|&x| x.to_owned()).collect(),
545                };
546                if let Some(index_access) = self.build_index_access(
547                    index.clone(),
548                    condition,
549                    logical_scan.ctx().clone(),
550                    logical_scan.as_of().clone(),
551                ) {
552                    result.push(index_access);
553                }
554            }
555        }
556
557        // try primary index
558        let primary_table_desc = logical_scan.table_desc();
559        if let Some(idx) = column_index {
560            assert_eq!(conjunctions.len(), 1);
561            if primary_table_desc.pk[0].column_index != idx {
562                return result;
563            }
564        }
565
566        let primary_access = generic::TableScan::new(
567            logical_scan.table_name().to_owned(),
568            primary_table_desc
569                .pk
570                .iter()
571                .map(|x| x.column_index)
572                .collect_vec(),
573            logical_scan.table_catalog(),
574            vec![],
575            logical_scan.ctx(),
576            Condition {
577                conjunctions: conjunctions.to_vec(),
578            },
579            logical_scan.as_of().clone(),
580            logical_scan.table_cardinality(),
581        );
582
583        result.push(primary_access.into());
584
585        result
586    }
587
588    /// build index access if predicate (refers to primary table) is covered by index
589    fn build_index_access(
590        &self,
591        index: Rc<IndexCatalog>,
592        predicate: Condition,
593        ctx: OptimizerContextRef,
594        as_of: Option<AsOf>,
595    ) -> Option<PlanRef> {
596        let mut rewriter = IndexPredicateRewriter::new(
597            index.primary_to_secondary_mapping(),
598            index.function_mapping(),
599            0,
600        );
601        let new_predicate = predicate.rewrite_expr(&mut rewriter);
602
603        // check condition is covered by index.
604        if !rewriter.covered_by_index() {
605            return None;
606        }
607
608        Some(
609            generic::TableScan::new(
610                index.index_table.name.clone(),
611                index
612                    .primary_table_pk_ref_to_index_table()
613                    .iter()
614                    .map(|x| x.column_index)
615                    .collect_vec(),
616                index.index_table.clone(),
617                vec![],
618                ctx,
619                new_predicate,
620                as_of,
621                index.index_table.cardinality,
622            )
623            .into(),
624        )
625    }
626
627    fn merge(&self, paths: Vec<PlanRef>) -> Option<PlanRef> {
628        if paths.is_empty() {
629            return None;
630        }
631
632        let new_paths = paths
633            .iter()
634            .flat_map(|path| {
635                if let Some(union) = path.as_logical_union() {
636                    union.inputs().to_vec()
637                } else if let Some(_scan) = path.as_logical_scan() {
638                    vec![path.clone()]
639                } else {
640                    unreachable!();
641                }
642            })
643            .sorted_by(|a, b| {
644                // sort inputs to make plan deterministic
645                a.as_logical_scan()
646                    .expect("expect to be a logical scan")
647                    .table_name()
648                    .cmp(
649                        b.as_logical_scan()
650                            .expect("expect to be a logical scan")
651                            .table_name(),
652                    )
653            })
654            .collect_vec();
655
656        Some(LogicalUnion::create(false, new_paths))
657    }
658
659    fn choose_min_cost_path(
660        &self,
661        paths: &[PlanRef],
662        primary_table_row_size: usize,
663    ) -> Option<(PlanRef, IndexCost)> {
664        paths
665            .iter()
666            .map(|path| {
667                if let Some(scan) = path.as_logical_scan() {
668                    let cost = self.estimate_table_scan_cost(scan, primary_table_row_size);
669                    (scan.clone().into(), cost)
670                } else if let Some(union) = path.as_logical_union() {
671                    let cost = union
672                        .inputs()
673                        .iter()
674                        .map(|input| {
675                            self.estimate_table_scan_cost(
676                                input.as_logical_scan().expect("expect to be a scan"),
677                                primary_table_row_size,
678                            )
679                        })
680                        .reduce(|a, b| a.add(&b))
681                        .unwrap();
682                    (union.clone().into(), cost)
683                } else {
684                    unreachable!()
685                }
686            })
687            .min_by(|(_, cost1), (_, cost2)| Ord::cmp(cost1, cost2))
688    }
689
690    fn estimate_table_scan_cost(&self, scan: &LogicalScan, row_size: usize) -> IndexCost {
691        let mut table_scan_io_estimator = TableScanIoEstimator::new(scan, row_size);
692        table_scan_io_estimator.estimate(scan.predicate())
693    }
694
695    fn estimate_full_table_scan_cost(&self, scan: &LogicalScan, row_size: usize) -> IndexCost {
696        let mut table_scan_io_estimator = TableScanIoEstimator::new(scan, row_size);
697        table_scan_io_estimator.estimate(&Condition::true_cond())
698    }
699
700    fn create_null_safe_equal_expr(
701        left: usize,
702        left_data_type: DataType,
703        right: usize,
704        right_data_type: DataType,
705    ) -> ExprImpl {
706        ExprImpl::FunctionCall(Box::new(FunctionCall::new_unchecked(
707            ExprType::IsNotDistinctFrom,
708            vec![
709                ExprImpl::InputRef(Box::new(InputRef::new(left, left_data_type))),
710                ExprImpl::InputRef(Box::new(InputRef::new(right, right_data_type))),
711            ],
712            DataType::Boolean,
713        )))
714    }
715}
716
/// Estimates the cost of a table scan under a given predicate.
struct TableScanIoEstimator<'a> {
    /// The scan whose order key is matched against the predicate.
    table_scan: &'a LogicalScan,
    /// Estimated size of one row; the matched index cost is multiplied by it.
    row_size: usize,
    /// Scratch result taken by `estimate` after visiting a single-conjunct predicate.
    /// NOTE(review): presumably filled in by the expression visitor implementation, which
    /// is not shown here — confirm.
    cost: Option<IndexCost>,
}
722
723impl<'a> TableScanIoEstimator<'a> {
724    pub fn new(table_scan: &'a LogicalScan, row_size: usize) -> Self {
725        Self {
726            table_scan,
727            row_size,
728            cost: None,
729        }
730    }
731
732    pub fn estimate_row_size(table_scan: &LogicalScan) -> usize {
733        // 5 for table_id + 1 for vnode + 8 for epoch
734        let row_meta_field_estimate_size = 14_usize;
735        let table_desc = table_scan.table_desc();
736        row_meta_field_estimate_size
737            + table_desc
738                .columns
739                .iter()
740                // add order key twice for its appearance both in key and value
741                .chain(
742                    table_desc
743                        .pk
744                        .iter()
745                        .map(|x| &table_desc.columns[x.column_index]),
746                )
747                .map(|x| TableScanIoEstimator::estimate_data_type_size(&x.data_type))
748                .sum::<usize>()
749    }
750
751    fn estimate_data_type_size(data_type: &DataType) -> usize {
752        use std::mem::size_of;
753
754        match data_type {
755            DataType::Boolean => size_of::<bool>(),
756            DataType::Int16 => size_of::<i16>(),
757            DataType::Int32 => size_of::<i32>(),
758            DataType::Int64 => size_of::<i64>(),
759            DataType::Serial => size_of::<Serial>(),
760            DataType::Float32 => size_of::<f32>(),
761            DataType::Float64 => size_of::<f64>(),
762            DataType::Decimal => size_of::<Decimal>(),
763            DataType::Date => size_of::<Date>(),
764            DataType::Time => size_of::<Time>(),
765            DataType::Timestamp => size_of::<Timestamp>(),
766            DataType::Timestamptz => size_of::<Timestamptz>(),
767            DataType::Interval => size_of::<Interval>(),
768            DataType::Int256 => Int256::size(),
769            DataType::Varchar => 20,
770            DataType::Bytea => 20,
771            DataType::Jsonb => 20,
772            DataType::Struct { .. } => 20,
773            DataType::List { .. } => 20,
774            DataType::Map(_) => 20,
775        }
776    }
777
778    pub fn estimate(&mut self, predicate: &Condition) -> IndexCost {
779        // try to deal with OR condition
780        if predicate.conjunctions.len() == 1 {
781            self.visit_expr(&predicate.conjunctions[0]);
782            self.cost.take().unwrap_or_default()
783        } else {
784            self.estimate_conjunctions(&predicate.conjunctions)
785        }
786    }
787
788    fn estimate_conjunctions(&mut self, conjunctions: &[ExprImpl]) -> IndexCost {
789        let order_column_indices = self.table_scan.table_desc().order_column_indices();
790
791        let mut new_conjunctions = conjunctions.to_owned();
792
793        let mut match_item_vec = vec![];
794
795        for column_idx in order_column_indices {
796            let match_item = self.match_index_column(column_idx, &mut new_conjunctions);
797            // seeing range, we don't need to match anymore.
798            let should_break = match match_item {
799                MatchItem::Equal | MatchItem::In(_) => false,
800                MatchItem::RangeOneSideBound | MatchItem::RangeTwoSideBound | MatchItem::All => {
801                    true
802                }
803            };
804            match_item_vec.push(match_item);
805            if should_break {
806                break;
807            }
808        }
809
810        let index_cost = match_item_vec
811            .iter()
812            .enumerate()
813            .take(INDEX_MAX_LEN)
814            .map(|(i, match_item)| match match_item {
815                MatchItem::Equal => INDEX_COST_MATRIX[0][i],
816                MatchItem::In(num) => min(INDEX_COST_MATRIX[1][i], *num),
817                MatchItem::RangeTwoSideBound => INDEX_COST_MATRIX[2][i],
818                MatchItem::RangeOneSideBound => INDEX_COST_MATRIX[3][i],
819                MatchItem::All => INDEX_COST_MATRIX[4][i],
820            })
821            .reduce(|x, y| x * y)
822            .unwrap();
823
824        // If `index_cost` equals 1, it is a primary lookup
825        let primary_lookup = index_cost == 1;
826
827        IndexCost::new(index_cost, primary_lookup)
828            .mul(&IndexCost::new(self.row_size, primary_lookup))
829    }
830
831    fn match_index_column(
832        &mut self,
833        column_idx: usize,
834        conjunctions: &mut Vec<ExprImpl>,
835    ) -> MatchItem {
836        // Equal
837        for (i, expr) in conjunctions.iter().enumerate() {
838            if let Some((input_ref, _const_expr)) = expr.as_eq_const()
839                && input_ref.index == column_idx
840            {
841                conjunctions.remove(i);
842                return MatchItem::Equal;
843            }
844        }
845
846        // In
847        for (i, expr) in conjunctions.iter().enumerate() {
848            if let Some((input_ref, in_const_list)) = expr.as_in_const_list()
849                && input_ref.index == column_idx
850            {
851                conjunctions.remove(i);
852                return MatchItem::In(in_const_list.len());
853            }
854        }
855
856        // Range
857        let mut left_side_bound = false;
858        let mut right_side_bound = false;
859        let mut i = 0;
860        while i < conjunctions.len() {
861            let expr = &conjunctions[i];
862            if let Some((input_ref, op, _const_expr)) = expr.as_comparison_const()
863                && input_ref.index == column_idx
864            {
865                conjunctions.remove(i);
866                match op {
867                    ExprType::LessThan | ExprType::LessThanOrEqual => right_side_bound = true,
868                    ExprType::GreaterThan | ExprType::GreaterThanOrEqual => left_side_bound = true,
869                    _ => unreachable!(),
870                };
871            } else {
872                i += 1;
873            }
874        }
875
876        if left_side_bound && right_side_bound {
877            MatchItem::RangeTwoSideBound
878        } else if left_side_bound || right_side_bound {
879            MatchItem::RangeOneSideBound
880        } else {
881            MatchItem::All
882        }
883    }
884}
885
/// How the predicate constrains a single order-key column; each variant
/// selects one row of the index selection cost matrix.
enum MatchItem {
    /// The column is compared for equality with a constant.
    Equal,
    /// The column is tested against an `IN` list of constants; the payload
    /// is the number of entries in that list.
    In(usize),
    /// The column is bounded from both sides by comparisons with constants.
    RangeTwoSideBound,
    /// The column is bounded from exactly one side.
    RangeOneSideBound,
    /// No usable condition on the column was found.
    All,
}
893
/// Estimated cost of a scan, together with a flag telling whether the scan
/// is a primary lookup.
///
/// `cost` is always capped at [`IndexCost::maximum`] when built through
/// [`IndexCost::new`].
#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord)]
struct IndexCost {
    cost: usize,
    primary_lookup: bool,
}

impl Default for IndexCost {
    /// The default is the worst possible cost and not a primary lookup.
    fn default() -> Self {
        IndexCost {
            cost: Self::maximum(),
            primary_lookup: false,
        }
    }
}

impl IndexCost {
    /// Builds a cost, clamping `cost` to [`IndexCost::maximum`].
    fn new(cost: usize, primary_lookup: bool) -> IndexCost {
        let capped = cost.min(Self::maximum());
        IndexCost {
            cost: capped,
            primary_lookup,
        }
    }

    /// Upper bound applied to every cost value.
    fn maximum() -> usize {
        10000000
    }

    /// Sums two costs; an arithmetic overflow is treated as the maximum.
    /// The result is a primary lookup only if both operands are.
    fn add(&self, other: &IndexCost) -> IndexCost {
        let total = match self.cost.checked_add(other.cost) {
            Some(value) => value,
            None => Self::maximum(),
        };
        let both_primary = self.primary_lookup && other.primary_lookup;
        IndexCost::new(total, both_primary)
    }

    /// Multiplies two costs; an arithmetic overflow is treated as the maximum.
    /// The result is a primary lookup only if both operands are.
    fn mul(&self, other: &IndexCost) -> IndexCost {
        let product = match self.cost.checked_mul(other.cost) {
            Some(value) => value,
            None => Self::maximum(),
        };
        let both_primary = self.primary_lookup && other.primary_lookup;
        IndexCost::new(product, both_primary)
    }

    /// Returns `true` when `self` is strictly cheaper than `other`.
    ///
    /// NOTE: despite its name this is a strict `<` on `cost` only; it ignores
    /// `primary_lookup` and, in method-call syntax, takes precedence over the
    /// derived `PartialOrd::le`.
    fn le(&self, other: &IndexCost) -> bool {
        other.cost > self.cost
    }
}
943
944impl ExprVisitor for TableScanIoEstimator<'_> {
945    fn visit_function_call(&mut self, func_call: &FunctionCall) {
946        let cost = match func_call.func_type() {
947            ExprType::Or => func_call
948                .inputs()
949                .iter()
950                .map(|x| {
951                    let mut estimator = TableScanIoEstimator::new(self.table_scan, self.row_size);
952                    estimator.visit_expr(x);
953                    estimator.cost.take().unwrap_or_default()
954                })
955                .reduce(|x, y| x.add(&y))
956                .unwrap(),
957            ExprType::And => self.estimate_conjunctions(func_call.inputs()),
958            _ => {
959                let single = vec![ExprImpl::FunctionCall(func_call.clone().into())];
960                self.estimate_conjunctions(&single)
961            }
962        };
963        self.cost = Some(cost);
964    }
965}
966
967struct ShiftInputRefRewriter {
968    offset: usize,
969}
970impl ExprRewriter for ShiftInputRefRewriter {
971    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
972        InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
973    }
974}
975
976impl IndexSelectionRule {
977    pub fn create() -> BoxedRule {
978        Box::new(IndexSelectionRule {})
979    }
980}