risingwave_frontend/optimizer/plan_node/
logical_scan.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::AsOf;
23
24use super::generic::{GenericPlanNode, GenericPlanRef};
25use super::utils::{Distill, childless_record};
26use super::{
27    BackfillType, BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
28    LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamFilter,
29    StreamProject, StreamTableScan, ToBatch, ToStream, generic,
30};
31use crate::TableCatalog;
32use crate::binder::BoundBaseTable;
33use crate::catalog::ColumnId;
34use crate::catalog::index_catalog::{IndexType, TableIndex, VectorIndex};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
37use crate::optimizer::ApplyResult;
38use crate::optimizer::optimizer_context::OptimizerContextRef;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
41use crate::optimizer::plan_node::{
42    BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
43    PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
44};
45use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
46use crate::optimizer::rule::IndexSelectionRule;
47use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
48
/// `LogicalScan` returns contents of a table or other equivalent object.
///
/// The node pairs the logical plan metadata (`base`) with the generic table-scan
/// description (`core`). When a scan is built from a `generic::TableScan`, `base` is
/// derived from that core.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Logical plan-node metadata for this scan.
    pub base: PlanBase<Logical>,
    /// Generic scan description: table catalog, output/required column indices,
    /// table and vector indexes, the pushed-down predicate and the optional `AsOf`.
    core: generic::TableScan,
}
55
56impl From<generic::TableScan> for LogicalScan {
57    fn from(core: generic::TableScan) -> Self {
58        let base = PlanBase::new_logical_with_core(&core);
59        Self { base, core }
60    }
61}
62
63impl From<generic::TableScan> for PlanRef {
64    fn from(core: generic::TableScan) -> Self {
65        LogicalScan::from(core).into()
66    }
67}
68
/// Expose the scan's plan metadata through [`GenericPlanRef`].
///
/// Every accessor simply forwards to the value stored on `plan_base()`.
impl GenericPlanRef for LogicalScan {
    /// Plan node id recorded on the plan base.
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    /// Output schema recorded on the plan base.
    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    /// Stream key recorded on the plan base, if there is one.
    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    /// Optimizer context this node belongs to.
    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    /// Functional dependencies recorded on the plan base.
    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}
90
91impl LogicalScan {
92    /// Create a [`LogicalScan`] node. Used by planner.
93    pub fn create(
94        table_catalog: Arc<TableCatalog>,
95        ctx: OptimizerContextRef,
96        as_of: Option<AsOf>,
97    ) -> Self {
98        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
99        generic::TableScan::new(
100            output_col_idx,
101            table_catalog,
102            vec![],
103            vec![],
104            ctx,
105            Condition::true_cond(),
106            as_of,
107        )
108        .into()
109    }
110
111    pub fn from_base_table(
112        base_table: &BoundBaseTable,
113        ctx: OptimizerContextRef,
114        as_of: Option<AsOf>,
115    ) -> Self {
116        let table_catalog = base_table.table_catalog.clone();
117        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
118        let mut table_indexes = vec![];
119        let mut vector_indexes = vec![];
120        for index in &base_table.table_indexes {
121            match &index.index_type {
122                IndexType::Table(index) => table_indexes.push(index.clone()),
123                IndexType::Vector(index) => vector_indexes.push(index.clone()),
124            }
125        }
126        generic::TableScan::new(
127            output_col_idx,
128            table_catalog,
129            table_indexes,
130            vector_indexes,
131            ctx,
132            Condition::true_cond(),
133            as_of,
134        )
135        .into()
136    }
137
    /// Name of the scanned table, as stored in its table catalog.
    pub fn table_name(&self) -> &str {
        &self.core.table_catalog.name
    }
141
    /// A clone of the optional `AsOf` specification stored on the scan core.
    pub fn as_of(&self) -> Option<AsOf> {
        self.core.as_of.clone()
    }
145
    /// The cardinality of the table **without** applying the predicate.
    ///
    /// Read directly from the table catalog; the scan's own predicate is not consulted.
    pub fn table_cardinality(&self) -> Cardinality {
        self.core.table_catalog.cardinality
    }
150
    /// The catalog of the table being scanned.
    pub fn table(&self) -> &Arc<TableCatalog> {
        &self.core.table_catalog
    }
154
    /// Get the descs of the output columns.
    ///
    /// Delegates to the generic scan core.
    pub fn column_descs(&self) -> Vec<ColumnDesc> {
        self.core.column_descs()
    }
159
    /// Get the ids of the output columns.
    ///
    /// Delegates to the generic scan core.
    pub fn output_column_ids(&self) -> Vec<ColumnId> {
        self.core.output_column_ids()
    }
164
    /// Get all table indexes attached to this scan.
    pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
        &self.core.table_indexes
    }
