risingwave_frontend/optimizer/plan_node/generic/
log_scan.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_hummock_sdk::HummockVersionId;
26
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::optimizer::optimizer_context::OptimizerContextRef;
30use crate::optimizer::property::Order;
31use crate::utils::ColIndexMappingRewriteExt;
32
33const OP_NAME: &str = "op";
34const OP_TYPE: DataType = DataType::Varchar;
35
36#[derive(Debug, Clone, Educe)]
37#[educe(PartialEq, Eq, Hash)]
38pub struct LogScan {
39 pub table_name: String,
40 pub output_col_idx: Vec<usize>,
42 pub table: Arc<TableCatalog>,
44 pub chunk_size: Option<u32>,
46
47 #[educe(PartialEq(ignore))]
48 #[educe(Hash(ignore))]
49 pub ctx: OptimizerContextRef,
50
51 pub epoch_range: (u64, u64),
52 pub version_id: HummockVersionId,
53}
54
55impl LogScan {
56 pub fn output_column_ids(&self) -> Vec<ColumnId> {
58 self.output_col_idx
59 .iter()
60 .map(|i| self.table.columns[*i].column_desc.column_id)
61 .collect()
62 }
63
64 pub fn primary_key(&self) -> &[ColumnOrder] {
65 &self.table.pk
66 }
67
68 fn column_names_with_table_prefix(&self) -> Vec<String> {
69 let mut out_column_names: Vec<_> = self
70 .output_col_idx
71 .iter()
72 .map(|&i| format!("{}.{}", self.table_name, self.table.columns[i].name))
73 .collect();
74 out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
75 out_column_names
76 }
77
78 pub(crate) fn column_names(&self) -> Vec<String> {
79 let mut out_column_names: Vec<_> = self
80 .output_col_idx
81 .iter()
82 .map(|&i| self.table.columns[i].name.clone())
83 .collect();
84 out_column_names.push(OP_NAME.to_owned());
85 out_column_names
86 }
87
88 pub fn distribution_key(&self) -> Option<Vec<usize>> {
89 let tb_idx_to_op_idx = self
90 .output_col_idx
91 .iter()
92 .enumerate()
93 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
94 .collect::<HashMap<_, _>>();
95 self.table
96 .distribution_key
97 .iter()
98 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
99 .collect()
100 }
101
102 pub(crate) fn new(
104 table_name: String,
105 output_col_idx: Vec<usize>,
106 table: Arc<TableCatalog>,
107 ctx: OptimizerContextRef,
108 epoch_range: (u64, u64),
109 version_id: HummockVersionId,
110 ) -> Self {
111 Self {
112 table_name,
113 output_col_idx,
114 table,
115 chunk_size: None,
116 ctx,
117 epoch_range,
118 version_id,
119 }
120 }
121
122 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
123 Pretty::Array(
124 match verbose {
125 true => self.column_names_with_table_prefix(),
126 false => self.column_names(),
127 }
128 .into_iter()
129 .map(Pretty::from)
130 .collect(),
131 )
132 }
133
134 pub(crate) fn schema(&self) -> Schema {
135 let mut fields: Vec<_> = self
136 .output_col_idx
137 .iter()
138 .map(|tb_idx| {
139 let col = &self.table.columns[*tb_idx];
140 Field::from_with_table_name_prefix(col, &self.table_name)
141 })
142 .collect();
143 fields.push(Field::with_name(
144 OP_TYPE,
145 format!("{}.{}", &self.table_name, OP_NAME),
146 ));
147 Schema { fields }
148 }
149
150 pub(crate) fn out_fields(&self) -> FixedBitSet {
151 let mut out_fields_vec = self.output_col_idx.clone();
152 out_fields_vec.push(self.output_col_idx.len());
154 FixedBitSet::from_iter(out_fields_vec)
155 }
156
157 pub(crate) fn ctx(&self) -> OptimizerContextRef {
158 self.ctx.clone()
159 }
160
161 pub fn get_table_columns(&self) -> &[ColumnCatalog] {
162 &self.table.columns
163 }
164
165 pub(crate) fn order_names(&self) -> Vec<String> {
166 self.table
167 .order_column_indices()
168 .map(|i| self.get_table_columns()[i].name.clone())
169 .collect()
170 }
171
172 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
173 self.table
174 .order_column_indices()
175 .map(|i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
176 .collect()
177 }
178
179 pub fn get_out_column_index_order(&self) -> Order {
182 let id_to_tb_idx = self.table.get_id_to_op_idx_mapping();
183 let order = Order::new(
184 self.table
185 .pk
186 .iter()
187 .map(|order| {
188 let idx = id_to_tb_idx
189 .get(&self.table.columns[order.column_index].column_id)
190 .unwrap();
191 ColumnOrder::new(*idx, order.order_type)
192 })
193 .collect(),
194 );
195 self.i2o_col_mapping().rewrite_provided_order(&order)
196 }
197
198 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
200 ColIndexMapping::with_remaining_columns(
201 &self.output_col_idx,
202 self.get_table_columns().len(),
203 )
204 }
205}