risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use educe::Educe;
20use fixedbitset::FixedBitSet;
21use pretty_xmlish::Pretty;
22use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::GenericPlanNode;
28use crate::TableCatalog;
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{ColumnId, IndexCatalog};
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
/// [`TableScan`] returns contents of a RisingWave Table.
///
/// This is the generic (convention-independent) part of a table scan plan node. Equality and
/// hashing ignore `ctx` (see the `educe` attributes on that field).
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableScan {
    /// Name of the scanned table (for scans produced by `to_index_scan`, the index name).
    /// Used as the prefix of output field names.
    pub table_name: String,
    /// Table column indices needed by the scan: every entry of `output_col_idx` first, followed
    /// by any additional columns referenced by `predicate`. Computed once in `new_inner`;
    /// `rewrite_exprs` does not recompute it.
    pub required_col_idx: Vec<usize>,
    /// Table column indices of the output columns, in output order: output column `i` is
    /// table column `output_col_idx[i]`.
    pub output_col_idx: Vec<usize>,
    /// Table Catalog of the upstream table that the descriptor is derived from.
    pub table_catalog: Arc<TableCatalog>,
    // FIXME(kwannoel): Currently many places in the code reference this,
    // but now we have table catalog.
    // We should remove this and use table catalog in those call-sites instead.
    // It's introduced in https://github.com/risingwavelabs/risingwave/pull/13622.
    // We kept this field to avoid extensive refactor in that PR.
    /// Table Desc (subset of table catalog), built from `table_catalog` in `new_inner`.
    pub table_desc: Rc<TableDesc>,
    /// Descriptors of the indexes on this table (left empty by `to_index_scan`).
    pub indexes: Vec<Rc<IndexCatalog>>,
    /// The pushed down predicates. Its input references are column indexes of the table,
    /// not positions in the scan output.
    pub predicate: Condition,
    /// Optional `AS OF` clause:
    /// - `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
    /// - `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg.
    /// - `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg.
    /// - `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg.
    pub as_of: Option<AsOf>,
    /// The cardinality of the table **without** applying the predicate.
    pub table_cardinality: Cardinality,
    /// Optimizer context; excluded from `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
68
69impl TableScan {
70    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
71        self.predicate = self.predicate.clone().rewrite_expr(r);
72    }
73
74    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
75        self.predicate.visit_expr(v);
76    }
77
78    /// The mapped distribution key of the scan operator.
79    ///
80    /// The column indices in it is the position in the `output_col_idx`, instead of the position
81    /// in all the columns of the table (which is the table's distribution key).
82    ///
83    /// Return `None` if the table's distribution key are not all in the `output_col_idx`.
84    pub fn distribution_key(&self) -> Option<Vec<usize>> {
85        let tb_idx_to_op_idx = self
86            .output_col_idx
87            .iter()
88            .enumerate()
89            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
90            .collect::<HashMap<_, _>>();
91        self.table_desc
92            .distribution_key
93            .iter()
94            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
95            .collect()
96    }
97
98    /// Get the ids of the output columns.
99    pub fn output_column_ids(&self) -> Vec<ColumnId> {
100        self.output_col_idx
101            .iter()
102            .map(|i| self.get_table_columns()[*i].column_id)
103            .collect()
104    }
105
106    pub fn primary_key(&self) -> &[ColumnOrder] {
107        &self.table_desc.pk
108    }
109
110    pub fn watermark_columns(&self) -> WatermarkColumns {
111        // TODO(rc): For now, we still use `FixedBitSet` for watermark columns in `TableDesc`.
112        // So when we scan from a table, we have to conservatively assign each watermark column
113        // a separate watermark group. We should record the watermark group information in
114        // `TableDesc` later.
115        let mut watermark_columns = WatermarkColumns::new();
116        for idx in self.table_desc.watermark_columns.ones() {
117            watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
118        }
119        watermark_columns.map_clone(&self.i2o_col_mapping())
120    }
121
122    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
123        self.output_col_idx
124            .iter()
125            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
126            .collect()
127    }
128
129    pub(crate) fn column_names(&self) -> Vec<String> {
130        self.output_col_idx
131            .iter()
132            .map(|&i| self.get_table_columns()[i].name.clone())
133            .collect()
134    }
135
136    pub(crate) fn out_fields(&self) -> FixedBitSet {
137        let out_fields_vec = self.output_col_idx.clone();
138        FixedBitSet::from_iter(out_fields_vec)
139    }
140
141    pub(crate) fn order_names(&self) -> Vec<String> {
142        self.table_desc
143            .order_column_indices()
144            .iter()
145            .map(|&i| self.get_table_columns()[i].name.clone())
146            .collect()
147    }
148
149    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
150        self.table_desc
151            .order_column_indices()
152            .iter()
153            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
154            .collect()
155    }
156
157    /// Return indices of fields the output is ordered by and
158    /// corresponding direction
159    pub fn get_out_column_index_order(&self) -> Order {
160        let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
161        let order = Order::new(
162            self.table_desc
163                .pk
164                .iter()
165                .map(|order| {
166                    let idx = id_to_tb_idx
167                        .get(&self.table_desc.columns[order.column_index].column_id)
168                        .unwrap();
169                    ColumnOrder::new(*idx, order.order_type)
170                })
171                .collect(),
172        );
173        self.i2o_col_mapping().rewrite_provided_order(&order)
174    }
175
176    /// get the Mapping of columnIndex from internal column index to output column index
177    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
178        ColIndexMapping::with_remaining_columns(
179            &self.output_col_idx,
180            self.get_table_columns().len(),
181        )
182    }
183
184    /// Get the ids of the output columns and primary key columns.
185    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
186        let mut ids = self.output_column_ids();
187        for column_order in self.primary_key() {
188            let id = self.get_table_columns()[column_order.column_index].column_id;
189            if !ids.contains(&id) {
190                ids.push(id);
191            }
192        }
193        ids
194    }
195
196    /// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
197    /// scan.
198    pub fn to_index_scan(
199        &self,
200        index_name: &str,
201        index_table_catalog: Arc<TableCatalog>,
202        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
203        function_mapping: &HashMap<FunctionCall, usize>,
204    ) -> Self {
205        let new_output_col_idx = self
206            .output_col_idx
207            .iter()
208            .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
209            .collect();
210
211        struct Rewriter<'a> {
212            primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
213            function_mapping: &'a HashMap<FunctionCall, usize>,
214        }
215        impl ExprRewriter for Rewriter<'_> {
216            fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
217                InputRef::new(
218                    *self
219                        .primary_to_secondary_mapping
220                        .get(&input_ref.index)
221                        .unwrap(),
222                    input_ref.return_type(),
223                )
224                .into()
225            }
226
227            fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
228                if let Some(index) = self.function_mapping.get(&func_call) {
229                    return InputRef::new(*index, func_call.return_type()).into();
230                }
231
232                let (func_type, inputs, ret) = func_call.decompose();
233                let inputs = inputs
234                    .into_iter()
235                    .map(|expr| self.rewrite_expr(expr))
236                    .collect();
237                FunctionCall::new_unchecked(func_type, inputs, ret).into()
238            }
239        }
240        let mut rewriter = Rewriter {
241            primary_to_secondary_mapping,
242            function_mapping,
243        };
244
245        let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
246
247        Self::new(
248            index_name.to_owned(),
249            new_output_col_idx,
250            index_table_catalog,
251            vec![],
252            self.ctx.clone(),
253            new_predicate,
254            self.as_of.clone(),
255            self.table_cardinality,
256        )
257    }
258
259    /// Create a `LogicalScan` node. Used internally by optimizer.
260    #[allow(clippy::too_many_arguments)]
261    pub(crate) fn new(
262        table_name: String,
263        output_col_idx: Vec<usize>, // the column index in the table
264        table_catalog: Arc<TableCatalog>,
265        indexes: Vec<Rc<IndexCatalog>>,
266        ctx: OptimizerContextRef,
267        predicate: Condition, // refers to column indexes of the table
268        as_of: Option<AsOf>,
269        table_cardinality: Cardinality,
270    ) -> Self {
271        Self::new_inner(
272            table_name,
273            output_col_idx,
274            table_catalog,
275            indexes,
276            ctx,
277            predicate,
278            as_of,
279            table_cardinality,
280        )
281    }
282
283    #[allow(clippy::too_many_arguments)]
284    pub(crate) fn new_inner(
285        table_name: String,
286        output_col_idx: Vec<usize>, // the column index in the table
287        table_catalog: Arc<TableCatalog>,
288        indexes: Vec<Rc<IndexCatalog>>,
289        ctx: OptimizerContextRef,
290        predicate: Condition, // refers to column indexes of the table
291        as_of: Option<AsOf>,
292        table_cardinality: Cardinality,
293    ) -> Self {
294        // here we have 3 concepts
295        // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
296        // 2. table_idx: usize, column index in the TableDesc or tableCatalog.
297        // 3. operator_idx: usize, column index in the ScanOperator's schema.
298        // In a query we get the same version of catalog, so the mapping from column_id and
299        // table_idx will not change. And the `required_col_idx` is the `table_idx` of the
300        // required columns, i.e., the mapping from operator_idx to table_idx.
301
302        let mut required_col_idx = output_col_idx.clone();
303        let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
304        predicate_col_idx.ones().for_each(|idx| {
305            if !required_col_idx.contains(&idx) {
306                required_col_idx.push(idx);
307            }
308        });
309
310        let table_desc = Rc::new(table_catalog.table_desc());
311
312        Self {
313            table_name,
314            required_col_idx,
315            output_col_idx,
316            table_catalog,
317            table_desc,
318            indexes,
319            predicate,
320            as_of,
321            table_cardinality,
322            ctx,
323        }
324    }
325
326    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
327        Pretty::Array(
328            match verbose {
329                true => self.column_names_with_table_prefix(),
330                false => self.column_names(),
331            }
332            .into_iter()
333            .map(Pretty::from)
334            .collect(),
335        )
336    }
337
338    pub(crate) fn fields_pretty_schema(&self) -> Schema {
339        let fields = self
340            .table_desc
341            .columns
342            .iter()
343            .map(|col| Field::from_with_table_name_prefix(col, &self.table_name))
344            .collect();
345        Schema { fields }
346    }
347}
348
349impl GenericPlanNode for TableScan {
350    fn schema(&self) -> Schema {
351        let fields = self
352            .output_col_idx
353            .iter()
354            .map(|tb_idx| {
355                let col = &self.get_table_columns()[*tb_idx];
356                Field::from_with_table_name_prefix(col, &self.table_name)
357            })
358            .collect();
359        Schema { fields }
360    }
361
362    fn stream_key(&self) -> Option<Vec<usize>> {
363        if matches!(self.table_catalog.table_type, TableType::Internal) {
364            return None;
365        }
366        let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
367        self.table_desc
368            .stream_key
369            .iter()
370            .map(|&c| {
371                id_to_op_idx
372                    .get(&self.table_desc.columns[c].column_id)
373                    .copied()
374            })
375            .collect::<Option<Vec<_>>>()
376    }
377
378    fn ctx(&self) -> OptimizerContextRef {
379        self.ctx.clone()
380    }
381
382    fn functional_dependency(&self) -> FunctionalDependencySet {
383        let pk_indices = self.stream_key();
384        let col_num = self.output_col_idx.len();
385        match &pk_indices {
386            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
387            None => FunctionalDependencySet::new(col_num),
388        }
389    }
390}
391
392impl TableScan {
393    pub fn get_table_columns(&self) -> &[ColumnDesc] {
394        &self.table_desc.columns
395    }
396
397    pub fn append_only(&self) -> bool {
398        self.table_desc.append_only
399    }
400
401    /// Get the descs of the output columns.
402    pub fn column_descs(&self) -> Vec<ColumnDesc> {
403        self.output_col_idx
404            .iter()
405            .map(|&i| self.get_table_columns()[i].clone())
406            .collect()
407    }
408
409    /// Helper function to create a mapping from `column_id` to `operator_idx`
410    pub fn get_id_to_op_idx_mapping(
411        output_col_idx: &[usize],
412        table_desc: &Rc<TableDesc>,
413    ) -> HashMap<ColumnId, usize> {
414        let mut id_to_op_idx = HashMap::new();
415        output_col_idx
416            .iter()
417            .enumerate()
418            .for_each(|(op_idx, tb_idx)| {
419                let col = &table_desc.columns[*tb_idx];
420                id_to_op_idx.insert(col.column_id, op_idx);
421            });
422        id_to_op_idx
423    }
424}