risingwave_frontend/optimizer/rule/
streaming_index_selection_rule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Streaming index selection rule for backfill.
16//!
//! Unlike the batch [`IndexSelectionRule`], this rule:
18//! - Only considers **covering** indexes (no lookup join in streaming backfill)
19//! - Handles IN predicates by expanding into a `LogicalUnion` of `LogicalScan`s,
20//!   each with a branch-local equality predicate for correct post-backfill routing
21
22use std::ops::Bound;
23
24use super::index_selection_rule::TableScanIoEstimator;
25use super::prelude::PlanRef;
26use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal};
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::{LogicalScan, LogicalUnion};
29use crate::optimizer::rule::IndexSelectionRule;
30use crate::utils::Condition;
31
32pub struct StreamingIndexSelectionRule;
33
34impl StreamingIndexSelectionRule {
35    /// Rewrites a `LogicalScan` for backfill.
36    ///
37    /// Applies two optimizations in order:
38    /// 1. **Covering index selection**: picks the lowest-cost covering index
39    /// 2. **IN expansion**: splits IN predicates into a `LogicalUnion` of `LogicalScan`s,
40    ///    each with a single-value equality predicate
41    ///
42    /// Returns `Some(PlanRef)` which is either a `LogicalScan` (index only) or a
43    /// `LogicalUnion` of `LogicalScan`s (IN expansion, possibly on an index).
44    pub fn rewrite(logical_scan: &LogicalScan) -> Option<PlanRef> {
45        // Step 1: Try covering index selection.
46        let best_scan =
47            Self::select_covering_index(logical_scan).unwrap_or_else(|| logical_scan.clone());
48
49        // Step 2: Try IN expansion on the (possibly index-backed) scan.
50        if let Some(union) = Self::try_expand_in(&best_scan) {
51            return Some(union);
52        }
53
54        // If we selected a better index (different from original), return it.
55        if best_scan.table().id != logical_scan.table().id {
56            return Some(best_scan.into());
57        }
58
59        None
60    }
61
62    /// Picks the lowest-cost covering index for a streaming scan.
63    fn select_covering_index(logical_scan: &LogicalScan) -> Option<LogicalScan> {
64        let indexes = logical_scan.table_indexes();
65        if indexes.is_empty() {
66            return None;
67        }
68
69        let rule = IndexSelectionRule {};
70        let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan);
71        let primary_cost = std::cmp::min(
72            rule.estimate_table_scan_cost(logical_scan, primary_table_row_size),
73            rule.estimate_full_table_scan_cost(logical_scan, primary_table_row_size),
74        );
75
76        if primary_cost.primary_lookup {
77            return None;
78        }
79
80        let mut best_scan: Option<LogicalScan> = None;
81        let mut min_cost = primary_cost;
82
83        for index in indexes {
84            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
85                let index_cost = rule.estimate_table_scan_cost(
86                    &index_scan,
87                    TableScanIoEstimator::estimate_row_size(&index_scan),
88                );
89                if index_cost.le(&min_cost) {
90                    min_cost = index_cost;
91                    best_scan = Some(index_scan);
92                }
93            }
94            // Skip non-covering indexes — no lookup join for backfill.
95        }
96
97        best_scan
98    }
99
100    /// Expands an IN predicate into a `LogicalUnion` of `LogicalScan`s.
101    ///
102    /// Each branch gets a single-value equality predicate (e.g., `a = 1`) plus residual
103    /// conditions. Only applies when all scan ranges are non-trivial.
104    fn try_expand_in(logical_scan: &LogicalScan) -> Option<PlanRef> {
105        let (scan_ranges, residual) = logical_scan
106            .predicate()
107            .clone()
108            .split_to_scan_ranges(
109                logical_scan.table(),
110                logical_scan
111                    .base
112                    .ctx()
113                    .session_ctx()
114                    .config()
115                    .max_split_range_gap() as u64,
116            )
117            .ok()?;
118
119        if scan_ranges.len() <= 1
120            || !scan_ranges.iter().all(|r| {
121                !r.is_full_table_scan()
122                    && matches!(r.range.0, Bound::Unbounded)
123                    && matches!(r.range.1, Bound::Unbounded)
124            })
125        {
126            return None;
127        }
128
129        let pk = &logical_scan.table().pk;
130        let table_cols = &logical_scan.table().columns;
131
132        let branches: Vec<PlanRef> = scan_ranges
133            .into_iter()
134            .map(|range| -> Option<PlanRef> {
135                // Build a branch predicate from this scan range's eq_conds.
136                // InputRef indices are in table-column space.
137                let mut conjunctions: Vec<ExprImpl> = range
138                    .eq_conds
139                    .iter()
140                    .enumerate()
141                    .map(|(i, datum)| {
142                        let table_col_idx = pk[i].column_index;
143                        let col_type = table_cols[table_col_idx].data_type().clone();
144                        let input_ref = InputRef::new(table_col_idx, col_type.clone());
145                        let literal = Literal::new(datum.clone(), col_type);
146                        FunctionCall::new(ExprType::Equal, vec![input_ref.into(), literal.into()])
147                            .map(ExprImpl::from)
148                    })
149                    .collect::<Result<_, _>>()
150                    .ok()?;
151                // Add residual conditions so each branch is self-contained.
152                conjunctions.extend(residual.conjunctions.iter().cloned());
153                let branch_predicate = Condition { conjunctions };
154                Some(logical_scan.clone_with_predicate(branch_predicate).into())
155            })
156            .collect::<Option<_>>()?;
157
158        Some(LogicalUnion::create(true, branches))
159    }
160}