risingwave_frontend/optimizer/plan_node/
logical_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_pb::stream_plan::StreamScanType;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::generic::{GenericPlanNode, GenericPlanRef};
26use super::utils::{Distill, childless_record};
27use super::{
28    BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
29    LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch,
30    ToStream, generic,
31};
32use crate::TableCatalog;
33use crate::binder::BoundBaseTable;
34use crate::catalog::ColumnId;
35use crate::catalog::index_catalog::{IndexType, TableIndex, VectorIndex};
36use crate::error::{ErrorCode, Result};
37use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
38use crate::optimizer::ApplyResult;
39use crate::optimizer::optimizer_context::OptimizerContextRef;
40use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
41use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
42use crate::optimizer::plan_node::{
43    BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
44    PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
45};
46use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
47use crate::optimizer::rule::IndexSelectionRule;
48use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
49
/// `LogicalScan` returns contents of a table or other equivalent object.
///
/// The node pairs the logical plan metadata (`base`) with a [`generic::TableScan`] (`core`)
/// that describes the scan itself: output columns, indexes, predicate and `as_of`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Plan-node metadata of this logical node.
    pub base: PlanBase<Logical>,
    /// The scan description; it is also what the batch and stream scan nodes are built from.
    core: generic::TableScan,
}
56
57impl From<generic::TableScan> for LogicalScan {
58    fn from(core: generic::TableScan) -> Self {
59        let base = PlanBase::new_logical_with_core(&core);
60        Self { base, core }
61    }
62}
63
64impl From<generic::TableScan> for PlanRef {
65    fn from(core: generic::TableScan) -> Self {
66        LogicalScan::from(core).into()
67    }
68}
69
/// Exposes the common plan-node properties of a [`LogicalScan`].
///
/// Every accessor forwards to the node's `plan_base()`.
impl GenericPlanRef for LogicalScan {
    /// Id of this plan node.
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    /// Output schema of this plan node.
    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    /// Stream key of this plan node, if it has one.
    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    /// The optimizer context this node belongs to.
    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    /// Functional dependencies recorded for this node's output.
    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}
91
92impl LogicalScan {
93    /// Create a [`LogicalScan`] node. Used by planner.
94    pub fn create(
95        table_catalog: Arc<TableCatalog>,
96        ctx: OptimizerContextRef,
97        as_of: Option<AsOf>,
98    ) -> Self {
99        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
100        generic::TableScan::new(
101            output_col_idx,
102            table_catalog,
103            vec![],
104            vec![],
105            ctx,
106            Condition::true_cond(),
107            as_of,
108        )
109        .into()
110    }
111
112    pub fn from_base_table(
113        base_table: &BoundBaseTable,
114        ctx: OptimizerContextRef,
115        as_of: Option<AsOf>,
116    ) -> Self {
117        let table_catalog = base_table.table_catalog.clone();
118        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
119        let mut table_indexes = vec![];
120        let mut vector_indexes = vec![];
121        for index in &base_table.table_indexes {
122            match &index.index_type {
123                IndexType::Table(index) => table_indexes.push(index.clone()),
124                IndexType::Vector(index) => vector_indexes.push(index.clone()),
125            }
126        }
127        generic::TableScan::new(
128            output_col_idx,
129            table_catalog,
130            table_indexes,
131            vector_indexes,
132            ctx,
133            Condition::true_cond(),
134            as_of,
135        )
136        .into()
137    }
138
    /// Name of the scanned table, taken from its catalog entry.
    pub fn table_name(&self) -> &str {
        &self.core.table_catalog.name
    }
142
    /// Returns a clone of the optional [`AsOf`] attached to this scan.
    pub fn as_of(&self) -> Option<AsOf> {
        self.core.as_of.clone()
    }
146
    /// The cardinality of the table **without** applying the predicate.
    ///
    /// This is the value stored in the table catalog.
    pub fn table_cardinality(&self) -> Cardinality {
        self.core.table_catalog.cardinality
    }
151
    /// The catalog of the scanned table.
    pub fn table(&self) -> &Arc<TableCatalog> {
        &self.core.table_catalog
    }
155
    /// Get the descs of the output columns.
    ///
    /// Forwards to the underlying `generic::TableScan`.
    pub fn column_descs(&self) -> Vec<ColumnDesc> {
        self.core.column_descs()
    }
160
    /// Get the ids of the output columns.
    ///
    /// Forwards to the underlying `generic::TableScan`.
    pub fn output_column_ids(&self) -> Vec<ColumnId> {
        self.core.output_column_ids()
    }
165
    /// Get all table indexes known to this scan for its table.
    pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
        &self.core.table_indexes
    }
