risingwave_frontend/optimizer/plan_node/
logical_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::AsOf;
23
24use super::generic::{GenericPlanNode, GenericPlanRef};
25use super::utils::{Distill, childless_record};
26use super::{
27    BackfillType, BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
28    LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch,
29    ToStream, generic,
30};
31use crate::TableCatalog;
32use crate::binder::BoundBaseTable;
33use crate::catalog::ColumnId;
34use crate::catalog::index_catalog::{IndexType, TableIndex, VectorIndex};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
37use crate::optimizer::ApplyResult;
38use crate::optimizer::optimizer_context::OptimizerContextRef;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
41use crate::optimizer::plan_node::{
42    BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
43    PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
44};
45use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
46use crate::optimizer::rule::IndexSelectionRule;
47use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
48
/// `LogicalScan` returns contents of a table or other equivalent object.
///
/// It pairs the logical plan base with the [`generic::TableScan`] core it is derived from
/// (see the `From<generic::TableScan>` implementation in this file).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Logical plan-node base, built from `core` via `PlanBase::new_logical_with_core`.
    pub base: PlanBase<Logical>,
    /// The scan definition: table catalog, output/required column indices, indexes,
    /// predicate and the optional `AS OF` clause.
    core: generic::TableScan,
}
55
56impl From<generic::TableScan> for LogicalScan {
57    fn from(core: generic::TableScan) -> Self {
58        let base = PlanBase::new_logical_with_core(&core);
59        Self { base, core }
60    }
61}
62
63impl From<generic::TableScan> for PlanRef {
64    fn from(core: generic::TableScan) -> Self {
65        LogicalScan::from(core).into()
66    }
67}
68
/// Exposes the common plan-node properties of a [`LogicalScan`].
///
/// Every accessor simply forwards to the plan base returned by `plan_base()`.
impl GenericPlanRef for LogicalScan {
    /// The id of this plan node.
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    /// The output schema of this plan node.
    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    /// The stream key recorded on the plan base, if any.
    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    /// The optimizer context this plan node belongs to.
    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    /// The functional dependencies recorded on the plan base.
    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}
90
91impl LogicalScan {
92    /// Create a [`LogicalScan`] node. Used by planner.
93    pub fn create(
94        table_catalog: Arc<TableCatalog>,
95        ctx: OptimizerContextRef,
96        as_of: Option<AsOf>,
97    ) -> Self {
98        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
99        generic::TableScan::new(
100            output_col_idx,
101            table_catalog,
102            vec![],
103            vec![],
104            ctx,
105            Condition::true_cond(),
106            as_of,
107        )
108        .into()
109    }
110
111    pub fn from_base_table(
112        base_table: &BoundBaseTable,
113        ctx: OptimizerContextRef,
114        as_of: Option<AsOf>,
115    ) -> Self {
116        let table_catalog = base_table.table_catalog.clone();
117        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
118        let mut table_indexes = vec![];
119        let mut vector_indexes = vec![];
120        for index in &base_table.table_indexes {
121            match &index.index_type {
122                IndexType::Table(index) => table_indexes.push(index.clone()),
123                IndexType::Vector(index) => vector_indexes.push(index.clone()),
124            }
125        }
126        generic::TableScan::new(
127            output_col_idx,
128            table_catalog,
129            table_indexes,
130            vector_indexes,
131            ctx,
132            Condition::true_cond(),
133            as_of,
134        )
135        .into()
136    }
137
    /// The name of the scanned table, as recorded in its catalog.
    pub fn table_name(&self) -> &str {
        &self.core.table_catalog.name
    }
141
    /// A clone of the scan's optional `AS OF` clause.
    pub fn as_of(&self) -> Option<AsOf> {
        self.core.as_of.clone()
    }
145
    /// The cardinality of the table **without** applying the predicate, read from the
    /// table catalog.
    pub fn table_cardinality(&self) -> Cardinality {
        self.core.table_catalog.cardinality
    }
150
    /// The catalog of the scanned table.
    pub fn table(&self) -> &Arc<TableCatalog> {
        &self.core.table_catalog
    }
154
    /// Get the descs of the output columns (forwarded to the core scan).
    pub fn column_descs(&self) -> Vec<ColumnDesc> {
        self.core.column_descs()
    }
159
    /// Get the ids of the output columns (forwarded to the core scan).
    pub fn output_column_ids(&self) -> Vec<ColumnId> {
        self.core.output_column_ids()
    }
164
    /// Get all table indexes attached to this scan.
    pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
        &self.core.table_indexes
    }
