risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::catalog::index_catalog::{TableIndex, VectorIndex};
30use crate::catalog::table_catalog::TableType;
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
36/// [`TableScan`] returns contents of a RisingWave Table.
37#[derive(Debug, Clone, Educe)]
38#[educe(PartialEq, Eq, Hash)]
39pub struct TableScan {
40    /// Include `output_col_idx` and columns required in `predicate`
41    pub required_col_idx: Vec<usize>,
42    pub output_col_idx: Vec<usize>,
43    /// Table Catalog of the upstream table that the descriptor is derived from.
44    pub table_catalog: Arc<TableCatalog>,
45    // FIXME(kwannoel): Currently many places in the code reference this,
46    // but now we have table catalog.
47    // We should remove this and use table catalog in those call-sites instead.
48    // It's introduced in https://github.com/risingwavelabs/risingwave/pull/13622.
49    // We kept this field to avoid extensive refactor in that PR.
50    /// Table Desc (subset of table catalog).
51    /// Descriptors of all indexes on this table
52    pub table_indexes: Vec<Arc<TableIndex>>,
53    pub vector_indexes: Vec<Arc<VectorIndex>>,
54    /// Table column indices used as the stream key of this scan operator.
55    table_scan_stream_key: Option<Vec<usize>>,
56    /// The pushed down predicates. It refers to column indexes of the table.
57    pub predicate: Condition,
58    /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
59    /// syntax `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg.
60    /// syntax `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg.
61    /// syntax `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg.
62    pub as_of: Option<AsOf>,
63    #[educe(PartialEq(ignore))]
64    #[educe(Hash(ignore))]
65    pub ctx: OptimizerContextRef,
66}
67
68impl TableScan {
69    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
70        self.predicate = self.predicate.clone().rewrite_expr(r);
71    }
72
73    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
74        self.predicate.visit_expr(v);
75    }
76
77    pub fn table_name(&self) -> &str {
78        self.table_catalog.name()
79    }
80
81    /// The mapped distribution key of the scan operator.
82    ///
83    /// The column indices in it is the position in the `output_col_idx`, instead of the position
84    /// in all the columns of the table (which is the table's distribution key).
85    ///
86    /// Return `None` if the table's distribution key are not all in the `output_col_idx`.
87    pub fn distribution_key(&self) -> Option<Vec<usize>> {
88        let tb_idx_to_op_idx = self
89            .output_col_idx
90            .iter()
91            .enumerate()
92            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
93            .collect::<HashMap<_, _>>();
94        self.table_catalog
95            .distribution_key
96            .iter()
97            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
98            .collect()
99    }
100
101    /// Get the ids of the output columns.
102    pub fn output_column_ids(&self) -> Vec<ColumnId> {
103        self.output_col_idx
104            .iter()
105            .map(|i| self.get_table_columns()[*i].column_id)
106            .collect()
107    }
108
109    pub fn primary_key(&self) -> &[ColumnOrder] {
110        &self.table_catalog.pk
111    }
112
113    pub(crate) fn table_scan_stream_key(&self) -> Option<&[usize]> {
114        self.table_scan_stream_key.as_deref()
115    }
116
117    pub(crate) fn extend_table_scan_stream_key_with_primary_key(&mut self) {
118        let primary_key_indices = self
119            .primary_key()
120            .iter()
121            .map(|c| c.column_index)
122            .collect::<Vec<_>>();
123        let Some(table_scan_stream_key) = &mut self.table_scan_stream_key else {
124            #[cfg(debug_assertions)]
125            {
126                panic!(
127                    "table scan stream key should exist before extending with primary key: table: {}, output_col_idx: {:?}, pk: {:?}",
128                    self.table_name(),
129                    self.output_col_idx,
130                    primary_key_indices
131                );
132            }
133            #[cfg(not(debug_assertions))]
134            {
135                tracing::warn!(
136                    table = self.table_name(),
137                    output_col_idx = ?self.output_col_idx,
138                    pk = ?primary_key_indices,
139                    "table scan stream key should exist before extending with primary key"
140                );
141                return;
142            }
143        };
144
145        for primary_key in primary_key_indices {
146            if !table_scan_stream_key.contains(&primary_key) {
147                table_scan_stream_key.push(primary_key);
148            }
149        }
150    }
151
152    pub fn watermark_columns(&self) -> WatermarkColumns {
153        // TODO(rc): For now, we still use `FixedBitSet` for watermark columns in `TableDesc`.
154        // So when we scan from a table, we have to conservatively assign each watermark column
155        // a separate watermark group. We should record the watermark group information in
156        // `TableDesc` later.
157        let mut watermark_columns = WatermarkColumns::new();
158        for idx in self.table_catalog.watermark_columns.ones() {
159            watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
160        }
161        watermark_columns.map_clone(&self.i2o_col_mapping())
162    }
163
164    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
165        self.output_col_idx
166            .iter()
167            .map(|&i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
168            .collect()
169    }
170
171    pub(crate) fn column_names(&self) -> Vec<String> {
172        self.output_col_idx
173            .iter()
174            .map(|&i| self.get_table_columns()[i].name.clone())
175            .collect()
176    }
177
178    pub(crate) fn out_fields(&self) -> FixedBitSet {
179        let out_fields_vec = self.output_col_idx.clone();
180        FixedBitSet::from_iter(out_fields_vec)
181    }
182
183    pub(crate) fn order_names(&self) -> Vec<String> {
184        self.table_catalog
185            .order_column_indices()
186            .map(|i| self.get_table_columns()[i].name.clone())
187            .collect()
188    }
189
190    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
191        self.table_catalog
192            .order_column_indices()
193            .map(|i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
194            .collect()
195    }
196
197    /// Return indices of fields the output is ordered by and
198    /// corresponding direction
199    pub fn get_out_column_index_order(&self) -> Order {
200        let id_to_tb_idx = self.table_catalog.get_id_to_op_idx_mapping();
201        let order = Order::new(
202            self.table_catalog
203                .pk
204                .iter()
205                .map(|order| {
206                    let idx = id_to_tb_idx
207                        .get(&self.table_catalog.columns[order.column_index].column_id)
208                        .unwrap();
209                    ColumnOrder::new(*idx, order.order_type)
210                })
211                .collect(),
212        );
213        self.i2o_col_mapping().rewrite_provided_order(&order)
214    }
215
216    /// get the Mapping of columnIndex from internal column index to output column index
217    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
218        ColIndexMapping::with_remaining_columns(
219            &self.output_col_idx,
220            self.get_table_columns().len(),
221        )
222    }
223
224    /// Get the ids of the output columns and primary key columns.
225    pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
226        let mut ids = self.output_column_ids();
227        for column_order in self.primary_key() {
228            let id = self.get_table_columns()[column_order.column_index].column_id;
229            if !ids.contains(&id) {
230                ids.push(id);
231            }
232        }
233        ids
234    }
235
236    /// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
237    /// scan.
238    pub fn to_index_scan(
239        &self,
240        index_table_catalog: Arc<TableCatalog>,
241        primary_to_secondary_mapping: &BTreeMap<usize, usize>,
242        function_mapping: &HashMap<FunctionCall, usize>,
243    ) -> Self {
244        let new_output_col_idx = self
245            .output_col_idx
246            .iter()
247            .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
248            .collect();
249
250        struct Rewriter<'a> {
251            primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
252            function_mapping: &'a HashMap<FunctionCall, usize>,
253        }
254        impl ExprRewriter for Rewriter<'_> {
255            fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
256                InputRef::new(
257                    *self
258                        .primary_to_secondary_mapping
259                        .get(&input_ref.index)
260                        .unwrap(),
261                    input_ref.return_type(),
262                )
263                .into()
264            }
265
266            fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
267                if let Some(index) = self.function_mapping.get(&func_call) {
268                    return InputRef::new(*index, func_call.return_type()).into();
269                }
270
271                let (func_type, inputs, ret) = func_call.decompose();
272                let inputs = inputs
273                    .into_iter()
274                    .map(|expr| self.rewrite_expr(expr))
275                    .collect();
276                FunctionCall::new_unchecked(func_type, inputs, ret).into()
277            }
278        }
279        let mut rewriter = Rewriter {
280            primary_to_secondary_mapping,
281            function_mapping,
282        };
283
284        let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
285
286        Self::new(
287            new_output_col_idx,
288            index_table_catalog,
289            vec![],
290            vec![],
291            self.ctx.clone(),
292            new_predicate,
293            self.as_of.clone(),
294        )
295    }
296
297    /// Create a `LogicalScan` node. Used internally by optimizer.
298    #[allow(clippy::too_many_arguments)]
299    pub(crate) fn new(
300        output_col_idx: Vec<usize>, // the column index in the table
301        table_catalog: Arc<TableCatalog>,
302        table_indexes: Vec<Arc<TableIndex>>,
303        vector_indexes: Vec<Arc<VectorIndex>>,
304        ctx: OptimizerContextRef,
305        predicate: Condition, // refers to column indexes of the table
306        as_of: Option<AsOf>,
307    ) -> Self {
308        let table_scan_stream_key = (!matches!(table_catalog.table_type, TableType::Internal))
309            .then(|| table_catalog.stream_key());
310        Self::new_inner(
311            output_col_idx,
312            table_catalog,
313            table_indexes,
314            vector_indexes,
315            ctx,
316            predicate,
317            as_of,
318            table_scan_stream_key,
319        )
320    }
321
322    pub(crate) fn new_inner(
323        output_col_idx: Vec<usize>, // the column index in the table
324        table_catalog: Arc<TableCatalog>,
325        table_indexes: Vec<Arc<TableIndex>>,
326        vector_indexes: Vec<Arc<VectorIndex>>,
327        ctx: OptimizerContextRef,
328        predicate: Condition, // refers to column indexes of the table
329        as_of: Option<AsOf>,
330        table_scan_stream_key: Option<Vec<usize>>,
331    ) -> Self {
332        // here we have 3 concepts
333        // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
334        // 2. table_idx: usize, column index in the TableDesc or tableCatalog.
335        // 3. operator_idx: usize, column index in the ScanOperator's schema.
336        // In a query we get the same version of catalog, so the mapping from column_id and
337        // table_idx will not change. And the `required_col_idx` is the `table_idx` of the
338        // required columns, i.e., the mapping from operator_idx to table_idx.
339
340        let mut required_col_idx = output_col_idx.clone();
341        let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
342        predicate_col_idx.ones().for_each(|idx| {
343            if !required_col_idx.contains(&idx) {
344                required_col_idx.push(idx);
345            }
346        });
347
348        Self {
349            required_col_idx,
350            output_col_idx,
351            table_catalog,
352            table_indexes,
353            vector_indexes,
354            table_scan_stream_key,
355            predicate,
356            as_of,
357            ctx,
358        }
359    }
360
361    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
362        Pretty::Array(
363            match verbose {
364                true => self.column_names_with_table_prefix(),
365                false => self.column_names(),
366            }
367            .into_iter()
368            .map(Pretty::from)
369            .collect(),
370        )
371    }
372
373    pub(crate) fn fields_pretty_schema(&self) -> Schema {
374        let fields = self
375            .table_catalog
376            .columns
377            .iter()
378            .map(|col| Field::from_with_table_name_prefix(&col.column_desc, self.table_name()))
379            .collect();
380        Schema { fields }
381    }
382
383    // Check if the scan is cross-database
384    pub(crate) fn cross_database(&self) -> bool {
385        self.table_catalog.database_id != self.ctx().session_ctx().database_id()
386    }
387}
388
389impl GenericPlanNode for TableScan {
390    fn schema(&self) -> Schema {
391        let fields = self
392            .output_col_idx
393            .iter()
394            .map(|tb_idx| {
395                let col = &self.get_table_columns()[*tb_idx];
396                Field::from_with_table_name_prefix(&col.column_desc, self.table_name())
397            })
398            .collect();
399        Schema { fields }
400    }
401
402    fn stream_key(&self) -> Option<Vec<usize>> {
403        let stream_key = self.table_scan_stream_key.as_deref()?;
404        let id_to_op_idx =
405            Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_catalog);
406        stream_key
407            .iter()
408            .map(|&c| {
409                id_to_op_idx
410                    .get(&self.table_catalog.columns[c].column_id)
411                    .copied()
412            })
413            .collect()
414    }
415
416    fn ctx(&self) -> OptimizerContextRef {
417        self.ctx.clone()
418    }
419
420    fn functional_dependency(&self) -> FunctionalDependencySet {
421        let pk_indices = self.stream_key();
422        let col_num = self.output_col_idx.len();
423        match &pk_indices {
424            Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
425            None => FunctionalDependencySet::new(col_num),
426        }
427    }
428}
429
430impl TableScan {
431    pub fn get_table_columns(&self) -> &[ColumnCatalog] {
432        &self.table_catalog.columns
433    }
434
435    pub fn append_only(&self) -> bool {
436        self.table_catalog.append_only
437    }
438
439    /// Get the descs of the output columns.
440    pub fn column_descs(&self) -> Vec<ColumnDesc> {
441        self.output_col_idx
442            .iter()
443            .map(|&i| self.get_table_columns()[i].column_desc.clone())
444            .collect()
445    }
446
447    /// Helper function to create a mapping from `column_id` to `operator_idx`
448    pub fn get_id_to_op_idx_mapping(
449        output_col_idx: &[usize],
450        table_catalog: &TableCatalog,
451    ) -> HashMap<ColumnId, usize> {
452        ColumnDesc::get_id_to_op_idx_mapping(&table_catalog.columns, Some(output_col_idx))
453    }
454}