170
    /// Get all vector indexes known to this scan for its table.
    pub fn vector_indexes(&self) -> &[Arc<VectorIndex>] {
        &self.core.vector_indexes
    }
175
    /// Get the filter predicate stored in this logical scan.
    pub fn predicate(&self) -> &Condition {
        &self.core.predicate
    }
180
    /// Return the indices of the fields the output is ordered by, together with the
    /// corresponding direction.
    ///
    /// Forwards to the underlying `generic::TableScan`.
    pub fn get_out_column_index_order(&self) -> Order {
        self.core.get_out_column_index_order()
    }
186
    /// Distribution key as reported by the underlying `generic::TableScan`, if any.
    pub fn distribution_key(&self) -> Option<Vec<usize>> {
        self.core.distribution_key()
    }
190
    /// Watermark columns as reported by the underlying `generic::TableScan`.
    pub fn watermark_columns(&self) -> WatermarkColumns {
        self.core.watermark_columns()
    }
194
195    /// Return indexes can satisfy the required order.
196    pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
197        self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
198            .into_iter()
199            .map(|(index, _)| index)
200            .collect()
201    }
202
203    /// Return indexes can satisfy the required order.
204    /// The `prefix` refers to optionally matching columns of the index
205    /// It is unordered initially.
206    /// If any are used, we will return the fixed `Order` prefix.
207    pub fn indexes_satisfy_order_with_prefix(
208        &self,
209        required_order: &Order,
210        prefix: &HashSet<ColumnOrder>,
211    ) -> Vec<(&Arc<TableIndex>, Order)> {
212        let output_col_map = self
213            .output_col_idx()
214            .iter()
215            .cloned()
216            .enumerate()
217            .map(|(id, col)| (col, id))
218            .collect::<BTreeMap<_, _>>();
219        let unmatched_idx = output_col_map.len();
220        let mut index_catalog_and_orders = vec![];
221        for index in self.table_indexes() {
222            let s2p_mapping = index.secondary_to_primary_mapping();
223            let index_orders: Vec<ColumnOrder> = index
224                .index_table
225                .pk()
226                .iter()
227                .map(|idx_item| {
228                    let idx = match s2p_mapping.get(&idx_item.column_index) {
229                        Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
230                        // After we support index on expressions, we need to handle the case where the column is not in the `s2p_mapping`.
231                        None => unmatched_idx,
232                    };
233                    ColumnOrder::new(idx, idx_item.order_type)
234                })
235                .collect();
236
237            let mut index_orders_iter = index_orders.into_iter().peekable();
238
239            // First check the prefix
240            let fixed_prefix = {
241                let mut fixed_prefix = vec![];
242                loop {
243                    match index_orders_iter.peek() {
244                        Some(index_col_order) if prefix.contains(index_col_order) => {
245                            let index_col_order = index_orders_iter.next().unwrap();
246                            fixed_prefix.push(index_col_order);
247                        }
248                        _ => break,
249                    }
250                }
251                Order {
252                    column_orders: fixed_prefix,
253                }
254            };
255
256            let remaining_orders = Order {
257                column_orders: index_orders_iter.collect(),
258            };
259            if remaining_orders.satisfies(required_order) {
260                index_catalog_and_orders.push((index, fixed_prefix));
261            }
262        }
263        index_catalog_and_orders
264    }
265
266    /// If the index can cover the scan, transform it to the index scan.
267    pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
268        let p2s_mapping = index.primary_to_secondary_mapping();
269        if self
270            .required_col_idx()
271            .iter()
272            .all(|x| p2s_mapping.contains_key(x))
273        {
274            let index_scan = self.core.to_index_scan(
275                index.index_table.clone(),
276                p2s_mapping,
277                index.function_mapping(),
278            );
279            Some(index_scan.into())
280        } else {
281            None
282        }
283    }
284
    /// Primary key column orders as reported by the underlying `generic::TableScan`.
    pub fn primary_key(&self) -> &[ColumnOrder] {
        self.core.primary_key()
    }
