// risingwave_frontend/optimizer/plan_node/generic/sys_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::rc::Rc;
17
18use educe::Educe;
19use pretty_xmlish::Pretty;
20use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22use risingwave_common::util::sort_util::ColumnOrder;
23
24use super::GenericPlanNode;
25use crate::catalog::ColumnId;
26use crate::expr::{ExprRewriter, ExprVisitor};
27use crate::optimizer::optimizer_context::OptimizerContextRef;
28use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order};
29use crate::utils::{ColIndexMappingRewriteExt, Condition};
30
31/// [`SysScan`] returns contents of a table or other equivalent object
32#[derive(Debug, Clone, Educe)]
33#[educe(PartialEq, Eq, Hash)]
34pub struct SysScan {
35    pub table_name: String,
36    /// Include `output_col_idx` and columns required in `predicate`
37    pub required_col_idx: Vec<usize>,
38    pub output_col_idx: Vec<usize>,
39    /// Descriptor of the table
40    pub table_desc: Rc<TableDesc>,
41    /// The pushed down predicates. It refers to column indexes of the table.
42    pub predicate: Condition,
43    /// Help `RowSeqSysScan` executor use a better chunk size
44    pub chunk_size: Option<u32>,
45    /// The cardinality of the table **without** applying the predicate.
46    pub table_cardinality: Cardinality,
47    #[educe(PartialEq(ignore))]
48    #[educe(Hash(ignore))]
49    pub ctx: OptimizerContextRef,
50}
51
52impl SysScan {
53    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
54        self.predicate = self.predicate.clone().rewrite_expr(r);
55    }
56
57    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
58        self.predicate.visit_expr(v);
59    }
60
61    /// Get the ids of the output columns.
62    pub fn output_column_ids(&self) -> Vec<ColumnId> {
63        self.output_col_idx
64            .iter()
65            .map(|i| self.get_table_columns()[*i].column_id)
66            .collect()
67    }
68
69    pub fn primary_key(&self) -> &[ColumnOrder] {
70        &self.table_desc.pk
71    }
72
73    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
74        self.output_col_idx
75            .iter()
76            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
77            .collect()
78    }
79
80    pub(crate) fn column_names(&self) -> Vec<String> {
81        self.output_col_idx
82            .iter()
83            .map(|&i| self.get_table_columns()[i].name.clone())
84            .collect()
85    }
86
87    pub(crate) fn order_names(&self) -> Vec<String> {
88        self.table_desc
89            .order_column_indices()
90            .iter()
91            .map(|&i| self.get_table_columns()[i].name.clone())
92            .collect()
93    }
94
95    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
96        self.table_desc
97            .order_column_indices()
98            .iter()
99            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
100            .collect()
101    }
102
103    /// Return indices of fields the output is ordered by and
104    /// corresponding direction
105    pub fn get_out_column_index_order(&self) -> Order {
106        let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
107        let order = Order::new(
108            self.table_desc
109                .pk
110                .iter()
111                .map(|order| {
112                    let idx = id_to_tb_idx
113                        .get(&self.table_desc.columns[order.column_index].column_id)
114                        .unwrap();
115                    ColumnOrder::new(*idx, order.order_type)
116                })
117                .collect(),
118        );
119        self.i2o_col_mapping().rewrite_provided_order(&order)
120    }
121
122    /// get the Mapping of columnIndex from internal column index to output column index
123    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
124        ColIndexMapping::with_remaining_columns(
125            &self.output_col_idx,
126            self.get_table_columns().len(),
127        )
128    }
129
130    /// Get the ids of the output columns and primary key columns.
131    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
132        let mut ids = self.output_column_ids();
133        for column_order in self.primary_key() {
134            let id = self.get_table_columns()[column_order.column_index].column_id;
135            if !ids.contains(&id) {
136                ids.push(id);
137            }
138        }
139        ids
140    }
141
142    /// Create a `LogicalSysScan` node. Used internally by optimizer.
143    #[allow(clippy::too_many_arguments)]
144    pub(crate) fn new(
145        table_name: String,
146        output_col_idx: Vec<usize>, // the column index in the table
147        table_desc: Rc<TableDesc>,
148        ctx: OptimizerContextRef,
149        predicate: Condition, // refers to column indexes of the table
150        table_cardinality: Cardinality,
151    ) -> Self {
152        Self::new_inner(
153            table_name,
154            output_col_idx,
155            table_desc,
156            ctx,
157            predicate,
158            table_cardinality,
159        )
160    }
161
162    #[allow(clippy::too_many_arguments)]
163    pub(crate) fn new_inner(
164        table_name: String,
165        output_col_idx: Vec<usize>, // the column index in the table
166        table_desc: Rc<TableDesc>,
167        ctx: OptimizerContextRef,
168        predicate: Condition, // refers to column indexes of the table
169        table_cardinality: Cardinality,
170    ) -> Self {
171        // here we have 3 concepts
172        // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
173        // 2. table_idx: usize, column index in the TableDesc or tableCatalog.
174        // 3. operator_idx: usize, column index in the SysScanOperator's schema.
175        // In a query we get the same version of catalog, so the mapping from column_id and
176        // table_idx will not change. And the `required_col_idx` is the `table_idx` of the
177        // required columns, i.e., the mapping from operator_idx to table_idx.
178
179        let mut required_col_idx = output_col_idx.clone();
180        let predicate_col_idx = predicate.collect_input_refs(table_desc.columns.len());
181        predicate_col_idx.ones().for_each(|idx| {
182            if !required_col_idx.contains(&idx) {
183                required_col_idx.push(idx);
184            }
185        });
186
187        Self {
188            table_name,
189            required_col_idx,
190            output_col_idx,
191            table_desc,
192            predicate,
193            chunk_size: None,
194            ctx,
195            table_cardinality,
196        }
197    }
198
199    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
200        Pretty::Array(
201            match verbose {
202                true => self.column_names_with_table_prefix(),
203                false => self.column_names(),
204            }
205            .into_iter()
206            .map(Pretty::from)
207            .collect(),
208        )
209    }
210
211    pub(crate) fn fields_pretty_schema(&self) -> Schema {
212        let fields = self
213            .table_desc
214            .columns
215            .iter()
216            .map(|col| Field::from_with_table_name_prefix(col, &self.table_name))
217            .collect();
218        Schema { fields }
219    }
220}
221
222impl GenericPlanNode for SysScan {
223    fn schema(&self) -> Schema {
224        let fields = self
225            .output_col_idx
226            .iter()
227            .map(|tb_idx| {
228                let col = &self.get_table_columns()[*tb_idx];
229                Field::from_with_table_name_prefix(col, &self.table_name)
230            })
231            .collect();
232        Schema { fields }
233    }
234
235    fn stream_key(&self) -> Option<Vec<usize>> {
236        let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
237        self.table_desc
238            .stream_key
239            .iter()
240            .map(|&c| {
241                id_to_op_idx
242                    .get(&self.table_desc.columns[c].column_id)
243                    .copied()
244            })
245            .collect::<Option<Vec<_>>>()
246    }
247
248    fn ctx(&self) -> OptimizerContextRef {
249        self.ctx.clone()
250    }
251
252    fn functional_dependency(&self) -> FunctionalDependencySet {
253        let pk_indices = self.stream_key();
254        let col_num = self.output_col_idx.len();
255        match &pk_indices {
256            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
257            None => FunctionalDependencySet::new(col_num),
258        }
259    }
260}
261
262impl SysScan {
263    pub fn get_table_columns(&self) -> &[ColumnDesc] {
264        &self.table_desc.columns
265    }
266
267    /// Get the descs of the output columns.
268    pub fn column_descs(&self) -> Vec<ColumnDesc> {
269        self.output_col_idx
270            .iter()
271            .map(|&i| self.get_table_columns()[i].clone())
272            .collect()
273    }
274
275    /// Helper function to create a mapping from `column_id` to `operator_idx`
276    pub fn get_id_to_op_idx_mapping(
277        output_col_idx: &[usize],
278        table_desc: &Rc<TableDesc>,
279    ) -> HashMap<ColumnId, usize> {
280        let mut id_to_op_idx = HashMap::new();
281        output_col_idx
282            .iter()
283            .enumerate()
284            .for_each(|(op_idx, tb_idx)| {
285                let col = &table_desc.columns[*tb_idx];
286                id_to_op_idx.insert(col.column_id, op_idx);
287            });
288        id_to_op_idx
289    }
290}