risingwave_frontend/optimizer/plan_node/logical_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{ColumnDesc, TableDesc};
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_pb::stream_plan::StreamScanType;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::generic::{GenericPlanNode, GenericPlanRef};
27use super::utils::{Distill, childless_record};
28use super::{
29    BatchFilter, BatchProject, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef,
30    PredicatePushdown, StreamTableScan, ToBatch, ToStream, generic,
31};
32use crate::TableCatalog;
33use crate::catalog::{ColumnId, IndexCatalog};
34use crate::error::Result;
35use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
36use crate::optimizer::ApplyResult;
37use crate::optimizer::optimizer_context::OptimizerContextRef;
38use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
39use crate::optimizer::plan_node::{
40    BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
41    PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
42};
43use crate::optimizer::property::{Cardinality, Order, WatermarkColumns};
44use crate::optimizer::rule::IndexSelectionRule;
45use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
46
47/// `LogicalScan` returns contents of a table or other equivalent object
48#[derive(Debug, Clone, PartialEq, Eq, Hash)]
49pub struct LogicalScan {
50    pub base: PlanBase<Logical>,
51    core: generic::TableScan,
52}
53
54impl From<generic::TableScan> for LogicalScan {
55    fn from(core: generic::TableScan) -> Self {
56        let base = PlanBase::new_logical_with_core(&core);
57        Self { base, core }
58    }
59}
60
61impl From<generic::TableScan> for PlanRef {
62    fn from(core: generic::TableScan) -> Self {
63        LogicalScan::from(core).into()
64    }
65}
66
67impl LogicalScan {
68    /// Create a [`LogicalScan`] node. Used by planner.
69    pub fn create(
70        table_name: String, // explain-only
71        table_catalog: Arc<TableCatalog>,
72        indexes: Vec<Rc<IndexCatalog>>,
73        ctx: OptimizerContextRef,
74        as_of: Option<AsOf>,
75        table_cardinality: Cardinality,
76    ) -> Self {
77        let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
78        generic::TableScan::new(
79            table_name,
80            output_col_idx,
81            table_catalog,
82            indexes,
83            ctx,
84            Condition::true_cond(),
85            as_of,
86            table_cardinality,
87        )
88        .into()
89    }
90
91    pub fn table_name(&self) -> &str {
92        &self.core.table_name
93    }
94
95    pub fn as_of(&self) -> Option<AsOf> {
96        self.core.as_of.clone()
97    }
98
99    /// The cardinality of the table **without** applying the predicate.
100    pub fn table_cardinality(&self) -> Cardinality {
101        self.core.table_cardinality
102    }
103
104    // FIXME(kwannoel): Fetch from `table_catalog` + lazily instantiate?
105    /// Get a reference to the logical scan's table desc.
106    pub fn table_desc(&self) -> &TableDesc {
107        self.core.table_desc.as_ref()
108    }
109
110    pub fn table_catalog(&self) -> Arc<TableCatalog> {
111        self.core.table_catalog.clone()
112    }
113
114    /// Get the descs of the output columns.
115    pub fn column_descs(&self) -> Vec<ColumnDesc> {
116        self.core.column_descs()
117    }
118
119    /// Get the ids of the output columns.
120    pub fn output_column_ids(&self) -> Vec<ColumnId> {
121        self.core.output_column_ids()
122    }
123
124    /// Get all indexes on this table
125    pub fn indexes(&self) -> &[Rc<IndexCatalog>] {
126        &self.core.indexes
127    }
128
129    /// Get the logical scan's filter predicate
130    pub fn predicate(&self) -> &Condition {
131        &self.core.predicate
132    }
133
134    /// Return indices of fields the output is ordered by and
135    /// corresponding direction
136    pub fn get_out_column_index_order(&self) -> Order {
137        self.core.get_out_column_index_order()
138    }
139
140    pub fn distribution_key(&self) -> Option<Vec<usize>> {
141        self.core.distribution_key()
142    }
143
144    pub fn watermark_columns(&self) -> WatermarkColumns {
145        self.core.watermark_columns()
146    }
147
148    /// Return indexes can satisfy the required order.
149    pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Rc<IndexCatalog>> {
150        self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
151            .into_iter()
152            .map(|(index, _)| index)
153            .collect()
154    }
155
156    /// Return indexes can satisfy the required order.
157    /// The `prefix` refers to optionally matching columns of the index
158    /// It is unordered initially.
159    /// If any are used, we will return the fixed `Order` prefix.
160    pub fn indexes_satisfy_order_with_prefix(
161        &self,
162        required_order: &Order,
163        prefix: &HashSet<ColumnOrder>,
164    ) -> Vec<(&Rc<IndexCatalog>, Order)> {
165        let output_col_map = self
166            .output_col_idx()
167            .iter()
168            .cloned()
169            .enumerate()
170            .map(|(id, col)| (col, id))
171            .collect::<BTreeMap<_, _>>();
172        let unmatched_idx = output_col_map.len();
173        let mut index_catalog_and_orders = vec![];
174        for index in self.indexes() {
175            let s2p_mapping = index.secondary_to_primary_mapping();
176            let index_orders: Vec<ColumnOrder> = index
177                .index_table
178                .pk()
179                .iter()
180                .map(|idx_item| {
181                    let idx = match s2p_mapping.get(&idx_item.column_index) {
182                        Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
183                        // After we support index on expressions, we need to handle the case where the column is not in the `s2p_mapping`.
184                        None => unmatched_idx,
185                    };
186                    ColumnOrder::new(idx, idx_item.order_type)
187                })
188                .collect();
189
190            let mut index_orders_iter = index_orders.into_iter().peekable();
191
192            // First check the prefix
193            let fixed_prefix = {
194                let mut fixed_prefix = vec![];
195                loop {
196                    match index_orders_iter.peek() {
197                        Some(index_col_order) if prefix.contains(index_col_order) => {
198                            let index_col_order = index_orders_iter.next().unwrap();
199                            fixed_prefix.push(index_col_order);
200                        }
201                        _ => break,
202                    }
203                }
204                Order {
205                    column_orders: fixed_prefix,
206                }
207            };
208
209            let remaining_orders = Order {
210                column_orders: index_orders_iter.collect(),
211            };
212            if remaining_orders.satisfies(required_order) {
213                index_catalog_and_orders.push((index, fixed_prefix));
214            }
215        }
216        index_catalog_and_orders
217    }
218
219    /// If the index can cover the scan, transform it to the index scan.
220    pub fn to_index_scan_if_index_covered(&self, index: &Rc<IndexCatalog>) -> Option<LogicalScan> {
221        let p2s_mapping = index.primary_to_secondary_mapping();
222        if self
223            .required_col_idx()
224            .iter()
225            .all(|x| p2s_mapping.contains_key(x))
226        {
227            let index_scan = self.core.to_index_scan(
228                &index.name,
229                index.index_table.clone(),
230                p2s_mapping,
231                index.function_mapping(),
232            );
233            Some(index_scan.into())
234        } else {
235            None
236        }
237    }
238
239    pub fn primary_key(&self) -> &[ColumnOrder] {
240        self.core.primary_key()
241    }
242
243    /// a vec of `InputRef` corresponding to `output_col_idx`, which can represent a pulled project.
244    fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
245        self.output_col_idx()
246            .iter()
247            .enumerate()
248            .map(|(i, &col_idx)| {
249                InputRef::new(i, self.table_desc().columns[col_idx].data_type.clone()).into()
250            })
251            .collect_vec()
252    }
253
254    /// Undo predicate push down when predicate in scan is not supported.
255    pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
256        let mut predicate = self.predicate().clone();
257        if predicate.always_true() {
258            return (self.core.clone(), Condition::true_cond(), None);
259        }
260
261        let mut inverse_mapping = {
262            let mapping = ColIndexMapping::new(
263                self.required_col_idx().iter().map(|i| Some(*i)).collect(),
264                self.table_desc().columns.len(),
265            );
266            // Since `required_col_idx` mapping is not invertible, we need to inverse manually.
267            let mut inverse_map = vec![None; mapping.target_size()];
268            for (src, dst) in mapping.mapping_pairs() {
269                inverse_map[dst] = Some(src);
270            }
271            ColIndexMapping::new(inverse_map, mapping.source_size())
272        };
273
274        predicate = predicate.rewrite_expr(&mut inverse_mapping);
275
276        let scan_without_predicate = generic::TableScan::new(
277            self.table_name().to_owned(),
278            self.required_col_idx().to_vec(),
279            self.core.table_catalog.clone(),
280            self.indexes().to_vec(),
281            self.ctx(),
282            Condition::true_cond(),
283            self.as_of(),
284            self.table_cardinality(),
285        );
286        let project_expr = if self.required_col_idx() != self.output_col_idx() {
287            Some(self.output_idx_to_input_ref())
288        } else {
289            None
290        };
291        (scan_without_predicate, predicate, project_expr)
292    }
293
294    fn clone_with_predicate(&self, predicate: Condition) -> Self {
295        generic::TableScan::new_inner(
296            self.table_name().to_owned(),
297            self.output_col_idx().to_vec(),
298            self.table_catalog(),
299            self.indexes().to_vec(),
300            self.base.ctx().clone(),
301            predicate,
302            self.as_of(),
303            self.table_cardinality(),
304        )
305        .into()
306    }
307
308    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
309        generic::TableScan::new_inner(
310            self.table_name().to_owned(),
311            output_col_idx,
312            self.core.table_catalog.clone(),
313            self.indexes().to_vec(),
314            self.base.ctx().clone(),
315            self.predicate().clone(),
316            self.as_of(),
317            self.table_cardinality(),
318        )
319        .into()
320    }
321
322    pub fn output_col_idx(&self) -> &Vec<usize> {
323        &self.core.output_col_idx
324    }
325
326    pub fn required_col_idx(&self) -> &Vec<usize> {
327        &self.core.required_col_idx
328    }
329}
330
331impl_plan_tree_node_for_leaf! {LogicalScan}
332
333impl Distill for LogicalScan {
334    fn distill<'a>(&self) -> XmlNode<'a> {
335        let verbose = self.base.ctx().is_explain_verbose();
336        let mut vec = Vec::with_capacity(5);
337        vec.push(("table", Pretty::from(self.table_name().to_owned())));
338        let key_is_columns =
339            self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
340        let key = if key_is_columns {
341            "columns"
342        } else {
343            "output_columns"
344        };
345        vec.push((key, self.core.columns_pretty(verbose)));
346        if !key_is_columns {
347            vec.push((
348                "required_columns",
349                Pretty::Array(
350                    self.required_col_idx()
351                        .iter()
352                        .map(|i| {
353                            let col_name = &self.table_desc().columns[*i].name;
354                            Pretty::from(if verbose {
355                                format!("{}.{}", self.table_name(), col_name)
356                            } else {
357                                col_name.to_string()
358                            })
359                        })
360                        .collect(),
361                ),
362            ));
363        }
364
365        if !self.predicate().always_true() {
366            let input_schema = self.core.fields_pretty_schema();
367            vec.push((
368                "predicate",
369                Pretty::display(&ConditionDisplay {
370                    condition: self.predicate(),
371                    input_schema: &input_schema,
372                }),
373            ))
374        }
375
376        if self.table_cardinality() != Cardinality::unknown() {
377            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
378        }
379
380        childless_record("LogicalScan", vec)
381    }
382}
383
384impl ColPrunable for LogicalScan {
385    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
386        let output_col_idx: Vec<usize> = required_cols
387            .iter()
388            .map(|i| self.required_col_idx()[*i])
389            .collect();
390        assert!(
391            output_col_idx
392                .iter()
393                .all(|i| self.output_col_idx().contains(i))
394        );
395
396        self.clone_with_output_indices(output_col_idx).into()
397    }
398}
399
400impl ExprRewritable for LogicalScan {
401    fn has_rewritable_expr(&self) -> bool {
402        true
403    }
404
405    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
406        let mut core = self.core.clone();
407        core.rewrite_exprs(r);
408        Self {
409            base: self.base.clone_with_new_plan_id(),
410            core,
411        }
412        .into()
413    }
414}
415
416impl ExprVisitable for LogicalScan {
417    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
418        self.core.visit_exprs(v);
419    }
420}
421
422impl PredicatePushdown for LogicalScan {
423    fn predicate_pushdown(
424        &self,
425        mut predicate: Condition,
426        _ctx: &mut PredicatePushdownContext,
427    ) -> PlanRef {
428        // If the predicate contains `CorrelatedInputRef` or `now()`. We don't push down.
429        // This case could come from the predicate push down before the subquery unnesting.
430        struct HasCorrelated {
431            has: bool,
432        }
433        impl ExprVisitor for HasCorrelated {
434            fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
435                self.has = true;
436            }
437        }
438        let non_pushable_predicate: Vec<_> = predicate
439            .conjunctions
440            .extract_if(.., |expr| {
441                if expr.count_nows() > 0 {
442                    true
443                } else {
444                    let mut visitor = HasCorrelated { has: false };
445                    visitor.visit_expr(expr);
446                    visitor.has
447                }
448            })
449            .collect();
450        let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
451            self.output_col_idx().iter().map(|i| Some(*i)).collect(),
452            self.table_desc().columns.len(),
453        ));
454        if non_pushable_predicate.is_empty() {
455            self.clone_with_predicate(predicate.and(self.predicate().clone()))
456                .into()
457        } else {
458            LogicalFilter::create(
459                self.clone_with_predicate(predicate.and(self.predicate().clone()))
460                    .into(),
461                Condition {
462                    conjunctions: non_pushable_predicate,
463                },
464            )
465        }
466    }
467}
468
469impl LogicalScan {
470    fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<PlanRef> {
471        if self.predicate().always_true() {
472            required_order
473                .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
474        } else {
475            let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
476                self.core.table_desc.clone(),
477                self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
478            )?;
479            let mut scan = self.clone();
480            scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`.
481
482            let plan: PlanRef = if scan.core.predicate.always_false() {
483                LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
484            } else {
485                let (scan, predicate, project_expr) = scan.predicate_pull_up();
486
487                let mut plan: PlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
488                if !predicate.always_true() {
489                    plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
490                }
491                if let Some(exprs) = project_expr {
492                    plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
493                }
494                plan
495            };
496
497            assert_eq!(plan.schema(), self.schema());
498            required_order.enforce_if_not_satisfies(plan)
499        }
500    }
501
502    // For every index, check if the order of the index satisfies the required_order
503    // If yes, use an index scan
504    fn use_index_scan_if_order_is_satisfied(
505        &self,
506        required_order: &Order,
507    ) -> Option<Result<PlanRef>> {
508        if required_order.column_orders.is_empty() {
509            return None;
510        }
511
512        let order_satisfied_index = self.indexes_satisfy_order(required_order);
513        for index in order_satisfied_index {
514            if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
515                return Some(index_scan.to_batch());
516            }
517        }
518
519        None
520    }
521}
522
523impl ToBatch for LogicalScan {
524    fn to_batch(&self) -> Result<PlanRef> {
525        self.to_batch_with_order_required(&Order::any())
526    }
527
528    fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
529        let new = self.clone_with_predicate(self.predicate().clone());
530
531        if !new.indexes().is_empty() {
532            let index_selection_rule = IndexSelectionRule::create();
533            if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
534                if let Some(scan) = applied.as_logical_scan() {
535                    // covering index
536                    return required_order.enforce_if_not_satisfies(scan.to_batch()?);
537                } else if let Some(join) = applied.as_logical_join() {
538                    // index lookup join
539                    return required_order
540                        .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
541                } else {
542                    unreachable!();
543                }
544            } else {
545                // Try to make use of index if it satisfies the required order
546                if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
547                    return plan_ref;
548                }
549            }
550        }
551        new.to_batch_inner_with_required(required_order)
552    }
553}
554
555impl ToStream for LogicalScan {
556    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
557        if self.predicate().always_true() {
558            // Force rewrite scan type to cross-db scan
559            if self.core.table_catalog.database_id != self.base.ctx().session_ctx().database_id() {
560                Ok(StreamTableScan::new_with_stream_scan_type(
561                    self.core.clone(),
562                    StreamScanType::CrossDbSnapshotBackfill,
563                )
564                .into())
565            } else {
566                Ok(StreamTableScan::new_with_stream_scan_type(
567                    self.core.clone(),
568                    ctx.stream_scan_type(),
569                )
570                .into())
571            }
572        } else {
573            let (scan, predicate, project_expr) = self.predicate_pull_up();
574            let mut plan = LogicalFilter::create(scan.into(), predicate);
575            if let Some(exprs) = project_expr {
576                plan = LogicalProject::create(plan, exprs)
577            }
578            plan.to_stream(ctx)
579        }
580    }
581
582    fn logical_rewrite_for_stream(
583        &self,
584        _ctx: &mut RewriteStreamContext,
585    ) -> Result<(PlanRef, ColIndexMapping)> {
586        match self.base.stream_key().is_none() {
587            true => {
588                let mut col_ids = HashSet::new();
589
590                for &idx in self.output_col_idx() {
591                    col_ids.insert(self.table_desc().columns[idx].column_id);
592                }
593                let col_need_to_add = self
594                    .table_desc()
595                    .pk
596                    .iter()
597                    .filter_map(|c| {
598                        if !col_ids.contains(&self.table_desc().columns[c.column_index].column_id) {
599                            Some(c.column_index)
600                        } else {
601                            None
602                        }
603                    })
604                    .collect_vec();
605
606                let mut output_col_idx = self.output_col_idx().clone();
607                output_col_idx.extend(col_need_to_add);
608                let new_len = output_col_idx.len();
609                Ok((
610                    self.clone_with_output_indices(output_col_idx).into(),
611                    ColIndexMapping::identity_or_none(self.schema().len(), new_len),
612                ))
613            }
614            false => Ok((
615                self.clone().into(),
616                ColIndexMapping::identity(self.schema().len()),
617            )),
618        }
619    }
620}