288
289    /// a vec of `InputRef` corresponding to `output_col_idx`, which can represent a pulled project.
290    fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
291        self.output_col_idx()
292            .iter()
293            .enumerate()
294            .map(|(i, &col_idx)| {
295                InputRef::new(
296                    i,
297                    self.table().columns[col_idx].column_desc.data_type.clone(),
298                )
299                .into()
300            })
301            .collect_vec()
302    }
303
304    /// Undo predicate push down when predicate in scan is not supported.
305    pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
306        let mut predicate = self.predicate().clone();
307        if predicate.always_true() {
308            return (self.core.clone(), Condition::true_cond(), None);
309        }
310
311        let mut inverse_mapping = {
312            let mapping = ColIndexMapping::new(
313                self.required_col_idx().iter().map(|i| Some(*i)).collect(),
314                self.table().columns.len(),
315            );
316            // Since `required_col_idx` mapping is not invertible, we need to inverse manually.
317            let mut inverse_map = vec![None; mapping.target_size()];
318            for (src, dst) in mapping.mapping_pairs() {
319                inverse_map[dst] = Some(src);
320            }
321            ColIndexMapping::new(inverse_map, mapping.source_size())
322        };
323
324        predicate = predicate.rewrite_expr(&mut inverse_mapping);
325
326        let scan_without_predicate = generic::TableScan::new(
327            self.required_col_idx().to_vec(),
328            self.core.table_catalog.clone(),
329            self.table_indexes().to_vec(),
330            self.vector_indexes().to_vec(),
331            self.ctx(),
332            Condition::true_cond(),
333            self.as_of(),
334        );
335        let project_expr = if self.required_col_idx() != self.output_col_idx() {
336            Some(self.output_idx_to_input_ref())
337        } else {
338            None
339        };
340        (scan_without_predicate, predicate, project_expr)
341    }
342
343    fn clone_with_predicate(&self, predicate: Condition) -> Self {
344        generic::TableScan::new_inner(
345            self.output_col_idx().to_vec(),
346            self.table().clone(),
347            self.table_indexes().to_vec(),
348            self.vector_indexes().to_vec(),
349            self.base.ctx().clone(),
350            predicate,
351            self.as_of(),
352        )
353        .into()
354    }
355
356    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
357        generic::TableScan::new_inner(
358            output_col_idx,
359            self.core.table_catalog.clone(),
360            self.table_indexes().to_vec(),
361            self.vector_indexes().to_vec(),
362            self.base.ctx().clone(),
363            self.predicate().clone(),
364            self.as_of(),
365        )
366        .into()
367    }
368
    /// Table column indices that this scan outputs, in output order.
    pub fn output_col_idx(&self) -> &Vec<usize> {
        &self.core.output_col_idx
    }
372
    /// Table column indices that the scan has to read.
    ///
    /// NOTE(review): presumably the output columns plus any extra columns referenced only by
    /// the predicate — confirm against `generic::TableScan`.
    pub fn required_col_idx(&self) -> &Vec<usize> {
        &self.core.required_col_idx
    }
