// Source: risingwave_frontend/optimizer/plan_node/generic/log_scan.rs
use std::collections::HashMap;
16use std::rc::Rc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_hummock_sdk::HummockVersionId;
26
27use crate::catalog::ColumnId;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::Order;
30use crate::utils::ColIndexMappingRewriteExt;
31
32const OP_NAME: &str = "op";
33const OP_TYPE: DataType = DataType::Varchar;
34
35#[derive(Debug, Clone, Educe)]
36#[educe(PartialEq, Eq, Hash)]
37pub struct LogScan {
38 pub table_name: String,
39 pub output_col_idx: Vec<usize>,
41 pub table_desc: Rc<TableDesc>,
43 pub chunk_size: Option<u32>,
45
46 #[educe(PartialEq(ignore))]
47 #[educe(Hash(ignore))]
48 pub ctx: OptimizerContextRef,
49
50 pub epoch_range: (u64, u64),
51 pub version_id: HummockVersionId,
52}
53
54impl LogScan {
55 pub fn output_column_ids(&self) -> Vec<ColumnId> {
57 self.output_col_idx
58 .iter()
59 .map(|i| self.table_desc.columns[*i].column_id)
60 .collect()
61 }
62
63 pub fn primary_key(&self) -> &[ColumnOrder] {
64 &self.table_desc.pk
65 }
66
67 fn column_names_with_table_prefix(&self) -> Vec<String> {
68 let mut out_column_names: Vec<_> = self
69 .output_col_idx
70 .iter()
71 .map(|&i| format!("{}.{}", self.table_name, self.table_desc.columns[i].name))
72 .collect();
73 out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
74 out_column_names
75 }
76
77 pub(crate) fn column_names(&self) -> Vec<String> {
78 let mut out_column_names: Vec<_> = self
79 .output_col_idx
80 .iter()
81 .map(|&i| self.table_desc.columns[i].name.clone())
82 .collect();
83 out_column_names.push(OP_NAME.to_owned());
84 out_column_names
85 }
86
87 pub fn distribution_key(&self) -> Option<Vec<usize>> {
88 let tb_idx_to_op_idx = self
89 .output_col_idx
90 .iter()
91 .enumerate()
92 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
93 .collect::<HashMap<_, _>>();
94 self.table_desc
95 .distribution_key
96 .iter()
97 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
98 .collect()
99 }
100
101 pub(crate) fn new(
103 table_name: String,
104 output_col_idx: Vec<usize>,
105 table_desc: Rc<TableDesc>,
106 ctx: OptimizerContextRef,
107 epoch_range: (u64, u64),
108 version_id: HummockVersionId,
109 ) -> Self {
110 Self {
111 table_name,
112 output_col_idx,
113 table_desc,
114 chunk_size: None,
115 ctx,
116 epoch_range,
117 version_id,
118 }
119 }
120
121 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
122 Pretty::Array(
123 match verbose {
124 true => self.column_names_with_table_prefix(),
125 false => self.column_names(),
126 }
127 .into_iter()
128 .map(Pretty::from)
129 .collect(),
130 )
131 }
132
133 pub(crate) fn schema(&self) -> Schema {
134 let mut fields: Vec<_> = self
135 .output_col_idx
136 .iter()
137 .map(|tb_idx| {
138 let col = &self.table_desc.columns[*tb_idx];
139 Field::from_with_table_name_prefix(col, &self.table_name)
140 })
141 .collect();
142 fields.push(Field::with_name(
143 OP_TYPE,
144 format!("{}.{}", &self.table_name, OP_NAME),
145 ));
146 Schema { fields }
147 }
148
149 pub(crate) fn out_fields(&self) -> FixedBitSet {
150 let mut out_fields_vec = self.output_col_idx.clone();
151 out_fields_vec.push(self.output_col_idx.len());
153 FixedBitSet::from_iter(out_fields_vec)
154 }
155
156 pub(crate) fn ctx(&self) -> OptimizerContextRef {
157 self.ctx.clone()
158 }
159
160 pub fn get_table_columns(&self) -> &[ColumnDesc] {
161 &self.table_desc.columns
162 }
163
164 pub(crate) fn order_names(&self) -> Vec<String> {
165 self.table_desc
166 .order_column_indices()
167 .iter()
168 .map(|&i| self.get_table_columns()[i].name.clone())
169 .collect()
170 }
171
172 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
173 self.table_desc
174 .order_column_indices()
175 .iter()
176 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
177 .collect()
178 }
179
180 pub fn get_out_column_index_order(&self) -> Order {
183 let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
184 let order = Order::new(
185 self.table_desc
186 .pk
187 .iter()
188 .map(|order| {
189 let idx = id_to_tb_idx
190 .get(&self.table_desc.columns[order.column_index].column_id)
191 .unwrap();
192 ColumnOrder::new(*idx, order.order_type)
193 })
194 .collect(),
195 );
196 self.i2o_col_mapping().rewrite_provided_order(&order)
197 }
198
199 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
201 ColIndexMapping::with_remaining_columns(
202 &self.output_col_idx,
203 self.get_table_columns().len(),
204 )
205 }
206}