risingwave_frontend/optimizer/plan_node/generic/
log_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::rc::Rc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_hummock_sdk::HummockVersionId;
26
27use crate::catalog::ColumnId;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::Order;
30use crate::utils::ColIndexMappingRewriteExt;
31
/// Name of the extra column that `LogScan` appends after the projected table columns.
const OP_NAME: &str = "op";
/// Data type of the appended `op` column in the output schema.
const OP_TYPE: DataType = DataType::Varchar;
34
/// Plan-node core describing a scan over a table's log.
///
/// The output consists of the table columns selected by `output_col_idx`, followed by one
/// extra `op` column (see `schema` and `column_names`).
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct LogScan {
    /// Name of the scanned table; used as the prefix of qualified column names.
    pub table_name: String,
    /// Indices into `table_desc.columns` of the table columns to output, in output order.
    /// The `op` column is NOT listed here; it is appended after these columns.
    pub output_col_idx: Vec<usize>,
    /// Descriptor of the table
    pub table_desc: Rc<TableDesc>,
    /// Help `RowSeqLogScan` executor use a better chunk size
    pub chunk_size: Option<u32>,

    /// Optimizer context; excluded from equality comparison and hashing.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    // NOTE(review): presumably the (start, end) epochs of the log range to scan; inclusiveness
    // is not visible from this file -- confirm against the executor.
    pub epoch_range: (u64, u64),
    // NOTE(review): presumably the Hummock version the scan reads from -- confirm with callers.
    pub version_id: HummockVersionId,
}
53
54impl LogScan {
55    // Used for create batch exec, without op
56    pub fn output_column_ids(&self) -> Vec<ColumnId> {
57        self.output_col_idx
58            .iter()
59            .map(|i| self.table_desc.columns[*i].column_id)
60            .collect()
61    }
62
63    pub fn primary_key(&self) -> &[ColumnOrder] {
64        &self.table_desc.pk
65    }
66
67    fn column_names_with_table_prefix(&self) -> Vec<String> {
68        let mut out_column_names: Vec<_> = self
69            .output_col_idx
70            .iter()
71            .map(|&i| format!("{}.{}", self.table_name, self.table_desc.columns[i].name))
72            .collect();
73        out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
74        out_column_names
75    }
76
77    pub(crate) fn column_names(&self) -> Vec<String> {
78        let mut out_column_names: Vec<_> = self
79            .output_col_idx
80            .iter()
81            .map(|&i| self.table_desc.columns[i].name.clone())
82            .collect();
83        out_column_names.push(OP_NAME.to_owned());
84        out_column_names
85    }
86
87    pub fn distribution_key(&self) -> Option<Vec<usize>> {
88        let tb_idx_to_op_idx = self
89            .output_col_idx
90            .iter()
91            .enumerate()
92            .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
93            .collect::<HashMap<_, _>>();
94        self.table_desc
95            .distribution_key
96            .iter()
97            .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
98            .collect()
99    }
100
101    /// Create a logical scan node for log table scan
102    pub(crate) fn new(
103        table_name: String,
104        output_col_idx: Vec<usize>,
105        table_desc: Rc<TableDesc>,
106        ctx: OptimizerContextRef,
107        epoch_range: (u64, u64),
108        version_id: HummockVersionId,
109    ) -> Self {
110        Self {
111            table_name,
112            output_col_idx,
113            table_desc,
114            chunk_size: None,
115            ctx,
116            epoch_range,
117            version_id,
118        }
119    }
120
121    pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
122        Pretty::Array(
123            match verbose {
124                true => self.column_names_with_table_prefix(),
125                false => self.column_names(),
126            }
127            .into_iter()
128            .map(Pretty::from)
129            .collect(),
130        )
131    }
132
133    pub(crate) fn schema(&self) -> Schema {
134        let mut fields: Vec<_> = self
135            .output_col_idx
136            .iter()
137            .map(|tb_idx| {
138                let col = &self.table_desc.columns[*tb_idx];
139                Field::from_with_table_name_prefix(col, &self.table_name)
140            })
141            .collect();
142        fields.push(Field::with_name(
143            OP_TYPE,
144            format!("{}.{}", &self.table_name, OP_NAME),
145        ));
146        Schema { fields }
147    }
148
149    pub(crate) fn out_fields(&self) -> FixedBitSet {
150        let mut out_fields_vec = self.output_col_idx.clone();
151        // add op column
152        out_fields_vec.push(self.output_col_idx.len());
153        FixedBitSet::from_iter(out_fields_vec)
154    }
155
156    pub(crate) fn ctx(&self) -> OptimizerContextRef {
157        self.ctx.clone()
158    }
159
160    pub fn get_table_columns(&self) -> &[ColumnDesc] {
161        &self.table_desc.columns
162    }
163
164    pub(crate) fn order_names(&self) -> Vec<String> {
165        self.table_desc
166            .order_column_indices()
167            .iter()
168            .map(|&i| self.get_table_columns()[i].name.clone())
169            .collect()
170    }
171
172    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
173        self.table_desc
174            .order_column_indices()
175            .iter()
176            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
177            .collect()
178    }
179
180    /// Return indices of fields the output is ordered by and
181    /// corresponding direction
182    pub fn get_out_column_index_order(&self) -> Order {
183        let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
184        let order = Order::new(
185            self.table_desc
186                .pk
187                .iter()
188                .map(|order| {
189                    let idx = id_to_tb_idx
190                        .get(&self.table_desc.columns[order.column_index].column_id)
191                        .unwrap();
192                    ColumnOrder::new(*idx, order.order_type)
193                })
194                .collect(),
195        );
196        self.i2o_col_mapping().rewrite_provided_order(&order)
197    }
198
199    /// get the Mapping of columnIndex from internal column index to output column index
200    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
201        ColIndexMapping::with_remaining_columns(
202            &self.output_col_idx,
203            self.get_table_columns().len(),
204        )
205    }
206}