risingwave_frontend/optimizer/plan_node/
logical_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::stream_plan::StreamScanType;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::generic::{GenericPlanNode, GenericPlanRef};
26use super::utils::{Distill, childless_record};
27use super::{
28    BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
29    LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch,
30    ToStream, generic,
31};
32use crate::TableCatalog;
33use crate::binder::BoundBaseTable;
34use crate::catalog::ColumnId;
35use crate::catalog::index_catalog::{IndexType, TableIndex};
36use crate::error::Result;
37use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
38use crate::optimizer::ApplyResult;
39use crate::optimizer::optimizer_context::OptimizerContextRef;
40use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
41use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
42use crate::optimizer::plan_node::{
43    BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
44    PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
45};
46use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
47use crate::optimizer::rule::IndexSelectionRule;
48use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
49
/// `LogicalScan` returns contents of a table or other equivalent object
///
/// The scanned table, its output and required columns, the filter predicate, candidate
/// indexes and the `AS OF` clause are all held by the [`generic::TableScan`] core; `base`
/// holds the logical-convention plan base for this node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Plan base (id, schema, stream key, functional dependencies, context) of this node.
    /// Normally derived from `core` via `PlanBase::new_logical_with_core`.
    pub base: PlanBase<Logical>,
    /// The generic table-scan description this node wraps.
    core: generic::TableScan,
}
56
57impl From<generic::TableScan> for LogicalScan {
58    fn from(core: generic::TableScan) -> Self {
59        let base = PlanBase::new_logical_with_core(&core);
60        Self { base, core }
61    }
62}
63
64impl From<generic::TableScan> for PlanRef {
65    fn from(core: generic::TableScan) -> Self {
66        LogicalScan::from(core).into()
67    }
68}
69
/// Every accessor is answered by the node's plan base (`plan_base()`).
impl GenericPlanRef for LogicalScan {
    /// Id of this plan node.
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    /// Output schema of this node.
    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    /// Stream key column indices, if the plan base has one.
    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    /// Optimizer context this node was created in.
    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    /// Functional dependencies among the output columns.
    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}