376}
377
// Plan-tree-node boilerplate for `LogicalScan`, generated by the leaf-node macro.
// NOTE(review): the macro is defined elsewhere; presumably it registers the node as having no
// inputs — confirm.
impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
379
380impl Distill for LogicalScan {
381    fn distill<'a>(&self) -> XmlNode<'a> {
382        let verbose = self.base.ctx().is_explain_verbose();
383        let mut vec = Vec::with_capacity(5);
384        vec.push(("table", Pretty::from(self.table_name().to_owned())));
385        let key_is_columns =
386            self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
387        let key = if key_is_columns {
388            "columns"
389        } else {
390            "output_columns"
391        };
392        vec.push((key, self.core.columns_pretty(verbose)));
393        if !key_is_columns {
394            vec.push((
395                "required_columns",
396                Pretty::Array(
397                    self.required_col_idx()
398                        .iter()
399                        .map(|i| {
400                            let col_name = &self.table().columns[*i].name;
401                            Pretty::from(if verbose {
402                                format!("{}.{}", self.table_name(), col_name)
403                            } else {
404                                col_name.clone()
405                            })
406                        })
407                        .collect(),
408                ),
409            ));
410        }
411
412        if !self.predicate().always_true() {
413            let input_schema = self.core.fields_pretty_schema();
414            vec.push((
415                "predicate",
416                Pretty::display(&ConditionDisplay {
417                    condition: self.predicate(),
418                    input_schema: &input_schema,
419                }),
420            ))
421        }
422
423        if self.table_cardinality() != Cardinality::unknown() {
424            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
425        }
426
427        childless_record("LogicalScan", vec)
428    }
429}
430
431impl ColPrunable for LogicalScan {
432    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
433        let output_col_idx: Vec<usize> = required_cols
434            .iter()
435            .map(|i| self.required_col_idx()[*i])
436            .collect();
437        assert!(
438            output_col_idx
439                .iter()
440                .all(|i| self.output_col_idx().contains(i))
441        );
442
443        self.clone_with_output_indices(output_col_idx).into()
444    }
445}
446
447impl ExprRewritable<Logical> for LogicalScan {
448    fn has_rewritable_expr(&self) -> bool {
449        true
450    }
451
452    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
453        let mut core = self.core.clone();
454        core.rewrite_exprs(r);
455        Self {
456            base: self.base.clone_with_new_plan_id(),
457            core,
458        }
459        .into()
460    }
461}
462
impl ExprVisitable for LogicalScan {
    /// Visits the expressions held by the underlying `generic::TableScan` with `v`.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}
468
469impl PredicatePushdown for LogicalScan {
470    fn predicate_pushdown(
471        &self,
472        mut predicate: Condition,
473        _ctx: &mut PredicatePushdownContext,
474    ) -> PlanRef {
475        // If the predicate contains `CorrelatedInputRef` or `now()`. We don't push down.
476        // This case could come from the predicate push down before the subquery unnesting.
477        struct HasCorrelated {
478            has: bool,
479        }
480        impl ExprVisitor for HasCorrelated {
481            fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
482                self.has = true;
483            }
484        }
485        let non_pushable_predicate: Vec<_> = predicate
486            .conjunctions
487            .extract_if(.., |expr| {
488                if expr.count_nows() > 0 {
489                    true
490                } else {
491                    let mut visitor = HasCorrelated { has: false };
492                    visitor.visit_expr(expr);
493                    visitor.has
494                }
495            })
496            .collect();
497        let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
498            self.output_col_idx().iter().map(|i| Some(*i)).collect(),
499            self.table().columns.len(),
500        ));
501        if non_pushable_predicate.is_empty() {
502            self.clone_with_predicate(predicate.and(self.predicate().clone()))
503                .into()
504        } else {
505            LogicalFilter::create(
506                self.clone_with_predicate(predicate.and(self.predicate().clone()))
507                    .into(),
508                Condition {
509                    conjunctions: non_pushable_predicate,
510                },
511            )
512        }
513    }
514}
515
516impl LogicalScan {
517    fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
518        if self.predicate().always_true() {
519            required_order
520                .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
521        } else {
522            let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
523                self.table(),
524                self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
525            )?;
526            let mut scan = self.clone();
527            scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`.
528
529            let plan: BatchPlanRef = if scan.core.predicate.always_false() {
530                LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
531            } else {
532                let (scan, predicate, project_expr) = scan.predicate_pull_up();
533
534                let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
535                if !predicate.always_true() {
536                    plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
537                }
538                if let Some(exprs) = project_expr {
539                    plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
540                }
541                plan
542            };
543
544            assert_eq!(plan.schema(), self.schema());
545            required_order.enforce_if_not_satisfies(plan)
546        }
547    }
548
549    // For every index, check if the order of the index satisfies the required_order
550    // If yes, use an index scan
551    fn use_index_scan_if_order_is_satisfied(
552        &self,
553        required_order: &Order,
554    ) -> Option<Result<BatchPlanRef>> {
555        if required_order.column_orders.is_empty() {
556            return None;
557        }
558
559        let order_satisfied_index = self.indexes_satisfy_order(required_order);
560        for index in order_satisfied_index {
561            if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
562                return Some(index_scan.to_batch());
563            }
564        }
565
566        None
567    }
568}
569
570impl ToBatch for LogicalScan {
571    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
572        self.to_batch_with_order_required(&Order::any())
573    }
574
575    fn to_batch_with_order_required(
576        &self,
577        required_order: &Order,
578    ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
579        let new = self.clone_with_predicate(self.predicate().clone());
580
581        if !new.table_indexes().is_empty()
582            && self
583                .base
584                .ctx()
585                .session_ctx()
586                .config()
587                .enable_index_selection()
588        {
589            let index_selection_rule = IndexSelectionRule::create();
590            if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
591                if let Some(scan) = applied.as_logical_scan() {
592                    // covering index
593                    return required_order.enforce_if_not_satisfies(scan.to_batch()?);
594                } else if let Some(join) = applied.as_logical_join() {
595                    // index lookup join
596                    return required_order
597                        .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
598                } else {
599                    unreachable!();
600                }
601            } else {
602                // Try to make use of index if it satisfies the required order
603                if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
604                    return plan_ref;
605                }
606            }
607        }
608        new.to_batch_inner_with_required(required_order)
609    }
610}
611
612impl ToStream for LogicalScan {
613    fn to_stream(
614        &self,
615        ctx: &mut ToStreamContext,
616    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
617        if self.predicate().always_true() {
618            if self.core.cross_database() && ctx.stream_scan_type() == StreamScanType::UpstreamOnly
619            {
620                return Err(ErrorCode::NotSupported(
621                    "We currently do not support cross database scan in upstream only mode."
622                        .to_owned(),
623                    "Please ensure the source table is in the same database.".to_owned(),
624                )
625                .into());
626            }
627
628            Ok(StreamTableScan::new_with_stream_scan_type(
629                self.core.clone(),
630                ctx.stream_scan_type(),
631            )
632            .into())
633        } else {
634            let (scan, predicate, project_expr) = self.predicate_pull_up();
635            let mut plan = LogicalFilter::create(scan.into(), predicate);
636            if let Some(exprs) = project_expr {
637                plan = LogicalProject::create(plan, exprs)
638            }
639            plan.to_stream(ctx)
640        }
641    }
642
643    fn logical_rewrite_for_stream(
644        &self,
645        _ctx: &mut RewriteStreamContext,
646    ) -> Result<(PlanRef, ColIndexMapping)> {
647        match self.base.stream_key().is_none() {
648            true => {
649                let mut col_ids = HashSet::new();
650
651                for &idx in self.output_col_idx() {
652                    col_ids.insert(self.table().columns[idx].column_id);
653                }
654                let col_need_to_add = self
655                    .table()
656                    .pk
657                    .iter()
658                    .filter_map(|c| {
659                        if !col_ids.contains(&self.table().columns[c.column_index].column_id) {
660                            Some(c.column_index)
661                        } else {
662                            None
663                        }
664                    })
665                    .collect_vec();
666
667                let mut output_col_idx = self.output_col_idx().clone();
668                output_col_idx.extend(col_need_to_add);
669                let new_len = output_col_idx.len();
670                Ok((
671                    self.clone_with_output_indices(output_col_idx).into(),
672                    ColIndexMapping::identity_or_none(self.schema().len(), new_len),
673                ))
674            }
675            false => Ok((
676                self.clone().into(),
677                ColIndexMapping::identity(self.schema().len()),
678            )),
679        }
680    }
681
682    fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
683        if !self
684            .core
685            .ctx()
686            .session_ctx()
687            .config()
688            .enable_index_selection()
689        {
690            return None;
691        }
692        if columns.is_empty() {
693            return None;
694        }
695        if self.table_indexes().is_empty() {
696            return None;
697        }
698        let orders = if columns.len() <= 3 {
699            OrderType::all()
700        } else {
701            // Limit the number of order type combinations to avoid explosion.
702            // For more than 3 columns, we only consider ascending nulls last and descending.
703            // Since by default, indexes are created with ascending nulls last.
704            // This is a heuristic to reduce the search space.
705            vec![OrderType::ascending_nulls_last(), OrderType::descending()]
706        };
707        for order_type_combo in columns
708            .iter()
709            .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot)))
710            .multi_cartesian_product()
711            .take(256)
712        // limit the number of combinations
713        {
714            let required_order = Order {
715                column_orders: order_type_combo,
716            };
717
718            let order_satisfied_index = self.indexes_satisfy_order(&required_order);
719            for index in order_satisfied_index {
720                if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
721                    return Some(index_scan.into());
722                }
723            }
724        }
725        None
726    }
727}