risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::catalog::index_catalog::TableIndex;
30use crate::catalog::table_catalog::TableType;
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
/// [`TableScan`] returns contents of a RisingWave Table.
///
/// `PartialEq`, `Eq` and `Hash` are derived through `Educe`; the `ctx` field is excluded from
/// both the equality comparison and the hash (see the `educe(... (ignore))` attributes on it).
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableScan {
    /// Include `output_col_idx` and columns required in `predicate`
    pub required_col_idx: Vec<usize>,
    /// Column indexes (in the table) of the columns produced by the scan, in output order.
    pub output_col_idx: Vec<usize>,
    /// Table Catalog of the upstream table that the descriptor is derived from.
    pub table_catalog: Arc<TableCatalog>,
    // FIXME(kwannoel): Currently many places in the code reference this,
    // but now we have table catalog.
    // We should remove this and use table catalog in those call-sites instead.
    // It's introduced in https://github.com/risingwavelabs/risingwave/pull/13622.
    // We kept this field to avoid extensive refactor in that PR.
    // Table Desc (subset of table catalog).
    // NOTE(review): the FIXME above and the "Table Desc" line describe a field that is not
    // declared in this struct; they look like leftovers of a removed field — confirm and
    // delete them if so. They are kept as plain comments so that they no longer become part
    // of the rustdoc of `table_indexes`.
    /// Descriptors of all indexes on this table
    pub table_indexes: Vec<Arc<TableIndex>>,
    /// The pushed down predicates. It refers to column indexes of the table.
    pub predicate: Condition,
    /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
    /// syntax `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg.
    /// syntax `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg.
    /// syntax `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg.
    pub as_of: Option<AsOf>,
    /// Optimizer context this node belongs to. Ignored by `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
64
65impl TableScan {
66    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
67        self.predicate = self.predicate.clone().rewrite_expr(r);
68    }
69
70    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
71        self.predicate.visit_expr(v);
72    }
73
74    pub fn table_name(&self) -> &str {
75        self.table_catalog.name()
76    }
77
78    /// The mapped distribution key of the scan operator.
79    ///
80    /// The column indices in it is the position in the `output_col_idx`, instead of the position
81    /// in all the columns of the table (which is the table's distribution key).
82    ///
83    /// Return `None` if the table's distribution key are not all in the `output_col_idx`.
84    pub fn distribution_key(&self) -> Option<Vec<usize>> {
85        let tb_idx_to_op_idx = self
86            .output_col_idx
87            .iter()
88            .enumerate()
89            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
90            .collect::<HashMap<_, _>>();
91        self.table_catalog
92            .distribution_key
93            .iter()
94            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
95            .collect()
96    }
97
98    /// Get the ids of the output columns.
99    pub fn output_column_ids(&self) -> Vec<ColumnId> {
100        self.output_col_idx
101            .iter()
102            .map(|i| self.get_table_columns()[*i].column_id)
103            .collect()
104    }
105
106    pub fn primary_key(&self) -> &[ColumnOrder] {
107        &self.table_catalog.pk
108    }
109
110    pub fn watermark_columns(&self) -> WatermarkColumns {
111        // TODO(rc): For now, we still use `FixedBitSet` for watermark columns in `TableDesc`.
112        // So when we scan from a table, we have to conservatively assign each watermark column
113        // a separate watermark group. We should record the watermark group information in
114        // `TableDesc` later.
115        let mut watermark_columns = WatermarkColumns::new();
116        for idx in self.table_catalog.watermark_columns.ones() {
117            watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
118        }
119        watermark_columns.map_clone(&self.i2o_col_mapping())
120    }
121
122    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
123        self.output_col_idx
124            .iter()
125            .map(|&i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
126            .collect()
127    }
128
129    pub(crate) fn column_names(&self) -> Vec<String> {
130        self.output_col_idx
131            .iter()
132            .map(|&i| self.get_table_columns()[i].name.clone())
133            .collect()
134    }
135
136    pub(crate) fn out_fields(&self) -> FixedBitSet {
137        let out_fields_vec = self.output_col_idx.clone();
138        FixedBitSet::from_iter(out_fields_vec)
139    }
140
141    pub(crate) fn order_names(&self) -> Vec<String> {
142        self.table_catalog
143            .order_column_indices()
144            .map(|i| self.get_table_columns()[i].name.clone())
145            .collect()
146    }
147
148    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
149        self.table_catalog
150            .order_column_indices()
151            .map(|i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
152            .collect()
153    }
154
155    /// Return indices of fields the output is ordered by and
156    /// corresponding direction
157    pub fn get_out_column_index_order(&self) -> Order {
158        let id_to_tb_idx = self.table_catalog.get_id_to_op_idx_mapping();
159        let order = Order::new(
160            self.table_catalog
161                .pk
162                .iter()
163                .map(|order| {
164                    let idx = id_to_tb_idx
165                        .get(&self.table_catalog.columns[order.column_index].column_id)
166                        .unwrap();
167                    ColumnOrder::new(*idx, order.order_type)
168                })
169                .collect(),
170        );
171        self.i2o_col_mapping().rewrite_provided_order(&order)
172    }
173
174    /// get the Mapping of columnIndex from internal column index to output column index
175    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
176        ColIndexMapping::with_remaining_columns(
177            &self.output_col_idx,
178            self.get_table_columns().len(),
179        )
180    }
181
182    /// Get the ids of the output columns and primary key columns.
183    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
184        let mut ids = self.output_column_ids();
185        for column_order in self.primary_key() {
186            let id = self.get_table_columns()[column_order.column_index].column_id;
187            if !ids.contains(&id) {
188                ids.push(id);
189            }
190        }
191        ids
192    }
193
194    /// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
195    /// scan.
196    pub fn to_index_scan(
197        &self,
198        index_table_catalog: Arc<TableCatalog>,
199        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
200        function_mapping: &HashMap<FunctionCall, usize>,
201    ) -> Self {
202        let new_output_col_idx = self
203            .output_col_idx
204            .iter()
205            .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
206            .collect();
207
208        struct Rewriter<'a> {
209            primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
210            function_mapping: &'a HashMap<FunctionCall, usize>,
211        }
212        impl ExprRewriter for Rewriter<'_> {
213            fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
214                InputRef::new(
215                    *self
216                        .primary_to_secondary_mapping
217                        .get(&input_ref.index)
218                        .unwrap(),
219                    input_ref.return_type(),
220                )
221                .into()
222            }
223
224            fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
225                if let Some(index) = self.function_mapping.get(&func_call) {
226                    return InputRef::new(*index, func_call.return_type()).into();
227                }
228
229                let (func_type, inputs, ret) = func_call.decompose();
230                let inputs = inputs
231                    .into_iter()
232                    .map(|expr| self.rewrite_expr(expr))
233                    .collect();
234                FunctionCall::new_unchecked(func_type, inputs, ret).into()
235            }
236        }
237        let mut rewriter = Rewriter {
238            primary_to_secondary_mapping,
239            function_mapping,
240        };
241
242        let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
243
244        Self::new(
245            new_output_col_idx,
246            index_table_catalog,
247            vec![],
248            self.ctx.clone(),
249            new_predicate,
250            self.as_of.clone(),
251        )
252    }
253
    /// Create a [`TableScan`] node. Used internally by optimizer.
    ///
    /// All arguments are forwarded unchanged to [`Self::new_inner`], which derives
    /// `required_col_idx` from `output_col_idx` and `predicate`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        output_col_idx: Vec<usize>, // the column index in the table
        table_catalog: Arc<TableCatalog>,
        table_indexes: Vec<Arc<TableIndex>>,
        ctx: OptimizerContextRef,
        predicate: Condition, // refers to column indexes of the table
        as_of: Option<AsOf>,
    ) -> Self {
        Self::new_inner(
            output_col_idx,
            table_catalog,
            table_indexes,
            ctx,
            predicate,
            as_of,
        )
    }