169
    /// Get all vector indexes attached to this scan.
    pub fn vector_indexes(&self) -> &[Arc<VectorIndex>] {
        &self.core.vector_indexes
    }
174
    /// Get the filter predicate currently held inside the scan.
    pub fn predicate(&self) -> &Condition {
        &self.core.predicate
    }
179
    /// Return the indices of the fields the output is ordered by, together with the
    /// corresponding directions (forwarded to the core scan).
    pub fn get_out_column_index_order(&self) -> Order {
        self.core.get_out_column_index_order()
    }
185
    /// The distribution key as reported by the core scan, if any.
    pub fn distribution_key(&self) -> Option<Vec<usize>> {
        self.core.distribution_key()
    }
189
    /// The watermark columns as reported by the core scan.
    pub fn watermark_columns(&self) -> WatermarkColumns {
        self.core.watermark_columns()
    }
193
    /// Whether the core scan reports this as a cross-database scan.
    pub fn cross_database(&self) -> bool {
        self.core.cross_database()
    }
197
198    /// Return indexes can satisfy the required order.
199    pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
200        self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
201            .into_iter()
202            .map(|(index, _)| index)
203            .collect()
204    }
205
206    /// Return indexes can satisfy the required order.
207    /// The `prefix` refers to optionally matching columns of the index
208    /// It is unordered initially.
209    /// If any are used, we will return the fixed `Order` prefix.
210    pub fn indexes_satisfy_order_with_prefix(
211        &self,
212        required_order: &Order,
213        prefix: &HashSet<ColumnOrder>,
214    ) -> Vec<(&Arc<TableIndex>, Order)> {
215        let output_col_map = self
216            .output_col_idx()
217            .iter()
218            .cloned()
219            .enumerate()
220            .map(|(id, col)| (col, id))
221            .collect::<BTreeMap<_, _>>();
222        let unmatched_idx = output_col_map.len();
223        let mut index_catalog_and_orders = vec![];
224        for index in self.table_indexes() {
225            let s2p_mapping = index.secondary_to_primary_mapping();
226            let index_orders: Vec<ColumnOrder> = index
227                .index_table
228                .pk()
229                .iter()
230                .map(|idx_item| {
231                    let idx = match s2p_mapping.get(&idx_item.column_index) {
232                        Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
233                        // After we support index on expressions, we need to handle the case where the column is not in the `s2p_mapping`.
234                        None => unmatched_idx,
235                    };
236                    ColumnOrder::new(idx, idx_item.order_type)
237                })
238                .collect();
239
240            let mut index_orders_iter = index_orders.into_iter().peekable();
241
242            // First check the prefix
243            let fixed_prefix = {
244                let mut fixed_prefix = vec![];
245                loop {
246                    match index_orders_iter.peek() {
247                        Some(index_col_order) if prefix.contains(index_col_order) => {
248                            let index_col_order = index_orders_iter.next().unwrap();
249                            fixed_prefix.push(index_col_order);
250                        }
251                        _ => break,
252                    }
253                }
254                Order {
255                    column_orders: fixed_prefix,
256                }
257            };
258
259            let remaining_orders = Order {
260                column_orders: index_orders_iter.collect(),
261            };
262            if remaining_orders.satisfies(required_order) {
263                index_catalog_and_orders.push((index, fixed_prefix));
264            }
265        }
266        index_catalog_and_orders
267    }
268
269    /// If the index can cover the scan, transform it to the index scan.
270    pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
271        let p2s_mapping = index.primary_to_secondary_mapping();
272        if self
273            .required_col_idx()
274            .iter()
275            .all(|x| p2s_mapping.contains_key(x))
276        {
277            let index_scan = self.core.to_index_scan(
278                index.index_table.clone(),
279                p2s_mapping,
280                index.function_mapping(),
281            );
282            Some(index_scan.into())
283        } else {
284            None
285        }
286    }
287
    /// The primary key column orders as reported by the core scan.
    pub fn primary_key(&self) -> &[ColumnOrder] {
        self.core.primary_key()
    }
