risingwave_frontend/optimizer/rule/streaming_index_selection_rule.rs
1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Streaming index selection rule for backfill.
16//!
17//! Unlike the batch [`IndexSelectionRule`](IndexSelectionRule), this rule:
18//! - Only considers **covering** indexes (no lookup join in streaming backfill)
19//! - Handles IN predicates by expanding into a `LogicalUnion` of `LogicalScan`s,
20//! each with a branch-local equality predicate for correct post-backfill routing
21
22use std::ops::Bound;
23
24use super::index_selection_rule::TableScanIoEstimator;
25use super::prelude::PlanRef;
26use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal};
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::{LogicalScan, LogicalUnion};
29use crate::optimizer::rule::IndexSelectionRule;
30use crate::utils::Condition;
31
32pub struct StreamingIndexSelectionRule;
33
34impl StreamingIndexSelectionRule {
35 /// Rewrites a `LogicalScan` for backfill.
36 ///
37 /// Applies two optimizations in order:
38 /// 1. **Covering index selection**: picks the lowest-cost covering index
39 /// 2. **IN expansion**: splits IN predicates into a `LogicalUnion` of `LogicalScan`s,
40 /// each with a single-value equality predicate
41 ///
42 /// Returns `Some(PlanRef)` which is either a `LogicalScan` (index only) or a
43 /// `LogicalUnion` of `LogicalScan`s (IN expansion, possibly on an index).
44 pub fn rewrite(logical_scan: &LogicalScan) -> Option<PlanRef> {
45 // Step 1: Try covering index selection.
46 let best_scan =
47 Self::select_covering_index(logical_scan).unwrap_or_else(|| logical_scan.clone());
48
49 // Step 2: Try IN expansion on the (possibly index-backed) scan.
50 if let Some(union) = Self::try_expand_in(&best_scan) {
51 return Some(union);
52 }
53
54 // If we selected a better index (different from original), return it.
55 if best_scan.table().id != logical_scan.table().id {
56 return Some(best_scan.into());
57 }
58
59 None
60 }
61
62 /// Picks the lowest-cost covering index for a streaming scan.
63 fn select_covering_index(logical_scan: &LogicalScan) -> Option<LogicalScan> {
64 let indexes = logical_scan.table_indexes();
65 if indexes.is_empty() {
66 return None;
67 }
68
69 let rule = IndexSelectionRule {};
70 let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan);
71 let primary_cost = std::cmp::min(
72 rule.estimate_table_scan_cost(logical_scan, primary_table_row_size),
73 rule.estimate_full_table_scan_cost(logical_scan, primary_table_row_size),
74 );
75
76 if primary_cost.primary_lookup {
77 return None;
78 }
79
80 let mut best_scan: Option<LogicalScan> = None;
81 let mut min_cost = primary_cost;
82
83 for index in indexes {
84 if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
85 let index_cost = rule.estimate_table_scan_cost(
86 &index_scan,
87 TableScanIoEstimator::estimate_row_size(&index_scan),
88 );
89 if index_cost.le(&min_cost) {
90 min_cost = index_cost;
91 best_scan = Some(index_scan);
92 }
93 }
94 // Skip non-covering indexes — no lookup join for backfill.
95 }
96
97 best_scan
98 }
99
100 /// Expands an IN predicate into a `LogicalUnion` of `LogicalScan`s.
101 ///
102 /// Each branch gets a single-value equality predicate (e.g., `a = 1`) plus residual
103 /// conditions. Only applies when all scan ranges are non-trivial.
104 fn try_expand_in(logical_scan: &LogicalScan) -> Option<PlanRef> {
105 let (scan_ranges, residual) = logical_scan
106 .predicate()
107 .clone()
108 .split_to_scan_ranges(
109 logical_scan.table(),
110 logical_scan
111 .base
112 .ctx()
113 .session_ctx()
114 .config()
115 .max_split_range_gap() as u64,
116 )
117 .ok()?;
118
119 if scan_ranges.len() <= 1
120 || !scan_ranges.iter().all(|r| {
121 !r.is_full_table_scan()
122 && matches!(r.range.0, Bound::Unbounded)
123 && matches!(r.range.1, Bound::Unbounded)
124 })
125 {
126 return None;
127 }
128
129 let pk = &logical_scan.table().pk;
130 let table_cols = &logical_scan.table().columns;
131
132 let branches: Vec<PlanRef> = scan_ranges
133 .into_iter()
134 .map(|range| -> Option<PlanRef> {
135 // Build a branch predicate from this scan range's eq_conds.
136 // InputRef indices are in table-column space.
137 let mut conjunctions: Vec<ExprImpl> = range
138 .eq_conds
139 .iter()
140 .enumerate()
141 .map(|(i, datum)| {
142 let table_col_idx = pk[i].column_index;
143 let col_type = table_cols[table_col_idx].data_type().clone();
144 let input_ref = InputRef::new(table_col_idx, col_type.clone());
145 let literal = Literal::new(datum.clone(), col_type);
146 FunctionCall::new(ExprType::Equal, vec![input_ref.into(), literal.into()])
147 .map(ExprImpl::from)
148 })
149 .collect::<Result<_, _>>()
150 .ok()?;
151 // Add residual conditions so each branch is self-contained.
152 conjunctions.extend(residual.conjunctions.iter().cloned());
153 let branch_predicate = Condition { conjunctions };
154 Some(logical_scan.clone_with_predicate(branch_predicate).into())
155 })
156 .collect::<Option<_>>()?;
157
158 Some(LogicalUnion::create(true, branches))
159 }
160}