// risingwave_frontend/optimizer/plan_node/generic/log_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashMap;
16use std::rc::Rc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use itertools::Itertools;
21use pretty_xmlish::Pretty;
22use risingwave_common::catalog::{Field, Schema, TableDesc};
23use risingwave_common::types::DataType;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_hummock_sdk::HummockVersionId;
26
27use crate::catalog::ColumnId;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29
30const OP_NAME: &str = "op";
31const OP_TYPE: DataType = DataType::Varchar;
32
33#[derive(Debug, Clone, Educe)]
34#[educe(PartialEq, Eq, Hash)]
35pub struct LogScan {
36    pub table_name: String,
37    /// Include `output_col_idx_with_out_hidden` and `op_column`
38    pub output_col_idx_with_out_hidden: Vec<usize>,
39    /// Include `output_col_idx_with_out_hidden` and `op_column` and hidden pk
40    pub output_col_idx: Vec<usize>,
41    /// Descriptor of the table
42    pub table_desc: Rc<TableDesc>,
43    /// Help `RowSeqLogScan` executor use a better chunk size
44    pub chunk_size: Option<u32>,
45
46    #[educe(PartialEq(ignore))]
47    #[educe(Hash(ignore))]
48    pub ctx: OptimizerContextRef,
49
50    pub old_epoch: u64,
51    pub new_epoch: u64,
52    pub version_id: HummockVersionId,
53}
54
55impl LogScan {
56    // Used for create batch exec, without op
57    pub fn output_column_ids(&self) -> Vec<ColumnId> {
58        self.output_col_idx
59            .iter()
60            .map(|i| self.table_desc.columns[*i].column_id)
61            .collect()
62    }
63
64    pub fn primary_key(&self) -> &[ColumnOrder] {
65        &self.table_desc.pk
66    }
67
68    fn column_names_with_table_prefix(&self) -> Vec<String> {
69        let mut out_column_names: Vec<_> = self
70            .output_col_idx
71            .iter()
72            .map(|&i| format!("{}.{}", self.table_name, self.table_desc.columns[i].name))
73            .collect();
74        out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
75        out_column_names
76    }
77
78    pub(crate) fn column_names(&self) -> Vec<String> {
79        let mut out_column_names: Vec<_> = self
80            .output_col_idx
81            .iter()
82            .map(|&i| self.table_desc.columns[i].name.clone())
83            .collect();
84        out_column_names.push(OP_NAME.to_owned());
85        out_column_names
86    }
87
88    pub(crate) fn column_names_without_hidden(&self) -> Vec<String> {
89        let mut out_column_names: Vec<_> = self
90            .output_col_idx_with_out_hidden
91            .iter()
92            .map(|&i| self.table_desc.columns[i].name.clone())
93            .collect();
94        out_column_names.push(OP_NAME.to_owned());
95        out_column_names
96    }
97
98    pub fn distribution_key(&self) -> Option<Vec<usize>> {
99        let tb_idx_to_op_idx = self
100            .output_col_idx
101            .iter()
102            .enumerate()
103            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
104            .collect::<HashMap<_, _>>();
105        self.table_desc
106            .distribution_key
107            .iter()
108            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
109            .collect()
110    }
111
112    /// Create a logical scan node for log table scan
113    pub(crate) fn new(
114        table_name: String,
115        output_col_idx_with_out_hidden: Vec<usize>,
116        output_col_idx: Vec<usize>,
117        table_desc: Rc<TableDesc>,
118        ctx: OptimizerContextRef,
119        old_epoch: u64,
120        new_epoch: u64,
121        version_id: HummockVersionId,
122    ) -> Self {
123        Self {
124            table_name,
125            output_col_idx_with_out_hidden,
126            output_col_idx,
127            table_desc,
128            chunk_size: None,
129            ctx,
130            old_epoch,
131            new_epoch,
132            version_id,
133        }
134    }
135
136    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
137        Pretty::Array(
138            match verbose {
139                true => self.column_names_with_table_prefix(),
140                false => self.column_names(),
141            }
142            .into_iter()
143            .map(Pretty::from)
144            .collect(),
145        )
146    }
147
148    pub(crate) fn schema(&self) -> Schema {
149        let mut fields: Vec<_> = self
150            .output_col_idx
151            .iter()
152            .map(|tb_idx| {
153                let col = &self.table_desc.columns[*tb_idx];
154                Field::from_with_table_name_prefix(col, &self.table_name)
155            })
156            .collect();
157        fields.push(Field::with_name(
158            OP_TYPE,
159            format!("{}.{}", &self.table_name, OP_NAME),
160        ));
161        Schema { fields }
162    }
163
164    pub(crate) fn out_fields(&self) -> FixedBitSet {
165        let mut out_fields_vec = self
166            .output_col_idx
167            .iter()
168            .enumerate()
169            .filter_map(|(index, idx)| {
170                if self.output_col_idx_with_out_hidden.contains(idx) {
171                    Some(index)
172                } else {
173                    None
174                }
175            })
176            .collect_vec();
177        // add op column
178        out_fields_vec.push(self.output_col_idx.len());
179        FixedBitSet::from_iter(out_fields_vec)
180    }
181
182    pub(crate) fn ctx(&self) -> OptimizerContextRef {
183        self.ctx.clone()
184    }
185}