291
292    /// a vec of `InputRef` corresponding to `output_col_idx`, which can represent a pulled project.
293    fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
294        self.output_col_idx()
295            .iter()
296            .enumerate()
297            .map(|(i, &col_idx)| {
298                InputRef::new(
299                    i,
300                    self.table().columns[col_idx].column_desc.data_type.clone(),
301                )
302                .into()
303            })
304            .collect_vec()
305    }
306
    /// Undo predicate push down when predicate in scan is not supported.
    ///
    /// Returns a tuple of:
    /// - a scan core with an always-true predicate whose output columns are this scan's
    ///   `required_col_idx`,
    /// - the pulled-up predicate, rewritten through the inverse of the
    ///   `required_col_idx` mapping,
    /// - `Some` list of `InputRef`s (one per entry of `output_col_idx`) to project on top
    ///   when `required_col_idx` differs from `output_col_idx`, otherwise `None`.
    ///
    /// If the predicate is already always true, the current core is returned unchanged with
    /// a true condition and no projection.
    pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
        let mut predicate = self.predicate().clone();
        if predicate.always_true() {
            return (self.core.clone(), Condition::true_cond(), None);
        }

        // Build the mapping "table column index -> position in `required_col_idx`".
        let mut inverse_mapping = {
            // Forward mapping: position in `required_col_idx` -> table column index.
            let mapping = ColIndexMapping::new(
                self.required_col_idx().iter().map(|i| Some(*i)).collect(),
                self.table().columns.len(),
            );
            // Since `required_col_idx` mapping is not invertible, we need to inverse manually.
            let mut inverse_map = vec![None; mapping.target_size()];
            for (src, dst) in mapping.mapping_pairs() {
                inverse_map[dst] = Some(src);
            }
            ColIndexMapping::new(inverse_map, mapping.source_size())
        };

        predicate = predicate.rewrite_expr(&mut inverse_mapping);

        // The new scan outputs every required column and carries no predicate.
        let scan_without_predicate = generic::TableScan::new(
            self.required_col_idx().clone(),
            self.core.table_catalog.clone(),
            self.table_indexes().to_vec(),
            self.vector_indexes().to_vec(),
            self.ctx(),
            Condition::true_cond(),
            self.as_of(),
        );
        // Only needed when the new scan outputs more than the original output columns.
        let project_expr = if self.required_col_idx() != self.output_col_idx() {
            Some(self.output_idx_to_input_ref())
        } else {
            None
        };
        (scan_without_predicate, predicate, project_expr)
    }
345
346    fn clone_with_predicate(&self, predicate: Condition) -> Self {
347        generic::TableScan::new_inner(
348            self.output_col_idx().clone(),
349            self.table().clone(),
350            self.table_indexes().to_vec(),
351            self.vector_indexes().to_vec(),
352            self.base.ctx(),
353            predicate,
354            self.as_of(),
355        )
356        .into()
357    }
358
359    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
360        generic::TableScan::new_inner(
361            output_col_idx,
362            self.core.table_catalog.clone(),
363            self.table_indexes().to_vec(),
364            self.vector_indexes().to_vec(),
365            self.base.ctx(),
366            self.predicate().clone(),
367            self.as_of(),
368        )
369        .into()
370    }
371
    /// The table column indices this scan outputs, in output order.
    pub fn output_col_idx(&self) -> &Vec<usize> {
        &self.core.output_col_idx
    }