169
    /// Get all vector indexes attached to this scan.
    pub fn vector_indexes(&self) -> &[Arc<VectorIndex>] {
        &self.core.vector_indexes
    }
174
    /// Get the filter predicate that has been pushed into this scan.
    pub fn predicate(&self) -> &Condition {
        &self.core.predicate
    }
179
    /// Return the indices of the fields the output is ordered by, together with the
    /// corresponding direction of each.
    ///
    /// Delegates to the generic scan core.
    pub fn get_out_column_index_order(&self) -> Order {
        self.core.get_out_column_index_order()
    }
185
    /// Distribution key reported by the generic scan core, or `None` when the core
    /// reports none.
    pub fn distribution_key(&self) -> Option<Vec<usize>> {
        self.core.distribution_key()
    }
189
    /// Watermark columns reported by the generic scan core.
    pub fn watermark_columns(&self) -> WatermarkColumns {
        self.core.watermark_columns()
    }
193
    /// Whether the generic scan core reports this as a cross-database scan.
    pub fn cross_database(&self) -> bool {
        self.core.cross_database()
    }
197
198    /// Return indexes can satisfy the required order.
199    pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
200        self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
201            .into_iter()
202            .map(|(index, _)| index)
203            .collect()
204    }
205
206    /// Return indexes can satisfy the required order.
207    /// The `prefix` refers to optionally matching columns of the index
208    /// It is unordered initially.
209    /// If any are used, we will return the fixed `Order` prefix.
210    pub fn indexes_satisfy_order_with_prefix(
211        &self,
212        required_order: &Order,
213        prefix: &HashSet<ColumnOrder>,
214    ) -> Vec<(&Arc<TableIndex>, Order)> {
215        let output_col_map = self
216            .output_col_idx()
217            .iter()
218            .cloned()
219            .enumerate()
220            .map(|(id, col)| (col, id))
221            .collect::<BTreeMap<_, _>>();
222        let unmatched_idx = output_col_map.len();
223        let mut index_catalog_and_orders = vec![];
224        for index in self.table_indexes() {
225            let s2p_mapping = index.secondary_to_primary_mapping();
226            let index_orders: Vec<ColumnOrder> = index
227                .index_table
228                .pk()
229                .iter()
230                .map(|idx_item| {
231                    let idx = match s2p_mapping.get(&idx_item.column_index) {
232                        Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
233                        // After we support index on expressions, we need to handle the case where the column is not in the `s2p_mapping`.
234                        None => unmatched_idx,
235                    };
236                    ColumnOrder::new(idx, idx_item.order_type)
237                })
238                .collect();
239
240            let mut index_orders_iter = index_orders.into_iter().peekable();
241
242            // First check the prefix
243            let fixed_prefix = {
244                let mut fixed_prefix = vec![];
245                loop {
246                    match index_orders_iter.peek() {
247                        Some(index_col_order) if prefix.contains(index_col_order) => {
248                            let index_col_order = index_orders_iter.next().unwrap();
249                            fixed_prefix.push(index_col_order);
250                        }
251                        _ => break,
252                    }
253                }
254                Order {
255                    column_orders: fixed_prefix,
256                }
257            };
258
259            let remaining_orders = Order {
260                column_orders: index_orders_iter.collect(),
261            };
262            if remaining_orders.satisfies(required_order) {
263                index_catalog_and_orders.push((index, fixed_prefix));
264            }
265        }
266        index_catalog_and_orders
267    }
268
269    /// If the index can cover the scan, transform it to the index scan.
270    pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
271        let p2s_mapping = index.primary_to_secondary_mapping();
272        if self
273            .required_col_idx()
274            .iter()
275            .all(|x| p2s_mapping.contains_key(x))
276        {
277            let index_scan = self.core.to_index_scan(
278                index.index_table.clone(),
279                p2s_mapping,
280                index.function_mapping(),
281            );
282            Some(index_scan.into())
283        } else {
284            None
285        }
286    }
287
    /// Primary key column orders reported by the generic scan core.
    pub fn primary_key(&self) -> &[ColumnOrder] {
        self.core.primary_key()
    }
