risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::catalog::index_catalog::{TableIndex, VectorIndex};
30use crate::catalog::table_catalog::TableType;
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
36/// [`TableScan`] returns contents of a RisingWave Table.
/// [`TableScan`] returns contents of a RisingWave Table.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableScan {
    /// Include `output_col_idx` and columns required in `predicate`
    pub required_col_idx: Vec<usize>,
    /// Indices (into the table's columns) of the columns this scan outputs.
    pub output_col_idx: Vec<usize>,
    /// Table Catalog of the upstream table that the descriptor is derived from.
    pub table_catalog: Arc<TableCatalog>,
    /// Descriptors of all indexes on this table
    pub table_indexes: Vec<Arc<TableIndex>>,
    /// Descriptors of all vector indexes on this table.
    pub vector_indexes: Vec<Arc<VectorIndex>>,
    /// The pushed down predicates. It refers to column indexes of the table.
    pub predicate: Condition,
    /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
    /// syntax `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg.
    /// syntax `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg.
    /// syntax `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg.
    pub as_of: Option<AsOf>,
    /// Optimizer context; excluded from equality/hash since it is per-session state.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
65
66impl TableScan {
    /// Rewrite all expressions held by this node (i.e. the pushed-down predicate) with `r`.
    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
        // `rewrite_expr` consumes the condition, hence the clone before reassigning.
        self.predicate = self.predicate.clone().rewrite_expr(r);
    }
70
    /// Visit all expressions held by this node (i.e. the pushed-down predicate) with `v`.
    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.predicate.visit_expr(v);
    }
74
    /// Name of the scanned table, taken from the table catalog.
    pub fn table_name(&self) -> &str {
        self.table_catalog.name()
    }
78
79    /// The mapped distribution key of the scan operator.
80    ///
81    /// The column indices in it is the position in the `output_col_idx`, instead of the position
82    /// in all the columns of the table (which is the table's distribution key).
83    ///
84    /// Return `None` if the table's distribution key are not all in the `output_col_idx`.
85    pub fn distribution_key(&self) -> Option<Vec<usize>> {
86        let tb_idx_to_op_idx = self
87            .output_col_idx
88            .iter()
89            .enumerate()
90            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
91            .collect::<HashMap<_, _>>();
92        self.table_catalog
93            .distribution_key
94            .iter()
95            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
96            .collect()
97    }
98
99    /// Get the ids of the output columns.
100    pub fn output_column_ids(&self) -> Vec<ColumnId> {
101        self.output_col_idx
102            .iter()
103            .map(|i| self.get_table_columns()[*i].column_id)
104            .collect()
105    }
106
    /// Primary key of the table, as column orders over table column indices.
    pub fn primary_key(&self) -> &[ColumnOrder] {
        &self.table_catalog.pk
    }
110
111    pub fn watermark_columns(&self) -> WatermarkColumns {
112        // TODO(rc): For now, we still use `FixedBitSet` for watermark columns in `TableDesc`.
113        // So when we scan from a table, we have to conservatively assign each watermark column
114        // a separate watermark group. We should record the watermark group information in
115        // `TableDesc` later.
116        let mut watermark_columns = WatermarkColumns::new();
117        for idx in self.table_catalog.watermark_columns.ones() {
118            watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
119        }
120        watermark_columns.map_clone(&self.i2o_col_mapping())
121    }
122
123    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
124        self.output_col_idx
125            .iter()
126            .map(|&i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
127            .collect()
128    }
129
130    pub(crate) fn column_names(&self) -> Vec<String> {
131        self.output_col_idx
132            .iter()
133            .map(|&i| self.get_table_columns()[i].name.clone())
134            .collect()
135    }
136
137    pub(crate) fn out_fields(&self) -> FixedBitSet {
138        let out_fields_vec = self.output_col_idx.clone();
139        FixedBitSet::from_iter(out_fields_vec)
140    }
141
142    pub(crate) fn order_names(&self) -> Vec<String> {
143        self.table_catalog
144            .order_column_indices()
145            .map(|i| self.get_table_columns()[i].name.clone())
146            .collect()
147    }
148
149    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
150        self.table_catalog
151            .order_column_indices()
152            .map(|i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
153            .collect()
154    }
155
156    /// Return indices of fields the output is ordered by and
157    /// corresponding direction
158    pub fn get_out_column_index_order(&self) -> Order {
159        let id_to_tb_idx = self.table_catalog.get_id_to_op_idx_mapping();
160        let order = Order::new(
161            self.table_catalog
162                .pk
163                .iter()
164                .map(|order| {
165                    let idx = id_to_tb_idx
166                        .get(&self.table_catalog.columns[order.column_index].column_id)
167                        .unwrap();
168                    ColumnOrder::new(*idx, order.order_type)
169                })
170                .collect(),
171        );
172        self.i2o_col_mapping().rewrite_provided_order(&order)
173    }
174
    /// Mapping from internal (table) column index to output (operator) column index.
    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
        ColIndexMapping::with_remaining_columns(
            &self.output_col_idx,
            self.get_table_columns().len(),
        )
    }
