risingwave_frontend/optimizer/plan_node/generic/
sys_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use educe::Educe;
19use pretty_xmlish::Pretty;
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22
23use super::GenericPlanNode;
24use crate::catalog::ColumnId;
25use crate::catalog::system_catalog::SystemTableCatalog;
26use crate::optimizer::optimizer_context::OptimizerContextRef;
27use crate::optimizer::property::{Cardinality, FunctionalDependencySet};
28
29/// [`SysScan`] returns contents of a table or other equivalent object
30#[derive(Debug, Clone, Educe)]
31#[educe(PartialEq, Eq, Hash)]
32pub struct SysScan {
33    pub output_col_idx: Vec<usize>,
34    pub table: Arc<SystemTableCatalog>,
35    /// Help `RowSeqSysScan` executor use a better chunk size
36    pub chunk_size: Option<u32>,
37    /// The cardinality of the table **without** applying the predicate.
38    pub table_cardinality: Cardinality,
39    #[educe(PartialEq(ignore))]
40    #[educe(Hash(ignore))]
41    pub ctx: OptimizerContextRef,
42}
43
44impl SysScan {
45    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
46        self.output_col_idx
47            .iter()
48            .map(|&i| format!("{}.{}", self.table.name, self.get_table_columns()[i].name))
49            .collect()
50    }
51
52    pub(crate) fn column_names(&self) -> Vec<String> {
53        self.output_col_idx
54            .iter()
55            .map(|&i| self.get_table_columns()[i].name.clone())
56            .collect()
57    }
58
59    /// get the Mapping of columnIndex from internal column index to output column index
60    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
61        ColIndexMapping::with_remaining_columns(
62            &self.output_col_idx,
63            self.get_table_columns().len(),
64        )
65    }
66
67    /// Create a `LogicalSysScan` node. Used internally by optimizer.
68    #[allow(clippy::too_many_arguments)]
69    pub(crate) fn new(
70        output_col_idx: Vec<usize>, // the column index in the table
71        table: Arc<SystemTableCatalog>,
72        ctx: OptimizerContextRef,
73        table_cardinality: Cardinality,
74    ) -> Self {
75        Self::new_inner(output_col_idx, table, ctx, table_cardinality)
76    }
77
78    #[allow(clippy::too_many_arguments)]
79    pub(crate) fn new_inner(
80        output_col_idx: Vec<usize>, // the column index in the table
81        table: Arc<SystemTableCatalog>,
82        ctx: OptimizerContextRef,
83        table_cardinality: Cardinality,
84    ) -> Self {
85        Self {
86            output_col_idx,
87            table,
88            chunk_size: None,
89            ctx,
90            table_cardinality,
91        }
92    }
93
94    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
95        Pretty::Array(
96            match verbose {
97                true => self.column_names_with_table_prefix(),
98                false => self.column_names(),
99            }
100            .into_iter()
101            .map(Pretty::from)
102            .collect(),
103        )
104    }
105}
106
107impl GenericPlanNode for SysScan {
108    fn schema(&self) -> Schema {
109        let fields = self
110            .output_col_idx
111            .iter()
112            .map(|tb_idx| {
113                let col = &self.get_table_columns()[*tb_idx];
114                Field::from_with_table_name_prefix(col, &self.table.name)
115            })
116            .collect();
117        Schema { fields }
118    }
119
120    fn stream_key(&self) -> Option<Vec<usize>> {
121        let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table);
122        self.table
123            .pk
124            .iter()
125            .map(|&c| id_to_op_idx.get(&self.table.columns[c].column_id).copied())
126            .collect::<Option<Vec<_>>>()
127    }
128
129    fn ctx(&self) -> OptimizerContextRef {
130        self.ctx.clone()
131    }
132
133    fn functional_dependency(&self) -> FunctionalDependencySet {
134        let pk_indices = self.stream_key();
135        let col_num = self.output_col_idx.len();
136        match &pk_indices {
137            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
138            None => FunctionalDependencySet::new(col_num),
139        }
140    }
141}
142
143impl SysScan {
144    pub fn get_table_columns(&self) -> &[ColumnCatalog] {
145        &self.table.columns
146    }
147
148    /// Get the descs of the output columns.
149    pub fn column_descs(&self) -> Vec<ColumnDesc> {
150        self.output_col_idx
151            .iter()
152            .map(|&i| self.get_table_columns()[i].column_desc.clone())
153            .collect()
154    }
155
156    /// Helper function to create a mapping from `column_id` to `operator_idx`
157    pub fn get_id_to_op_idx_mapping(
158        output_col_idx: &[usize],
159        table: &SystemTableCatalog,
160    ) -> HashMap<ColumnId, usize> {
161        ColumnDesc::get_id_to_op_idx_mapping(&table.columns, Some(output_col_idx))
162    }
163}