291
292    /// a vec of `InputRef` corresponding to `output_col_idx`, which can represent a pulled project.
293    fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
294        self.output_col_idx()
295            .iter()
296            .enumerate()
297            .map(|(i, &col_idx)| {
298                InputRef::new(
299                    i,
300                    self.table().columns[col_idx].column_desc.data_type.clone(),
301                )
302                .into()
303            })
304            .collect_vec()
305    }
306
307    /// Undo predicate push down when predicate in scan is not supported.
308    pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
309        let mut predicate = self.predicate().clone();
310        if predicate.always_true() {
311            return (self.core.clone(), Condition::true_cond(), None);
312        }
313
314        let mut inverse_mapping = {
315            let mapping = ColIndexMapping::new(
316                self.required_col_idx().iter().map(|i| Some(*i)).collect(),
317                self.table().columns.len(),
318            );
319            // Since `required_col_idx` mapping is not invertible, we need to inverse manually.
320            let mut inverse_map = vec![None; mapping.target_size()];
321            for (src, dst) in mapping.mapping_pairs() {
322                inverse_map[dst] = Some(src);
323            }
324            ColIndexMapping::new(inverse_map, mapping.source_size())
325        };
326
327        predicate = predicate.rewrite_expr(&mut inverse_mapping);
328
329        let scan_without_predicate = generic::TableScan::new(
330            self.required_col_idx().clone(),
331            self.core.table_catalog.clone(),
332            self.table_indexes().to_vec(),
333            self.vector_indexes().to_vec(),
334            self.ctx(),
335            Condition::true_cond(),
336            self.as_of(),
337        );
338        let project_expr = if self.required_col_idx() != self.output_col_idx() {
339            Some(self.output_idx_to_input_ref())
340        } else {
341            None
342        };
343        (scan_without_predicate, predicate, project_expr)
344    }
345
346    fn clone_with_predicate(&self, predicate: Condition) -> Self {
347        generic::TableScan::new_inner(
348            self.output_col_idx().clone(),
349            self.table().clone(),
350            self.table_indexes().to_vec(),
351            self.vector_indexes().to_vec(),
352            self.base.ctx(),
353            predicate,
354            self.as_of(),
355            self.core.table_scan_stream_key().map(ToOwned::to_owned),
356        )
357        .into()
358    }
359
360    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
361        generic::TableScan::new_inner(
362            output_col_idx,
363            self.core.table_catalog.clone(),
364            self.table_indexes().to_vec(),
365            self.vector_indexes().to_vec(),
366            self.base.ctx(),
367            self.predicate().clone(),
368            self.as_of(),
369            self.core.table_scan_stream_key().map(ToOwned::to_owned),
370        )
371        .into()
372    }
373
    /// Table column indices of the columns this scan outputs, in output order.
    pub fn output_col_idx(&self) -> &Vec<usize> {
        &self.core.output_col_idx
    }
377
    /// Table column indices of the columns the scan needs to read, as stored on the core.
    ///
    /// NOTE(review): presumably the output columns followed by any extra columns that only
    /// the predicate references — confirm against `generic::TableScan`.
    pub fn required_col_idx(&self) -> &Vec<usize> {
        &self.core.required_col_idx
    }
