risingwave_frontend/optimizer/plan_node/generic/table_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use educe::Educe;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::table_catalog::TableType;
29use crate::catalog::{ColumnId, IndexCatalog};
30use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
31use crate::optimizer::optimizer_context::OptimizerContextRef;
32use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
33use crate::utils::{ColIndexMappingRewriteExt, Condition};
34
35/// [`TableScan`] returns contents of a RisingWave Table.
36#[derive(Debug, Clone, Educe)]
37#[educe(PartialEq, Eq, Hash)]
38pub struct TableScan {
39    pub table_name: String,
40    /// Include `output_col_idx` and columns required in `predicate`
41    pub required_col_idx: Vec<usize>,
42    pub output_col_idx: Vec<usize>,
43    /// Table Catalog of the upstream table that the descriptor is derived from.
44    pub table_catalog: Arc<TableCatalog>,
45    // FIXME(kwannoel): Currently many places in the code reference this,
46    // but now we have table catalog.
47    // We should remove this and use table catalog in those call-sites instead.
48    // It's introduced in https://github.com/risingwavelabs/risingwave/pull/13622.
49    // We kept this field to avoid extensive refactor in that PR.
50    /// Table Desc (subset of table catalog).
51    pub table_desc: Rc<TableDesc>,
52    /// Descriptors of all indexes on this table
53    pub indexes: Vec<Rc<IndexCatalog>>,
54    /// The pushed down predicates. It refers to column indexes of the table.
55    pub predicate: Condition,
56    /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
57    /// syntax `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg.
58    /// syntax `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg.
59    /// syntax `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg.
60    pub as_of: Option<AsOf>,
61    /// The cardinality of the table **without** applying the predicate.
62    pub table_cardinality: Cardinality,
63    #[educe(PartialEq(ignore))]
64    #[educe(Hash(ignore))]
65    pub ctx: OptimizerContextRef,
66}
67
68impl TableScan {
69    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
70        self.predicate = self.predicate.clone().rewrite_expr(r);
71    }
72
73    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
74        self.predicate.visit_expr(v);
75    }
76
77    /// The mapped distribution key of the scan operator.
78    ///
79    /// The column indices in it is the position in the `output_col_idx`, instead of the position
80    /// in all the columns of the table (which is the table's distribution key).
81    ///
82    /// Return `None` if the table's distribution key are not all in the `output_col_idx`.
83    pub fn distribution_key(&self) -> Option<Vec<usize>> {
84        let tb_idx_to_op_idx = self
85            .output_col_idx
86            .iter()
87            .enumerate()
88            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
89            .collect::<HashMap<_, _>>();
90        self.table_desc
91            .distribution_key
92            .iter()
93            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
94            .collect()
95    }
96
97    /// Get the ids of the output columns.
98    pub fn output_column_ids(&self) -> Vec<ColumnId> {
99        self.output_col_idx
100            .iter()
101            .map(|i| self.get_table_columns()[*i].column_id)
102            .collect()
103    }
104
105    pub fn primary_key(&self) -> &[ColumnOrder] {
106        &self.table_desc.pk
107    }
108
109    pub fn watermark_columns(&self) -> WatermarkColumns {
110        // TODO(rc): For now, we still use `FixedBitSet` for watermark columns in `TableDesc`.
111        // So when we scan from a table, we have to conservatively assign each watermark column
112        // a separate watermark group. We should record the watermark group information in
113        // `TableDesc` later.
114        let mut watermark_columns = WatermarkColumns::new();
115        for idx in self.table_desc.watermark_columns.ones() {
116            watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
117        }
118        watermark_columns.map_clone(&self.i2o_col_mapping())
119    }
120
121    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
122        self.output_col_idx
123            .iter()
124            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
125            .collect()
126    }
127
128    pub(crate) fn column_names(&self) -> Vec<String> {
129        self.output_col_idx
130            .iter()
131            .map(|&i| self.get_table_columns()[i].name.clone())
132            .collect()
133    }
134
135    pub(crate) fn order_names(&self) -> Vec<String> {
136        self.table_desc
137            .order_column_indices()
138            .iter()
139            .map(|&i| self.get_table_columns()[i].name.clone())
140            .collect()
141    }
142
143    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
144        self.table_desc
145            .order_column_indices()
146            .iter()
147            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
148            .collect()
149    }
150
151    /// Return indices of fields the output is ordered by and
152    /// corresponding direction
153    pub fn get_out_column_index_order(&self) -> Order {
154        let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
155        let order = Order::new(
156            self.table_desc
157                .pk
158                .iter()
159                .map(|order| {
160                    let idx = id_to_tb_idx
161                        .get(&self.table_desc.columns[order.column_index].column_id)
162                        .unwrap();
163                    ColumnOrder::new(*idx, order.order_type)
164                })
165                .collect(),
166        );
167        self.i2o_col_mapping().rewrite_provided_order(&order)
168    }
169
170    /// get the Mapping of columnIndex from internal column index to output column index
171    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
172        ColIndexMapping::with_remaining_columns(
173            &self.output_col_idx,
174            self.get_table_columns().len(),
175        )
176    }
177
178    /// Get the ids of the output columns and primary key columns.
179    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
180        let mut ids = self.output_column_ids();
181        for column_order in self.primary_key() {
182            let id = self.get_table_columns()[column_order.column_index].column_id;
183            if !ids.contains(&id) {
184                ids.push(id);
185            }
186        }
187        ids
188    }
189
190    /// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
191    /// scan.
192    pub fn to_index_scan(
193        &self,
194        index_name: &str,
195        index_table_catalog: Arc<TableCatalog>,
196        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
197        function_mapping: &HashMap<FunctionCall, usize>,
198    ) -> Self {
199        let new_output_col_idx = self
200            .output_col_idx
201            .iter()
202            .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
203            .collect();
204
205        struct Rewriter<'a> {
206            primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
207            function_mapping: &'a HashMap<FunctionCall, usize>,
208        }
209        impl ExprRewriter for Rewriter<'_> {
210            fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
211                InputRef::new(
212                    *self
213                        .primary_to_secondary_mapping
214                        .get(&input_ref.index)
215                        .unwrap(),
216                    input_ref.return_type(),
217                )
218                .into()
219            }
220
221            fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
222                if let Some(index) = self.function_mapping.get(&func_call) {
223                    return InputRef::new(*index, func_call.return_type()).into();
224                }
225
226                let (func_type, inputs, ret) = func_call.decompose();
227                let inputs = inputs
228                    .into_iter()
229                    .map(|expr| self.rewrite_expr(expr))
230                    .collect();
231                FunctionCall::new_unchecked(func_type, inputs, ret).into()
232            }
233        }
234        let mut rewriter = Rewriter {
235            primary_to_secondary_mapping,
236            function_mapping,
237        };
238
239        let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
240
241        Self::new(
242            index_name.to_owned(),
243            new_output_col_idx,
244            index_table_catalog,
245            vec![],
246            self.ctx.clone(),
247            new_predicate,
248            self.as_of.clone(),
249            self.table_cardinality,
250        )
251    }
252
253    /// Create a `LogicalScan` node. Used internally by optimizer.
254    #[allow(clippy::too_many_arguments)]
255    pub(crate) fn new(
256        table_name: String,
257        output_col_idx: Vec<usize>, // the column index in the table
258        table_catalog: Arc<TableCatalog>,
259        indexes: Vec<Rc<IndexCatalog>>,
260        ctx: OptimizerContextRef,
261        predicate: Condition, // refers to column indexes of the table
262        as_of: Option<AsOf>,
263        table_cardinality: Cardinality,
264    ) -> Self {
265        Self::new_inner(
266            table_name,
267            output_col_idx,
268            table_catalog,
269            indexes,
270            ctx,
271            predicate,
272            as_of,
273            table_cardinality,
274        )
275    }
276
277    #[allow(clippy::too_many_arguments)]
278    pub(crate) fn new_inner(
279        table_name: String,
280        output_col_idx: Vec<usize>, // the column index in the table
281        table_catalog: Arc<TableCatalog>,
282        indexes: Vec<Rc<IndexCatalog>>,
283        ctx: OptimizerContextRef,
284        predicate: Condition, // refers to column indexes of the table
285        as_of: Option<AsOf>,
286        table_cardinality: Cardinality,
287    ) -> Self {
288        // here we have 3 concepts
289        // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
290        // 2. table_idx: usize, column index in the TableDesc or tableCatalog.
291        // 3. operator_idx: usize, column index in the ScanOperator's schema.
292        // In a query we get the same version of catalog, so the mapping from column_id and
293        // table_idx will not change. And the `required_col_idx` is the `table_idx` of the
294        // required columns, i.e., the mapping from operator_idx to table_idx.
295
296        let mut required_col_idx = output_col_idx.clone();
297        let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
298        predicate_col_idx.ones().for_each(|idx| {
299            if !required_col_idx.contains(&idx) {
300                required_col_idx.push(idx);
301            }
302        });
303
304        let table_desc = Rc::new(table_catalog.table_desc());
305
306        Self {
307            table_name,
308            required_col_idx,
309            output_col_idx,
310            table_catalog,
311            table_desc,
312            indexes,
313            predicate,
314            as_of,
315            table_cardinality,
316            ctx,
317        }
318    }
319
320    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
321        Pretty::Array(
322            match verbose {
323                true => self.column_names_with_table_prefix(),
324                false => self.column_names(),
325            }
326            .into_iter()
327            .map(Pretty::from)
328            .collect(),
329        )
330    }
331
332    pub(crate) fn fields_pretty_schema(&self) -> Schema {
333        let fields = self
334            .table_desc
335            .columns
336            .iter()
337            .map(|col| Field::from_with_table_name_prefix(col, &self.table_name))
338            .collect();
339        Schema { fields }
340    }
341}
342
343impl GenericPlanNode for TableScan {
344    fn schema(&self) -> Schema {
345        let fields = self
346            .output_col_idx
347            .iter()
348            .map(|tb_idx| {
349                let col = &self.get_table_columns()[*tb_idx];
350                Field::from_with_table_name_prefix(col, &self.table_name)
351            })
352            .collect();
353        Schema { fields }
354    }
355
356    fn stream_key(&self) -> Option<Vec<usize>> {
357        if matches!(self.table_catalog.table_type, TableType::Internal) {
358            return None;
359        }
360        let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
361        self.table_desc
362            .stream_key
363            .iter()
364            .map(|&c| {
365                id_to_op_idx
366                    .get(&self.table_desc.columns[c].column_id)
367                    .copied()
368            })
369            .collect::<Option<Vec<_>>>()
370    }
371
372    fn ctx(&self) -> OptimizerContextRef {
373        self.ctx.clone()
374    }
375
376    fn functional_dependency(&self) -> FunctionalDependencySet {
377        let pk_indices = self.stream_key();
378        let col_num = self.output_col_idx.len();
379        match &pk_indices {
380            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
381            None => FunctionalDependencySet::new(col_num),
382        }
383    }
384}
385
386impl TableScan {
387    pub fn get_table_columns(&self) -> &[ColumnDesc] {
388        &self.table_desc.columns
389    }
390
391    pub fn append_only(&self) -> bool {
392        self.table_desc.append_only
393    }
394
395    /// Get the descs of the output columns.
396    pub fn column_descs(&self) -> Vec<ColumnDesc> {
397        self.output_col_idx
398            .iter()
399            .map(|&i| self.get_table_columns()[i].clone())
400            .collect()
401    }
402
403    /// Helper function to create a mapping from `column_id` to `operator_idx`
404    pub fn get_id_to_op_idx_mapping(
405        output_col_idx: &[usize],
406        table_desc: &Rc<TableDesc>,
407    ) -> HashMap<ColumnId, usize> {
408        let mut id_to_op_idx = HashMap::new();
409        output_col_idx
410            .iter()
411            .enumerate()
412            .for_each(|(op_idx, tb_idx)| {
413                let col = &table_desc.columns[*tb_idx];
414                id_to_op_idx.insert(col.column_id, op_idx);
415            });
416        id_to_op_idx
417    }
418}