91
92impl LogicalScan {
93    /// Create a [`LogicalScan`] node. Used by planner.
94    pub fn create(
95        table_catalog: Arc<TableCatalog>,
96        ctx: OptimizerContextRef,
97        as_of: Option<AsOf>,
98    ) -> Self {
99        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
100        generic::TableScan::new(
101            output_col_idx,
102            table_catalog,
103            vec![],
104            ctx,
105            Condition::true_cond(),
106            as_of,
107        )
108        .into()
109    }
110
111    pub fn from_base_table(
112        base_table: &BoundBaseTable,
113        ctx: OptimizerContextRef,
114        as_of: Option<AsOf>,
115    ) -> Self {
116        let table_catalog = base_table.table_catalog.clone();
117        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
118        let mut table_indexes = vec![];
119        for index in &base_table.table_indexes {
120            match &index.index_type {
121                IndexType::Table(index) => table_indexes.push(index.clone()),
122            }
123        }
124        generic::TableScan::new(
125            output_col_idx,
126            table_catalog,
127            table_indexes,
128            ctx,
129            Condition::true_cond(),
130            as_of,
131        )
132        .into()
133    }
134
135    pub fn table_name(&self) -> &str {
136        &self.core.table_catalog.name
137    }
138
139    pub fn as_of(&self) -> Option<AsOf> {
140        self.core.as_of.clone()
141    }
142
143    /// The cardinality of the table **without** applying the predicate.
144    pub fn table_cardinality(&self) -> Cardinality {
145        self.core.table_catalog.cardinality
146    }
147
148    pub fn table(&self) -> &Arc<TableCatalog> {
149        &self.core.table_catalog
150    }
151
152    /// Get the descs of the output columns.
153    pub fn column_descs(&self) -> Vec<ColumnDesc> {
154        self.core.column_descs()
155    }
156
157    /// Get the ids of the output columns.
158    pub fn output_column_ids(&self) -> Vec<ColumnId> {
159        self.core.output_column_ids()
160    }
161
162    /// Get all table indexes on this table
163    pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
164        &self.core.table_indexes
165    }
166
167    /// Get the logical scan's filter predicate
168    pub fn predicate(&self) -> &Condition {
169        &self.core.predicate
170    }
171
172    /// Return indices of fields the output is ordered by and
173    /// corresponding direction
174    pub fn get_out_column_index_order(&self) -> Order {
175        self.core.get_out_column_index_order()
176    }
177
178    pub fn distribution_key(&self) -> Option<Vec<usize>> {
179        self.core.distribution_key()
180    }
181
182    pub fn watermark_columns(&self) -> WatermarkColumns {
183        self.core.watermark_columns()
184    }
185
186    /// Return indexes can satisfy the required order.
187    pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
188        self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
189            .into_iter()
190            .map(|(index, _)| index)
191            .collect()
192    }
193
194    /// Return indexes can satisfy the required order.
195    /// The `prefix` refers to optionally matching columns of the index
196    /// It is unordered initially.
197    /// If any are used, we will return the fixed `Order` prefix.
198    pub fn indexes_satisfy_order_with_prefix(
199        &self,
200        required_order: &Order,
201        prefix: &HashSet<ColumnOrder>,
202    ) -> Vec<(&Arc<TableIndex>, Order)> {
203        let output_col_map = self
204            .output_col_idx()
205            .iter()
206            .cloned()
207            .enumerate()
208            .map(|(id, col)| (col, id))
209            .collect::<BTreeMap<_, _>>();
210        let unmatched_idx = output_col_map.len();
211        let mut index_catalog_and_orders = vec![];
212        for index in self.table_indexes() {
213            let s2p_mapping = index.secondary_to_primary_mapping();
214            let index_orders: Vec<ColumnOrder> = index
215                .index_table
216                .pk()
217                .iter()
218                .map(|idx_item| {
219                    let idx = match s2p_mapping.get(&idx_item.column_index) {
220                        Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
221                        // After we support index on expressions, we need to handle the case where the column is not in the `s2p_mapping`.
222                        None => unmatched_idx,
223                    };
224                    ColumnOrder::new(idx, idx_item.order_type)
225                })
226                .collect();
227
228            let mut index_orders_iter = index_orders.into_iter().peekable();
229
230            // First check the prefix
231            let fixed_prefix = {
232                let mut fixed_prefix = vec![];
233                loop {
234                    match index_orders_iter.peek() {
235                        Some(index_col_order) if prefix.contains(index_col_order) => {
236                            let index_col_order = index_orders_iter.next().unwrap();
237                            fixed_prefix.push(index_col_order);
238                        }
239                        _ => break,
240                    }
241                }
242                Order {
243                    column_orders: fixed_prefix,
244                }
245            };
246
247            let remaining_orders = Order {
248                column_orders: index_orders_iter.collect(),
249            };
250            if remaining_orders.satisfies(required_order) {
251                index_catalog_and_orders.push((index, fixed_prefix));
252            }
253        }
254        index_catalog_and_orders
255    }
256
257    /// If the index can cover the scan, transform it to the index scan.
258    pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
259        let p2s_mapping = index.primary_to_secondary_mapping();
260        if self
261            .required_col_idx()
262            .iter()
263            .all(|x| p2s_mapping.contains_key(x))
264        {
265            let index_scan = self.core.to_index_scan(
266                index.index_table.clone(),
267                p2s_mapping,
268                index.function_mapping(),
269            );
270            Some(index_scan.into())
271        } else {
272            None
273        }
274    }
275
276    pub fn primary_key(&self) -> &[ColumnOrder] {
277        self.core.primary_key()
278    }
279
280    /// a vec of `InputRef` corresponding to `output_col_idx`, which can represent a pulled project.
281    fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
282        self.output_col_idx()
283            .iter()
284            .enumerate()
285            .map(|(i, &col_idx)| {
286                InputRef::new(
287                    i,
288                    self.table().columns[col_idx].column_desc.data_type.clone(),
289                )
290                .into()
291            })
292            .collect_vec()
293    }
294
295    /// Undo predicate push down when predicate in scan is not supported.
296    pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
297        let mut predicate = self.predicate().clone();
298        if predicate.always_true() {
299            return (self.core.clone(), Condition::true_cond(), None);
300        }
301
302        let mut inverse_mapping = {
303            let mapping = ColIndexMapping::new(
304                self.required_col_idx().iter().map(|i| Some(*i)).collect(),
305                self.table().columns.len(),
306            );
307            // Since `required_col_idx` mapping is not invertible, we need to inverse manually.
308            let mut inverse_map = vec![None; mapping.target_size()];
309            for (src, dst) in mapping.mapping_pairs() {
310                inverse_map[dst] = Some(src);
311            }
312            ColIndexMapping::new(inverse_map, mapping.source_size())
313        };
314
315        predicate = predicate.rewrite_expr(&mut inverse_mapping);
316
317        let scan_without_predicate = generic::TableScan::new(
318            self.required_col_idx().to_vec(),
319            self.core.table_catalog.clone(),
320            self.table_indexes().to_vec(),
321            self.ctx(),
322            Condition::true_cond(),
323            self.as_of(),
324        );
325        let project_expr = if self.required_col_idx() != self.output_col_idx() {
326            Some(self.output_idx_to_input_ref())
327        } else {
328            None
329        };
330        (scan_without_predicate, predicate, project_expr)
331    }
332
333    fn clone_with_predicate(&self, predicate: Condition) -> Self {
334        generic::TableScan::new_inner(
335            self.output_col_idx().to_vec(),
336            self.table().clone(),
337            self.table_indexes().to_vec(),
338            self.base.ctx().clone(),
339            predicate,
340            self.as_of(),
341        )
342        .into()
343    }
344
345    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
346        generic::TableScan::new_inner(
347            output_col_idx,
348            self.core.table_catalog.clone(),
349            self.table_indexes().to_vec(),
350            self.base.ctx().clone(),
351            self.predicate().clone(),
352            self.as_of(),
353        )
354        .into()
355    }
356
357    pub fn output_col_idx(&self) -> &Vec<usize> {
358        &self.core.output_col_idx
359    }
360
361    pub fn required_col_idx(&self) -> &Vec<usize> {
362        &self.core.required_col_idx
363    }
364}
365
366impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
367
368impl Distill for LogicalScan {
369    fn distill<'a>(&self) -> XmlNode<'a> {
370        let verbose = self.base.ctx().is_explain_verbose();
371        let mut vec = Vec::with_capacity(5);
372        vec.push(("table", Pretty::from(self.table_name().to_owned())));
373        let key_is_columns =
374            self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
375        let key = if key_is_columns {
376            "columns"
377        } else {
378            "output_columns"
379        };
380        vec.push((key, self.core.columns_pretty(verbose)));
381        if !key_is_columns {
382            vec.push((
383                "required_columns",
384                Pretty::Array(
385                    self.required_col_idx()
386                        .iter()
387                        .map(|i| {
388                            let col_name = &self.table().columns[*i].name;
389                            Pretty::from(if verbose {
390                                format!("{}.{}", self.table_name(), col_name)
391                            } else {
392                                col_name.clone()
393                            })
394                        })
395                        .collect(),
396                ),
397            ));
398        }
399
400        if !self.predicate().always_true() {
401            let input_schema = self.core.fields_pretty_schema();
402            vec.push((
403                "predicate",
404                Pretty::display(&ConditionDisplay {
405                    condition: self.predicate(),
406                    input_schema: &input_schema,
407                }),
408            ))
409        }
410
411        if self.table_cardinality() != Cardinality::unknown() {
412            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
413        }
414
415        childless_record("LogicalScan", vec)
416    }
417}
418
419impl ColPrunable for LogicalScan {
420    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
421        let output_col_idx: Vec<usize> = required_cols
422            .iter()
423            .map(|i| self.required_col_idx()[*i])
424            .collect();
425        assert!(
426            output_col_idx
427                .iter()
428                .all(|i| self.output_col_idx().contains(i))
429        );
430
431        self.clone_with_output_indices(output_col_idx).into()
432    }
433}
434
435impl ExprRewritable<Logical> for LogicalScan {
436    fn has_rewritable_expr(&self) -> bool {
437        true
438    }
439
440    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
441        let mut core = self.core.clone();
442        core.rewrite_exprs(r);
443        Self {
444            base: self.base.clone_with_new_plan_id(),
445            core,
446        }
447        .into()
448    }
449}
450
451impl ExprVisitable for LogicalScan {
452    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
453        self.core.visit_exprs(v);
454    }
455}
456
457impl PredicatePushdown for LogicalScan {
458    fn predicate_pushdown(
459        &self,
460        mut predicate: Condition,
461        _ctx: &mut PredicatePushdownContext,
462    ) -> PlanRef {
463        // If the predicate contains `CorrelatedInputRef` or `now()`. We don't push down.
464        // This case could come from the predicate push down before the subquery unnesting.
465        struct HasCorrelated {
466            has: bool,
467        }
468        impl ExprVisitor for HasCorrelated {
469            fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
470                self.has = true;
471            }
472        }
473        let non_pushable_predicate: Vec<_> = predicate
474            .conjunctions
475            .extract_if(.., |expr| {
476                if expr.count_nows() > 0 {
477                    true
478                } else {
479                    let mut visitor = HasCorrelated { has: false };
480                    visitor.visit_expr(expr);
481                    visitor.has
482                }
483            })
484            .collect();
485        let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
486            self.output_col_idx().iter().map(|i| Some(*i)).collect(),
487            self.table().columns.len(),
488        ));
489        if non_pushable_predicate.is_empty() {
490            self.clone_with_predicate(predicate.and(self.predicate().clone()))
491                .into()
492        } else {
493            LogicalFilter::create(
494                self.clone_with_predicate(predicate.and(self.predicate().clone()))
495                    .into(),
496                Condition {
497                    conjunctions: non_pushable_predicate,
498                },
499            )
500        }
501    }
502}
503
504impl LogicalScan {
505    fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
506        if self.predicate().always_true() {
507            required_order
508                .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
509        } else {
510            let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
511                self.table(),
512                self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
513            )?;
514            let mut scan = self.clone();
515            scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`.
516
517            let plan: BatchPlanRef = if scan.core.predicate.always_false() {
518                LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
519            } else {
520                let (scan, predicate, project_expr) = scan.predicate_pull_up();
521
522                let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
523                if !predicate.always_true() {
524                    plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
525                }
526                if let Some(exprs) = project_expr {
527                    plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
528                }
529                plan
530            };
531
532            assert_eq!(plan.schema(), self.schema());
533            required_order.enforce_if_not_satisfies(plan)
534        }
535    }
536
537    // For every index, check if the order of the index satisfies the required_order
538    // If yes, use an index scan
539    fn use_index_scan_if_order_is_satisfied(
540        &self,
541        required_order: &Order,
542    ) -> Option<Result<BatchPlanRef>> {
543        if required_order.column_orders.is_empty() {
544            return None;
545        }
546
547        let order_satisfied_index = self.indexes_satisfy_order(required_order);
548        for index in order_satisfied_index {
549            if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
550                return Some(index_scan.to_batch());
551            }
552        }
553
554        None
555    }
556}
557
558impl ToBatch for LogicalScan {
559    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
560        self.to_batch_with_order_required(&Order::any())
561    }
562
563    fn to_batch_with_order_required(
564        &self,
565        required_order: &Order,
566    ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
567        let new = self.clone_with_predicate(self.predicate().clone());
568
569        if !new.table_indexes().is_empty() {
570            let index_selection_rule = IndexSelectionRule::create();
571            if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
572                if let Some(scan) = applied.as_logical_scan() {
573                    // covering index
574                    return required_order.enforce_if_not_satisfies(scan.to_batch()?);
575                } else if let Some(join) = applied.as_logical_join() {
576                    // index lookup join
577                    return required_order
578                        .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
579                } else {
580                    unreachable!();
581                }
582            } else {
583                // Try to make use of index if it satisfies the required order
584                if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
585                    return plan_ref;
586                }
587            }
588        }
589        new.to_batch_inner_with_required(required_order)
590    }
591}
592
593impl ToStream for LogicalScan {
594    fn to_stream(
595        &self,
596        ctx: &mut ToStreamContext,
597    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
598        if self.predicate().always_true() {
599            // Force rewrite scan type to cross-db scan
600            if self.core.table_catalog.database_id != self.base.ctx().session_ctx().database_id() {
601                Ok(StreamTableScan::new_with_stream_scan_type(
602                    self.core.clone(),
603                    StreamScanType::CrossDbSnapshotBackfill,
604                )
605                .into())
606            } else {
607                Ok(StreamTableScan::new_with_stream_scan_type(
608                    self.core.clone(),
609                    ctx.stream_scan_type(),
610                )
611                .into())
612            }
613        } else {
614            let (scan, predicate, project_expr) = self.predicate_pull_up();
615            let mut plan = LogicalFilter::create(scan.into(), predicate);
616            if let Some(exprs) = project_expr {
617                plan = LogicalProject::create(plan, exprs)
618            }
619            plan.to_stream(ctx)
620        }
621    }
622
623    fn logical_rewrite_for_stream(
624        &self,
625        _ctx: &mut RewriteStreamContext,
626    ) -> Result<(PlanRef, ColIndexMapping)> {
627        match self.base.stream_key().is_none() {
628            true => {
629                let mut col_ids = HashSet::new();
630
631                for &idx in self.output_col_idx() {
632                    col_ids.insert(self.table().columns[idx].column_id);
633                }
634                let col_need_to_add = self
635                    .table()
636                    .pk
637                    .iter()
638                    .filter_map(|c| {
639                        if !col_ids.contains(&self.table().columns[c.column_index].column_id) {
640                            Some(c.column_index)
641                        } else {
642                            None
643                        }
644                    })
645                    .collect_vec();
646
647                let mut output_col_idx = self.output_col_idx().clone();
648                output_col_idx.extend(col_need_to_add);
649                let new_len = output_col_idx.len();
650                Ok((
651                    self.clone_with_output_indices(output_col_idx).into(),
652                    ColIndexMapping::identity_or_none(self.schema().len(), new_len),
653                ))
654            }
655            false => Ok((
656                self.clone().into(),
657                ColIndexMapping::identity(self.schema().len()),
658            )),
659        }
660    }
661}