risingwave_frontend/optimizer/plan_node/generic/
log_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_hummock_sdk::HummockVersionId;
26
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::optimizer::optimizer_context::OptimizerContextRef;
30use crate::optimizer::property::Order;
31use crate::utils::ColIndexMappingRewriteExt;
32
/// Name of the extra column that `LogScan` appends after the selected table columns.
const OP_NAME: &str = "op";
/// Data type of the extra `op` column.
const OP_TYPE: DataType = DataType::Varchar;
35
/// Core description of a log table scan.
///
/// The output consists of the table columns selected by `output_col_idx`,
/// followed by one extra `op` column of type `Varchar`.
///
/// Equality and hashing are derived through `educe` and skip the `ctx` field.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct LogScan {
    /// Table name; used as the `<table_name>.<column>` prefix when building
    /// schema field names and verbose column names.
    pub table_name: String,
    /// Indices into `table.columns` of the columns to output, in output order.
    ///
    /// The `op` column is not listed here: it is appended after these columns
    /// when the schema and the column names are built.
    pub output_col_idx: Vec<usize>,
    /// Descriptor of the table
    pub table: Arc<TableCatalog>,
    /// Help `RowSeqLogScan` executor use a better chunk size
    pub chunk_size: Option<u32>,

    /// Optimizer context. Ignored by `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// NOTE(review): presumably the (start, end) epochs bounding the scanned
    /// log range; inclusiveness is not visible in this file — confirm.
    pub epoch_range: (u64, u64),
    /// NOTE(review): presumably the Hummock version the scan reads from — confirm.
    pub version_id: HummockVersionId,
}
54
55impl LogScan {
56    // Used for create batch exec, without op
57    pub fn output_column_ids(&self) -> Vec<ColumnId> {
58        self.output_col_idx
59            .iter()
60            .map(|i| self.table.columns[*i].column_desc.column_id)
61            .collect()
62    }
63
64    pub fn primary_key(&self) -> &[ColumnOrder] {
65        &self.table.pk
66    }
67
68    fn column_names_with_table_prefix(&self) -> Vec<String> {
69        let mut out_column_names: Vec<_> = self
70            .output_col_idx
71            .iter()
72            .map(|&i| format!("{}.{}", self.table_name, self.table.columns[i].name))
73            .collect();
74        out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
75        out_column_names
76    }
77
78    pub(crate) fn column_names(&self) -> Vec<String> {
79        let mut out_column_names: Vec<_> = self
80            .output_col_idx
81            .iter()
82            .map(|&i| self.table.columns[i].name.clone())
83            .collect();
84        out_column_names.push(OP_NAME.to_owned());
85        out_column_names
86    }
87
88    pub fn distribution_key(&self) -> Option<Vec<usize>> {
89        let tb_idx_to_op_idx = self
90            .output_col_idx
91            .iter()
92            .enumerate()
93            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
94            .collect::<HashMap<_, _>>();
95        self.table
96            .distribution_key
97            .iter()
98            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
99            .collect()
100    }
101
102    /// Create a logical scan node for log table scan
103    pub(crate) fn new(
104        table_name: String,
105        output_col_idx: Vec<usize>,
106        table: Arc<TableCatalog>,
107        ctx: OptimizerContextRef,
108        epoch_range: (u64, u64),
109        version_id: HummockVersionId,
110    ) -> Self {
111        Self {
112            table_name,
113            output_col_idx,
114            table,
115            chunk_size: None,
116            ctx,
117            epoch_range,
118            version_id,
119        }
120    }
121
122    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
123        Pretty::Array(
124            match verbose {
125                true => self.column_names_with_table_prefix(),
126                false => self.column_names(),
127            }
128            .into_iter()
129            .map(Pretty::from)
130            .collect(),
131        )
132    }
133
134    pub(crate) fn schema(&self) -> Schema {
135        let mut fields: Vec<_> = self
136            .output_col_idx
137            .iter()
138            .map(|tb_idx| {
139                let col = &self.table.columns[*tb_idx];
140                Field::from_with_table_name_prefix(col, &self.table_name)
141            })
142            .collect();
143        fields.push(Field::with_name(
144            OP_TYPE,
145            format!("{}.{}", &self.table_name, OP_NAME),
146        ));
147        Schema { fields }
148    }
149
150    pub(crate) fn out_fields(&self) -> FixedBitSet {
151        let mut out_fields_vec = self.output_col_idx.clone();
152        // add op column
153        out_fields_vec.push(self.output_col_idx.len());
154        FixedBitSet::from_iter(out_fields_vec)
155    }
156
157    pub(crate) fn ctx(&self) -> OptimizerContextRef {
158        self.ctx.clone()
159    }
160
161    pub fn get_table_columns(&self) -> &[ColumnCatalog] {
162        &self.table.columns
163    }
164
165    pub(crate) fn order_names(&self) -> Vec<String> {
166        self.table
167            .order_column_indices()
168            .map(|i| self.get_table_columns()[i].name.clone())
169            .collect()
170    }
171
172    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
173        self.table
174            .order_column_indices()
175            .map(|i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
176            .collect()
177    }
178
179    /// Return indices of fields the output is ordered by and
180    /// corresponding direction
181    pub fn get_out_column_index_order(&self) -> Order {
182        let id_to_tb_idx = self.table.get_id_to_op_idx_mapping();
183        let order = Order::new(
184            self.table
185                .pk
186                .iter()
187                .map(|order| {
188                    let idx = id_to_tb_idx
189                        .get(&self.table.columns[order.column_index].column_id)
190                        .unwrap();
191                    ColumnOrder::new(*idx, order.order_type)
192                })
193                .collect(),
194        );
195        self.i2o_col_mapping().rewrite_provided_order(&order)
196    }
197
198    /// get the Mapping of columnIndex from internal column index to output column index
199    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
200        ColIndexMapping::with_remaining_columns(
201            &self.output_col_idx,
202            self.get_table_columns().len(),
203        )
204    }
205}