risingwave_frontend/optimizer/plan_node/
logical_sys_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::ColumnDesc;
20
21use super::generic::GenericPlanRef;
22use super::utils::{Distill, childless_record};
23use super::{
24    ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
25    ToBatch, ToStream, generic,
26};
27use crate::catalog::system_catalog::SystemTableCatalog;
28use crate::error::Result;
29use crate::optimizer::optimizer_context::OptimizerContextRef;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::{
32    BatchSysSeqScan, ColumnPruningContext, LogicalFilter, PredicatePushdownContext,
33    RewriteStreamContext, ToStreamContext,
34};
35use crate::optimizer::property::Cardinality;
36use crate::utils::{ColIndexMapping, Condition};
37
/// `LogicalSysScan` returns contents of a table or other equivalent object.
///
/// The scanned table is described by a [`SystemTableCatalog`], i.e. this is the logical
/// scan node for system tables.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalSysScan {
    /// Logical plan base, derived from `core` when the node is constructed.
    pub base: PlanBase<Logical>,
    /// Generic system-scan description: the table catalog, the output column indices and
    /// the table cardinality are all read from here.
    core: generic::SysScan,
}
44
45impl From<generic::SysScan> for LogicalSysScan {
46    fn from(core: generic::SysScan) -> Self {
47        let base = PlanBase::new_logical_with_core(&core);
48        Self { base, core }
49    }
50}
51
52impl From<generic::SysScan> for PlanRef {
53    fn from(core: generic::SysScan) -> Self {
54        LogicalSysScan::from(core).into()
55    }
56}
57
58impl LogicalSysScan {
59    /// Create a [`LogicalSysScan`] node. Used by planner.
60    pub fn create(
61        table: Arc<SystemTableCatalog>,
62        ctx: OptimizerContextRef,
63        table_cardinality: Cardinality,
64    ) -> Self {
65        generic::SysScan::new(
66            (0..table.columns.len()).collect(),
67            table,
68            ctx,
69            table_cardinality,
70        )
71        .into()
72    }
73
74    pub fn table_name(&self) -> &str {
75        &self.core.table.name
76    }
77
78    /// The cardinality of the table **without** applying the predicate.
79    pub fn table_cardinality(&self) -> Cardinality {
80        self.core.table_cardinality
81    }
82
83    /// Get a reference to the logical scan's table desc.
84    pub fn table(&self) -> &Arc<SystemTableCatalog> {
85        &self.core.table
86    }
87
88    /// Get the descs of the output columns.
89    pub fn column_descs(&self) -> Vec<ColumnDesc> {
90        self.core.column_descs()
91    }
92
93    fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
94        generic::SysScan::new_inner(
95            output_col_idx,
96            self.table().clone(),
97            self.base.ctx().clone(),
98            self.table_cardinality(),
99        )
100        .into()
101    }
102
103    fn output_col_idx(&self) -> &Vec<usize> {
104        &self.core.output_col_idx
105    }
106}
107
// NOTE(review): the macro is defined elsewhere; presumably it generates the plan-tree-node
// impls for a node that has no inputs — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Logical, LogicalSysScan}
109
110impl Distill for LogicalSysScan {
111    fn distill<'a>(&self) -> XmlNode<'a> {
112        let verbose = self.base.ctx().is_explain_verbose();
113        let mut vec = Vec::with_capacity(5);
114        vec.push(("table", Pretty::from(self.table_name().to_owned())));
115        vec.push(("output_columns", self.core.columns_pretty(verbose)));
116
117        if self.table_cardinality() != Cardinality::unknown() {
118            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
119        }
120
121        childless_record("LogicalSysScan", vec)
122    }
123}
124
125impl ColPrunable for LogicalSysScan {
126    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
127        let output_col_idx: Vec<usize> = required_cols
128            .iter()
129            .map(|i| self.output_col_idx()[*i])
130            .collect();
131        assert!(
132            output_col_idx
133                .iter()
134                .all(|i| self.output_col_idx().contains(i))
135        );
136
137        self.clone_with_output_indices(output_col_idx).into()
138    }
139}
140
// Empty impl: every `ExprRewritable` method falls back to the trait's default.
impl ExprRewritable<Logical> for LogicalSysScan {}
142
// Empty impl: every `ExprVisitable` method falls back to the trait's default.
impl ExprVisitable for LogicalSysScan {}
144
145impl PredicatePushdown for LogicalSysScan {
146    // TODO(kwannoel): Unify this with logical_scan.
147    fn predicate_pushdown(
148        &self,
149        predicate: Condition,
150        _ctx: &mut PredicatePushdownContext,
151    ) -> PlanRef {
152        LogicalFilter::create(self.clone().into(), predicate)
153    }
154}
155
156impl ToBatch for LogicalSysScan {
157    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
158        Ok(BatchSysSeqScan::new(self.core.clone()).into())
159    }
160}
161
impl ToStream for LogicalSysScan {
    /// Streaming over a system table is not supported: this always bails out with a
    /// "not implemented" error instead of producing a stream plan.
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        bail_not_implemented!("streaming on system table");
    }

    /// Streaming over a system table is not supported: this always bails out with a
    /// "not implemented" error instead of rewriting the plan for streaming.
    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        bail_not_implemented!("streaming on system table");
    }
}