273
274    pub(crate) fn new_inner(
275        output_col_idx: Vec<usize>, // the column index in the table
276        table_catalog: Arc<TableCatalog>,
277        table_indexes: Vec<Arc<TableIndex>>,
278        ctx: OptimizerContextRef,
279        predicate: Condition, // refers to column indexes of the table
280        as_of: Option<AsOf>,
281    ) -> Self {
282        // here we have 3 concepts
283        // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
284        // 2. table_idx: usize, column index in the TableDesc or tableCatalog.
285        // 3. operator_idx: usize, column index in the ScanOperator's schema.
286        // In a query we get the same version of catalog, so the mapping from column_id and
287        // table_idx will not change. And the `required_col_idx` is the `table_idx` of the
288        // required columns, i.e., the mapping from operator_idx to table_idx.
289
290        let mut required_col_idx = output_col_idx.clone();
291        let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
292        predicate_col_idx.ones().for_each(|idx| {
293            if !required_col_idx.contains(&idx) {
294                required_col_idx.push(idx);
295            }
296        });
297
298        Self {
299            required_col_idx,
300            output_col_idx,
301            table_catalog,
302            table_indexes,
303            predicate,
304            as_of,
305            ctx,
306        }
307    }
308
309    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
310        Pretty::Array(
311            match verbose {
312                true => self.column_names_with_table_prefix(),
313                false => self.column_names(),
314            }
315            .into_iter()
316            .map(Pretty::from)
317            .collect(),
318        )
319    }
320
321    pub(crate) fn fields_pretty_schema(&self) -> Schema {
322        let fields = self
323            .table_catalog
324            .columns
325            .iter()
326            .map(|col| Field::from_with_table_name_prefix(&col.column_desc, self.table_name()))
327            .collect();
328        Schema { fields }
329    }
330}
331
332impl GenericPlanNode for TableScan {
333    fn schema(&self) -> Schema {
334        let fields = self
335            .output_col_idx
336            .iter()
337            .map(|tb_idx| {
338                let col = &self.get_table_columns()[*tb_idx];
339                Field::from_with_table_name_prefix(&col.column_desc, self.table_name())
340            })
341            .collect();
342        Schema { fields }
343    }
344
345    fn stream_key(&self) -> Option<Vec<usize>> {
346        if matches!(self.table_catalog.table_type, TableType::Internal) {
347            return None;
348        }
349        let id_to_op_idx =
350            Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_catalog);
351        self.table_catalog
352            .stream_key
353            .iter()
354            .map(|&c| {
355                id_to_op_idx
356                    .get(&self.table_catalog.columns[c].column_id)
357                    .copied()
358            })
359            .collect::<Option<Vec<_>>>()
360    }
361
362    fn ctx(&self) -> OptimizerContextRef {
363        self.ctx.clone()
364    }
365
366    fn functional_dependency(&self) -> FunctionalDependencySet {
367        let pk_indices = self.stream_key();
368        let col_num = self.output_col_idx.len();
369        match &pk_indices {
370            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
371            None => FunctionalDependencySet::new(col_num),
372        }
373    }
374}
375
376impl TableScan {
377    pub fn get_table_columns(&self) -> &[ColumnCatalog] {
378        &self.table_catalog.columns
379    }
380
381    pub fn append_only(&self) -> bool {
382        self.table_catalog.append_only
383    }
384
385    /// Get the descs of the output columns.
386    pub fn column_descs(&self) -> Vec<ColumnDesc> {
387        self.output_col_idx
388            .iter()
389            .map(|&i| self.get_table_columns()[i].column_desc.clone())
390            .collect()
391    }
392
393    /// Helper function to create a mapping from `column_id` to `operator_idx`
394    pub fn get_id_to_op_idx_mapping(
395        output_col_idx: &[usize],
396        table_catalog: &TableCatalog,
397    ) -> HashMap<ColumnId, usize> {
398        ColumnDesc::get_id_to_op_idx_mapping(&table_catalog.columns, Some(output_col_idx))
399    }
400}