381}
382
383impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
384
385impl Distill for LogicalScan {
386    fn distill<'a>(&self) -> XmlNode<'a> {
387        let verbose = self.base.ctx().is_explain_verbose();
388        let mut vec = Vec::with_capacity(5);
389        vec.push(("table", Pretty::from(self.table_name().to_owned())));
390        let key_is_columns =
391            self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
392        let key = if key_is_columns {
393            "columns"
394        } else {
395            "output_columns"
396        };
397        vec.push((key, self.core.columns_pretty(verbose)));
398        if !key_is_columns {
399            vec.push((
400                "required_columns",
401                Pretty::Array(
402                    self.required_col_idx()
403                        .iter()
404                        .map(|i| {
405                            let col_name = &self.table().columns[*i].name;
406                            Pretty::from(if verbose {
407                                format!("{}.{}", self.table_name(), col_name)
408                            } else {
409                                col_name.clone()
410                            })
411                        })
412                        .collect(),
413                ),
414            ));
415        }
416
417        if !self.predicate().always_true() {
418            let input_schema = self.core.fields_pretty_schema();
419            vec.push((
420                "predicate",
421                Pretty::display(&ConditionDisplay {
422                    condition: self.predicate(),
423                    input_schema: &input_schema,
424                }),
425            ))
426        }
427
428        if self.table_cardinality() != Cardinality::unknown() {
429            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
430        }
431
432        childless_record("LogicalScan", vec)
433    }
434}
435
436impl ColPrunable for LogicalScan {
437    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
438        let output_col_idx: Vec<usize> = required_cols
439            .iter()
440            .map(|i| self.required_col_idx()[*i])
441            .collect();
442        assert!(
443            output_col_idx
444                .iter()
445                .all(|i| self.output_col_idx().contains(i))
446        );
447
448        self.clone_with_output_indices(output_col_idx).into()
449    }
450}
451
452impl ExprRewritable<Logical> for LogicalScan {
453    fn has_rewritable_expr(&self) -> bool {
454        true
455    }
456
457    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
458        let mut core = self.core.clone();
459        core.rewrite_exprs(r);
460        Self {
461            base: self.base.clone_with_new_plan_id(),
462            core,
463        }
464        .into()
465    }
466}
467
impl ExprVisitable for LogicalScan {
    /// Visit the expressions held by the scan core with `v`.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}
473
474impl PredicatePushdown for LogicalScan {
475    fn predicate_pushdown(
476        &self,
477        mut predicate: Condition,
478        _ctx: &mut PredicatePushdownContext,
479    ) -> PlanRef {
480        // If the predicate contains `CorrelatedInputRef` or `now()`. We don't push down.
481        // This case could come from the predicate push down before the subquery unnesting.
482        struct HasCorrelated {
483            has: bool,
484        }
485        impl ExprVisitor for HasCorrelated {
486            fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
487                self.has = true;
488            }
489        }
490        let non_pushable_predicate: Vec<_> = predicate
491            .conjunctions
492            .extract_if(.., |expr| {
493                if expr.count_nows() > 0 {
494                    true
495                } else {
496                    let mut visitor = HasCorrelated { has: false };
497                    visitor.visit_expr(expr);
498                    visitor.has
499                }
500            })
501            .collect();
502        let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
503            self.output_col_idx().iter().map(|i| Some(*i)).collect(),
504            self.table().columns.len(),
505        ));
506        if non_pushable_predicate.is_empty() {
507            self.clone_with_predicate(predicate.and(self.predicate().clone()))
508                .into()
509        } else {
510            LogicalFilter::create(
511                self.clone_with_predicate(predicate.and(self.predicate().clone()))
512                    .into(),
513                Condition {
514                    conjunctions: non_pushable_predicate,
515                },
516            )
517        }
518    }
519}
520
521impl LogicalScan {
522    fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
523        if self.predicate().always_true() {
524            required_order
525                .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
526        } else {
527            let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
528                self.table(),
529                self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
530            )?;
531            let mut scan = self.clone();
532            scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`.
533
534            let plan: BatchPlanRef = if scan.core.predicate.always_false() {
535                LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
536            } else {
537                let (scan, predicate, project_expr) = scan.predicate_pull_up();
538
539                let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
540                if !predicate.always_true() {
541                    plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
542                }
543                if let Some(exprs) = project_expr {
544                    plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
545                }
546                plan
547            };
548
549            assert_eq!(plan.schema(), self.schema());
550            required_order.enforce_if_not_satisfies(plan)
551        }
552    }
553
554    // For every index, check if the order of the index satisfies the required_order
555    // If yes, use an index scan
556    fn use_index_scan_if_order_is_satisfied(
557        &self,
558        required_order: &Order,
559    ) -> Option<Result<BatchPlanRef>> {
560        if required_order.column_orders.is_empty() {
561            return None;
562        }
563
564        let order_satisfied_index = self.indexes_satisfy_order(required_order);
565        for index in order_satisfied_index {
566            if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
567                return Some(index_scan.to_batch());
568            }
569        }
570
571        None
572    }
573}
574
575impl ToBatch for LogicalScan {
576    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
577        self.to_batch_with_order_required(&Order::any())
578    }
579
580    fn to_batch_with_order_required(
581        &self,
582        required_order: &Order,
583    ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
584        let new = self.clone_with_predicate(self.predicate().clone());
585
586        if !new.table_indexes().is_empty()
587            && self
588                .base
589                .ctx()
590                .session_ctx()
591                .config()
592                .enable_index_selection()
593        {
594            let index_selection_rule = IndexSelectionRule::create();
595            if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
596                if let Some(scan) = applied.as_logical_scan() {
597                    // covering index
598                    return required_order.enforce_if_not_satisfies(scan.to_batch()?);
599                } else if let Some(join) = applied.as_logical_join() {
600                    // index lookup join
601                    return required_order
602                        .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
603                } else {
604                    unreachable!();
605                }
606            } else {
607                // Try to make use of index if it satisfies the required order
608                if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
609                    return plan_ref;
610                }
611            }
612        }
613        new.to_batch_inner_with_required(required_order)
614    }
615}
616
617impl ToStream for LogicalScan {
618    fn to_stream(
619        &self,
620        ctx: &mut ToStreamContext,
621    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
622        if self.predicate().always_true() {
623            if self.core.cross_database() && ctx.backfill_type() == BackfillType::UpstreamOnly {
624                return Err(ErrorCode::NotSupported(
625                    "We currently do not support cross database scan in upstream only mode."
626                        .to_owned(),
627                    "Please ensure the source table is in the same database.".to_owned(),
628                )
629                .into());
630            }
631
632            Ok(StreamTableScan::new_with_stream_scan_type(
633                self.core.clone(),
634                ctx.backfill_type().to_stream_scan_type(),
635            )
636            .into())
637        } else {
638            if ctx.backfill_type() == BackfillType::SnapshotBackfill {
639                let (scan_ranges, _residual) = self.predicate().clone().split_to_scan_ranges(
640                    self.table(),
641                    self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
642                )?;
643
644                if scan_ranges.len() == 1 && !scan_ranges[0].is_full_table_scan() {
645                    let (core, predicate, project_expr) = self.predicate_pull_up();
646                    let scan = StreamTableScan::new_with_scan_range(
647                        core,
648                        ctx.backfill_type().to_stream_scan_type(),
649                        Some(scan_ranges.into_iter().next().unwrap()),
650                    );
651                    LogicalFilter::check_stream_predicate(&predicate)?;
652
653                    let mut plan: crate::optimizer::plan_node::StreamPlanRef =
654                        StreamFilter::new(generic::Filter::new(predicate, scan.into())).into();
655                    if let Some(exprs) = project_expr {
656                        plan = StreamProject::new(generic::Project::new(exprs, plan)).into();
657                    }
658                    return Ok(plan);
659                }
660            }
661
662            let (scan, predicate, project_expr) = self.predicate_pull_up();
663            let mut plan = LogicalFilter::create(scan.into(), predicate);
664            if let Some(exprs) = project_expr {
665                plan = LogicalProject::create(plan, exprs)
666            }
667            plan.to_stream(ctx)
668        }
669    }
670
671    fn logical_rewrite_for_stream(
672        &self,
673        ctx: &mut RewriteStreamContext,
674    ) -> Result<(PlanRef, ColIndexMapping)> {
675        let mut output_col_idx = self.output_col_idx().clone();
676        let original_len = output_col_idx.len();
677
678        // Ensure pk columns are in the output.
679        for idx in self.table().pk.iter().map(|c| c.column_index) {
680            if !output_col_idx.contains(&idx) {
681                output_col_idx.push(idx);
682            }
683        }
684        // Ensure catalog stream-key columns are in the output for stream planning.
685        for idx in self.table().stream_key() {
686            if !output_col_idx.contains(&idx) {
687                output_col_idx.push(idx);
688            }
689        }
690
691        let (mut scan, col_index_mapping) = if output_col_idx.len() == original_len {
692            (self.clone(), ColIndexMapping::identity(self.schema().len()))
693        } else {
694            let new_len = output_col_idx.len();
695            (
696                self.clone_with_output_indices(output_col_idx),
697                ColIndexMapping::identity_or_none(self.schema().len(), new_len),
698            )
699        };
700
701        if matches!(ctx.backfill_type(), BackfillType::SnapshotBackfill)
702            || self.core.cross_database()
703        {
704            // Snapshot and cross-database backfill must use the upstream table primary key while
705            // planning operators above the scan, before `StreamTableScan` is built. Other scan
706            // types keep the logical stream key here so they preserve the original
707            // normal-backfill behavior.
708            scan.core.extend_table_scan_stream_key_with_primary_key();
709            scan.base = PlanBase::new_logical_with_core(&scan.core);
710        }
711        Ok((scan.into(), col_index_mapping))
712    }
713
714    fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
715        if columns.is_empty() {
716            return None;
717        }
718        let enable_index_selection = self
719            .core
720            .ctx()
721            .session_ctx()
722            .config()
723            .enable_index_selection();
724        let has_indexes = !self.table_indexes().is_empty();
725        let primary_order = self.get_out_column_index_order();
726        let primary_dist_key_satisfied = self
727            .distribution_key()
728            .is_some_and(|dist_key| dist_key.iter().all(|k| columns.contains(k)));
729        let orders = if columns.len() <= 3 {
730            OrderType::all()
731        } else {
732            // Limit the number of order type combinations to avoid explosion.
733            // For more than 3 columns, we only consider ascending nulls last and descending.
734            // Since by default, indexes are created with ascending nulls last.
735            // This is a heuristic to reduce the search space.
736            vec![OrderType::ascending_nulls_last(), OrderType::descending()]
737        };
738        for order_type_combo in columns
739            .iter()
740            .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot)))
741            .multi_cartesian_product()
742            .take(256)
743        // limit the number of combinations
744        {
745            let required_order = Order {
746                column_orders: order_type_combo,
747            };
748
749            if primary_dist_key_satisfied && primary_order.satisfies(&required_order) {
750                return Some(self.clone().into());
751            }
752
753            if !enable_index_selection || !has_indexes {
754                continue;
755            }
756
757            let order_satisfied_index = self.indexes_satisfy_order(&required_order);
758            for index in order_satisfied_index {
759                if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
760                    // The selected index's distribution key must be the subset the locality columns.
761                    // Because index's stream key is [distribution key] + [primary table's primary key].
762                    // For streaming queries, we have to ensure any updates ordering (U-/U+) isn't disturbed
763                    // after the later shuffle introduced by the locality operator,
764                    // so we have to ensure the distribution key of the index scan is the subset of the locality columns.
765                    if let Some(dist_key) = index_scan.distribution_key()
766                        && dist_key.iter().all(|k| columns.contains(k))
767                    {
768                        return Some(index_scan.into());
769                    }
770                }
771            }
772        }
773        None
774    }
775}