375
    /// The table column indices this scan requires.
    // NOTE(review): presumably the output columns plus any columns referenced only by the
    // predicate — confirm against `generic::TableScan`.
    pub fn required_col_idx(&self) -> &Vec<usize> {
        &self.core.required_col_idx
    }
379}
380
// NOTE(review): presumably generates the plan-tree-node implementation for a node without
// inputs (a leaf) — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
382
383impl Distill for LogicalScan {
384    fn distill<'a>(&self) -> XmlNode<'a> {
385        let verbose = self.base.ctx().is_explain_verbose();
386        let mut vec = Vec::with_capacity(5);
387        vec.push(("table", Pretty::from(self.table_name().to_owned())));
388        let key_is_columns =
389            self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
390        let key = if key_is_columns {
391            "columns"
392        } else {
393            "output_columns"
394        };
395        vec.push((key, self.core.columns_pretty(verbose)));
396        if !key_is_columns {
397            vec.push((
398                "required_columns",
399                Pretty::Array(
400                    self.required_col_idx()
401                        .iter()
402                        .map(|i| {
403                            let col_name = &self.table().columns[*i].name;
404                            Pretty::from(if verbose {
405                                format!("{}.{}", self.table_name(), col_name)
406                            } else {
407                                col_name.clone()
408                            })
409                        })
410                        .collect(),
411                ),
412            ));
413        }
414
415        if !self.predicate().always_true() {
416            let input_schema = self.core.fields_pretty_schema();
417            vec.push((
418                "predicate",
419                Pretty::display(&ConditionDisplay {
420                    condition: self.predicate(),
421                    input_schema: &input_schema,
422                }),
423            ))
424        }
425
426        if self.table_cardinality() != Cardinality::unknown() {
427            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
428        }
429
430        childless_record("LogicalScan", vec)
431    }
432}
433
434impl ColPrunable for LogicalScan {
435    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
436        let output_col_idx: Vec<usize> = required_cols
437            .iter()
438            .map(|i| self.required_col_idx()[*i])
439            .collect();
440        assert!(
441            output_col_idx
442                .iter()
443                .all(|i| self.output_col_idx().contains(i))
444        );
445
446        self.clone_with_output_indices(output_col_idx).into()
447    }
448}
449
450impl ExprRewritable<Logical> for LogicalScan {
451    fn has_rewritable_expr(&self) -> bool {
452        true
453    }
454
455    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
456        let mut core = self.core.clone();
457        core.rewrite_exprs(r);
458        Self {
459            base: self.base.clone_with_new_plan_id(),
460            core,
461        }
462        .into()
463    }
464}
465
/// Expression visiting support; the visitor is forwarded to the core scan.
impl ExprVisitable for LogicalScan {
    /// Let `v` visit the expressions held by the core scan.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}
471
472impl PredicatePushdown for LogicalScan {
473    fn predicate_pushdown(
474        &self,
475        mut predicate: Condition,
476        _ctx: &mut PredicatePushdownContext,
477    ) -> PlanRef {
478        // If the predicate contains `CorrelatedInputRef` or `now()`. We don't push down.
479        // This case could come from the predicate push down before the subquery unnesting.
480        struct HasCorrelated {
481            has: bool,
482        }
483        impl ExprVisitor for HasCorrelated {
484            fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
485                self.has = true;
486            }
487        }
488        let non_pushable_predicate: Vec<_> = predicate
489            .conjunctions
490            .extract_if(.., |expr| {
491                if expr.count_nows() > 0 {
492                    true
493                } else {
494                    let mut visitor = HasCorrelated { has: false };
495                    visitor.visit_expr(expr);
496                    visitor.has
497                }
498            })
499            .collect();
500        let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
501            self.output_col_idx().iter().map(|i| Some(*i)).collect(),
502            self.table().columns.len(),
503        ));
504        if non_pushable_predicate.is_empty() {
505            self.clone_with_predicate(predicate.and(self.predicate().clone()))
506                .into()
507        } else {
508            LogicalFilter::create(
509                self.clone_with_predicate(predicate.and(self.predicate().clone()))
510                    .into(),
511                Condition {
512                    conjunctions: non_pushable_predicate,
513                },
514            )
515        }
516    }
517}
518
519impl LogicalScan {
    /// Lower this scan to a batch plan and enforce `required_order` on the result.
    ///
    /// With an always-true predicate the scan becomes a plain `BatchSeqScan`. Otherwise the
    /// predicate is split into scan ranges plus a remaining predicate; an always-false
    /// remainder yields an empty `LogicalValues`, and any other remainder is pulled out of
    /// the scan into a `BatchFilter` (and a `BatchProject` when the pull-up returns
    /// projection expressions).
    ///
    /// Returns an error if splitting the predicate or lowering the empty values fails.
    fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
        if self.predicate().always_true() {
            required_order
                .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
        } else {
            // Turn as much of the predicate as possible into scan ranges; `predicate` is
            // what remains to be evaluated.
            let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
                self.table(),
                self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
            )?;
            let mut scan = self.clone();
            scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`.

            let plan: BatchPlanRef = if scan.core.predicate.always_false() {
                // Nothing can match: produce an empty result with the scan's schema.
                LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
            } else {
                let (scan, predicate, project_expr) = scan.predicate_pull_up();

                let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
                if !predicate.always_true() {
                    plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
                }
                if let Some(exprs) = project_expr {
                    plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
                }
                plan
            };

            // The lowered plan must expose exactly the schema of this logical scan.
            assert_eq!(plan.schema(), self.schema());
            required_order.enforce_if_not_satisfies(plan)
        }
    }
551
552    // For every index, check if the order of the index satisfies the required_order
553    // If yes, use an index scan
554    fn use_index_scan_if_order_is_satisfied(
555        &self,
556        required_order: &Order,
557    ) -> Option<Result<BatchPlanRef>> {
558        if required_order.column_orders.is_empty() {
559            return None;
560        }
561
562        let order_satisfied_index = self.indexes_satisfy_order(required_order);
563        for index in order_satisfied_index {
564            if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
565                return Some(index_scan.to_batch());
566            }
567        }
568
569        None
570    }
571}
572
573impl ToBatch for LogicalScan {
574    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
575        self.to_batch_with_order_required(&Order::any())
576    }
577
578    fn to_batch_with_order_required(
579        &self,
580        required_order: &Order,
581    ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
582        let new = self.clone_with_predicate(self.predicate().clone());
583
584        if !new.table_indexes().is_empty()
585            && self
586                .base
587                .ctx()
588                .session_ctx()
589                .config()
590                .enable_index_selection()
591        {
592            let index_selection_rule = IndexSelectionRule::create();
593            if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
594                if let Some(scan) = applied.as_logical_scan() {
595                    // covering index
596                    return required_order.enforce_if_not_satisfies(scan.to_batch()?);
597                } else if let Some(join) = applied.as_logical_join() {
598                    // index lookup join
599                    return required_order
600                        .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
601                } else {
602                    unreachable!();
603                }
604            } else {
605                // Try to make use of index if it satisfies the required order
606                if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
607                    return plan_ref;
608                }
609            }
610        }
611        new.to_batch_inner_with_required(required_order)
612    }
613}
614
615impl ToStream for LogicalScan {
616    fn to_stream(
617        &self,
618        ctx: &mut ToStreamContext,
619    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
620        if self.predicate().always_true() {
621            if self.core.cross_database() && ctx.backfill_type() == BackfillType::UpstreamOnly {
622                return Err(ErrorCode::NotSupported(
623                    "We currently do not support cross database scan in upstream only mode."
624                        .to_owned(),
625                    "Please ensure the source table is in the same database.".to_owned(),
626                )
627                .into());
628            }
629
630            Ok(StreamTableScan::new_with_stream_scan_type(
631                self.core.clone(),
632                ctx.backfill_type().to_stream_scan_type(),
633            )
634            .into())
635        } else {
636            let (scan, predicate, project_expr) = self.predicate_pull_up();
637            let mut plan = LogicalFilter::create(scan.into(), predicate);
638            if let Some(exprs) = project_expr {
639                plan = LogicalProject::create(plan, exprs)
640            }
641            plan.to_stream(ctx)
642        }
643    }
644
645    fn logical_rewrite_for_stream(
646        &self,
647        _ctx: &mut RewriteStreamContext,
648    ) -> Result<(PlanRef, ColIndexMapping)> {
649        match self.base.stream_key().is_none() {
650            true => {
651                let mut col_ids = HashSet::new();
652
653                for &idx in self.output_col_idx() {
654                    col_ids.insert(self.table().columns[idx].column_id);
655                }
656                let col_need_to_add = self
657                    .table()
658                    .pk
659                    .iter()
660                    .filter_map(|c| {
661                        if !col_ids.contains(&self.table().columns[c.column_index].column_id) {
662                            Some(c.column_index)
663                        } else {
664                            None
665                        }
666                    })
667                    .collect_vec();
668
669                let mut output_col_idx = self.output_col_idx().clone();
670                output_col_idx.extend(col_need_to_add);
671                let new_len = output_col_idx.len();
672                Ok((
673                    self.clone_with_output_indices(output_col_idx).into(),
674                    ColIndexMapping::identity_or_none(self.schema().len(), new_len),
675                ))
676            }
677            false => Ok((
678                self.clone().into(),
679                ColIndexMapping::identity(self.schema().len()),
680            )),
681        }
682    }
683
684    fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
685        if !self
686            .core
687            .ctx()
688            .session_ctx()
689            .config()
690            .enable_index_selection()
691        {
692            return None;
693        }
694        if columns.is_empty() {
695            return None;
696        }
697        if self.table_indexes().is_empty() {
698            return None;
699        }
700        let orders = if columns.len() <= 3 {
701            OrderType::all()
702        } else {
703            // Limit the number of order type combinations to avoid explosion.
704            // For more than 3 columns, we only consider ascending nulls last and descending.
705            // Since by default, indexes are created with ascending nulls last.
706            // This is a heuristic to reduce the search space.
707            vec![OrderType::ascending_nulls_last(), OrderType::descending()]
708        };
709        for order_type_combo in columns
710            .iter()
711            .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot)))
712            .multi_cartesian_product()
713            .take(256)
714        // limit the number of combinations
715        {
716            let required_order = Order {
717                column_orders: order_type_combo,
718            };
719
720            let order_satisfied_index = self.indexes_satisfy_order(&required_order);
721            for index in order_satisfied_index {
722                if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
723                    // The selected index's distribution key must be the subset the locality columns.
724                    // Because index's stream key is [distribution key] + [primary table's primary key].
725                    // For streaming queries, we have to ensure any updates ordering (U-/U+) isn't disturbed
726                    // after the later shuffle introduced by the locality operator,
727                    // so we have to ensure the distribution key of the index scan is the subset of the locality columns.
728                    if let Some(dist_key) = index_scan.distribution_key()
729                        && dist_key.iter().all(|k| columns.contains(k))
730                    {
731                        return Some(index_scan.into());
732                    }
733                }
734            }
735        }
736        None
737    }
738}