182
183    /// Get the ids of the output columns and primary key columns.
184    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
185        let mut ids = self.output_column_ids();
186        for column_order in self.primary_key() {
187            let id = self.get_table_columns()[column_order.column_index].column_id;
188            if !ids.contains(&id) {
189                ids.push(id);
190            }
191        }
192        ids
193    }
194
    /// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
    /// scan.
    ///
    /// Converts this scan into a scan over the given index table: output column
    /// indices and `InputRef`s in the predicate are remapped from primary-table
    /// positions to index-table positions, and function calls that the index
    /// materializes (per `function_mapping`) are replaced by direct column refs.
    pub fn to_index_scan(
        &self,
        index_table_catalog: Arc<TableCatalog>,
        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
        function_mapping: &HashMap<FunctionCall, usize>,
    ) -> Self {
        // Remap output columns; unwrap is justified by the coverage prerequisite above.
        let new_output_col_idx = self
            .output_col_idx
            .iter()
            .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
            .collect();

        // Local rewriter that translates the predicate into index-table column space.
        struct Rewriter<'a> {
            primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
            function_mapping: &'a HashMap<FunctionCall, usize>,
        }
        impl ExprRewriter for Rewriter<'_> {
            fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
                InputRef::new(
                    *self
                        .primary_to_secondary_mapping
                        .get(&input_ref.index)
                        .unwrap(),
                    input_ref.return_type(),
                )
                .into()
            }

            fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
                // If the index stores this expression as a column, read it directly.
                if let Some(index) = self.function_mapping.get(&func_call) {
                    return InputRef::new(*index, func_call.return_type()).into();
                }

                // Otherwise rewrite its inputs recursively and rebuild the call.
                let (func_type, inputs, ret) = func_call.decompose();
                let inputs = inputs
                    .into_iter()
                    .map(|expr| self.rewrite_expr(expr))
                    .collect();
                FunctionCall::new_unchecked(func_type, inputs, ret).into()
            }
        }
        let mut rewriter = Rewriter {
            primary_to_secondary_mapping,
            function_mapping,
        };

        let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);

        // NOTE(review): index lists are intentionally empty — presumably an index
        // scan is never re-routed through further indexes; confirm with callers.
        Self::new(
            new_output_col_idx,
            index_table_catalog,
            vec![],
            vec![],
            self.ctx.clone(),
            new_predicate,
            self.as_of.clone(),
        )
    }
255
    /// Create a `LogicalScan` node. Used internally by optimizer.
    ///
    /// Currently a thin wrapper that forwards all arguments to [`Self::new_inner`]
    /// unchanged; kept as the public-facing constructor entry point.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        output_col_idx: Vec<usize>, // the column index in the table
        table_catalog: Arc<TableCatalog>,
        table_indexes: Vec<Arc<TableIndex>>,
        vector_indexes: Vec<Arc<VectorIndex>>,
        ctx: OptimizerContextRef,
        predicate: Condition, // refers to column indexes of the table
        as_of: Option<AsOf>,
    ) -> Self {
        Self::new_inner(
            output_col_idx,
            table_catalog,
            table_indexes,
            vector_indexes,
            ctx,
            predicate,
            as_of,
        )
    }
277
278    pub(crate) fn new_inner(
279        output_col_idx: Vec<usize>, // the column index in the table
280        table_catalog: Arc<TableCatalog>,
281        table_indexes: Vec<Arc<TableIndex>>,
282        vector_indexes: Vec<Arc<VectorIndex>>,
283        ctx: OptimizerContextRef,
284        predicate: Condition, // refers to column indexes of the table
285        as_of: Option<AsOf>,
286    ) -> Self {
287        // here we have 3 concepts
288        // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
289        // 2. table_idx: usize, column index in the TableDesc or tableCatalog.
290        // 3. operator_idx: usize, column index in the ScanOperator's schema.
291        // In a query we get the same version of catalog, so the mapping from column_id and
292        // table_idx will not change. And the `required_col_idx` is the `table_idx` of the
293        // required columns, i.e., the mapping from operator_idx to table_idx.
294
295        let mut required_col_idx = output_col_idx.clone();
296        let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
297        predicate_col_idx.ones().for_each(|idx| {
298            if !required_col_idx.contains(&idx) {
299                required_col_idx.push(idx);
300            }
301        });
302
303        Self {
304            required_col_idx,
305            output_col_idx,
306            table_catalog,
307            table_indexes,
308            vector_indexes,
309            predicate,
310            as_of,
311            ctx,
312        }
313    }
314
315    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
316        Pretty::Array(
317            match verbose {
318                true => self.column_names_with_table_prefix(),
319                false => self.column_names(),
320            }
321            .into_iter()
322            .map(Pretty::from)
323            .collect(),
324        )
325    }
326
327    pub(crate) fn fields_pretty_schema(&self) -> Schema {
328        let fields = self
329            .table_catalog
330            .columns
331            .iter()
332            .map(|col| Field::from_with_table_name_prefix(&col.column_desc, self.table_name()))
333            .collect();
334        Schema { fields }
335    }
336
337    // Check if the scan is cross-database
338    pub(crate) fn cross_database(&self) -> bool {
339        self.table_catalog.database_id != self.ctx().session_ctx().database_id()
340    }
341}
342
343impl GenericPlanNode for TableScan {
344    fn schema(&self) -> Schema {
345        let fields = self
346            .output_col_idx
347            .iter()
348            .map(|tb_idx| {
349                let col = &self.get_table_columns()[*tb_idx];
350                Field::from_with_table_name_prefix(&col.column_desc, self.table_name())
351            })
352            .collect();
353        Schema { fields }
354    }
355
356    fn stream_key(&self) -> Option<Vec<usize>> {
357        if matches!(self.table_catalog.table_type, TableType::Internal) {
358            return None;
359        }
360        let id_to_op_idx =
361            Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_catalog);
362        self.table_catalog
363            .stream_key
364            .iter()
365            .map(|&c| {
366                id_to_op_idx
367                    .get(&self.table_catalog.columns[c].column_id)
368                    .copied()
369            })
370            .collect::<Option<Vec<_>>>()
371    }
372
    /// Optimizer context this node was created under.
    fn ctx(&self) -> OptimizerContextRef {
        self.ctx.clone()
    }
376
377    fn functional_dependency(&self) -> FunctionalDependencySet {
378        let pk_indices = self.stream_key();
379        let col_num = self.output_col_idx.len();
380        match &pk_indices {
381            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
382            None => FunctionalDependencySet::new(col_num),
383        }
384    }
385}
386
387impl TableScan {
    /// All columns of the underlying table (not just the output columns).
    pub fn get_table_columns(&self) -> &[ColumnCatalog] {
        &self.table_catalog.columns
    }
391
    /// Whether the underlying table is append-only.
    pub fn append_only(&self) -> bool {
        self.table_catalog.append_only
    }
395
396    /// Get the descs of the output columns.
397    pub fn column_descs(&self) -> Vec<ColumnDesc> {
398        self.output_col_idx
399            .iter()
400            .map(|&i| self.get_table_columns()[i].column_desc.clone())
401            .collect()
402    }
403
    /// Helper function to create a mapping from `column_id` to `operator_idx`,
    /// restricted to the given output columns. Delegates to
    /// `ColumnDesc::get_id_to_op_idx_mapping`.
    pub fn get_id_to_op_idx_mapping(
        output_col_idx: &[usize],
        table_catalog: &TableCatalog,
    ) -> HashMap<ColumnId, usize> {
        ColumnDesc::get_id_to_op_idx_mapping(&table_catalog.columns, Some(output_col_idx))
    }
411}