risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use educe::Educe;
20use fixedbitset::FixedBitSet;
21use pretty_xmlish::Pretty;
22use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::GenericPlanNode;
28use crate::TableCatalog;
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{ColumnId, IndexCatalog};
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
/// Describes a scan over a single table: the columns it outputs, the predicate pushed into
/// it, and the catalog information needed to interpret both.
///
/// Equality and hashing ignore `ctx` (see the `educe` field attributes); every other field
/// participates.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableScan {
    /// Name used to qualify column names as `"<table_name>.<column_name>"`.
    pub table_name: String,
    /// `output_col_idx` followed by every table column the predicate reads that is not
    /// already output (computed in `new_inner`).
    pub required_col_idx: Vec<usize>,
    /// Indices into the table's columns, one per output column, in output order.
    pub output_col_idx: Vec<usize>,
    /// Catalog entry of the scanned table.
    pub table_catalog: Arc<TableCatalog>,
    /// Descriptor computed from `table_catalog` at construction time; most methods read
    /// columns, keys and ordering from here.
    pub table_desc: Rc<TableDesc>,
    /// Index catalogs attached to this scan. NOTE(review): presumably the indexes defined
    /// on the table; `to_index_scan` leaves this empty on the scan it builds.
    pub indexes: Vec<Rc<IndexCatalog>>,
    /// Predicate pushed into the scan; its input refs are table column indices.
    pub predicate: Condition,
    /// Optional `AS OF` clause. NOTE(review): presumably selects a snapshot/version of the
    /// table — confirm against consumers.
    pub as_of: Option<AsOf>,
    /// Cardinality estimate for the table. NOTE(review): presumably measured before the
    /// predicate is applied — confirm.
    pub table_cardinality: Cardinality,
    /// Optimizer context; excluded from `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
68
69impl TableScan {
70 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
71 self.predicate = self.predicate.clone().rewrite_expr(r);
72 }
73
74 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
75 self.predicate.visit_expr(v);
76 }
77
78 pub fn distribution_key(&self) -> Option<Vec<usize>> {
85 let tb_idx_to_op_idx = self
86 .output_col_idx
87 .iter()
88 .enumerate()
89 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
90 .collect::<HashMap<_, _>>();
91 self.table_desc
92 .distribution_key
93 .iter()
94 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
95 .collect()
96 }
97
98 pub fn output_column_ids(&self) -> Vec<ColumnId> {
100 self.output_col_idx
101 .iter()
102 .map(|i| self.get_table_columns()[*i].column_id)
103 .collect()
104 }
105
106 pub fn primary_key(&self) -> &[ColumnOrder] {
107 &self.table_desc.pk
108 }
109
110 pub fn watermark_columns(&self) -> WatermarkColumns {
111 let mut watermark_columns = WatermarkColumns::new();
116 for idx in self.table_desc.watermark_columns.ones() {
117 watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
118 }
119 watermark_columns.map_clone(&self.i2o_col_mapping())
120 }
121
122 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
123 self.output_col_idx
124 .iter()
125 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
126 .collect()
127 }
128
129 pub(crate) fn column_names(&self) -> Vec<String> {
130 self.output_col_idx
131 .iter()
132 .map(|&i| self.get_table_columns()[i].name.clone())
133 .collect()
134 }
135
136 pub(crate) fn out_fields(&self) -> FixedBitSet {
137 let out_fields_vec = self.output_col_idx.clone();
138 FixedBitSet::from_iter(out_fields_vec)
139 }
140
141 pub(crate) fn order_names(&self) -> Vec<String> {
142 self.table_desc
143 .order_column_indices()
144 .iter()
145 .map(|&i| self.get_table_columns()[i].name.clone())
146 .collect()
147 }
148
149 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
150 self.table_desc
151 .order_column_indices()
152 .iter()
153 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
154 .collect()
155 }
156
157 pub fn get_out_column_index_order(&self) -> Order {
160 let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
161 let order = Order::new(
162 self.table_desc
163 .pk
164 .iter()
165 .map(|order| {
166 let idx = id_to_tb_idx
167 .get(&self.table_desc.columns[order.column_index].column_id)
168 .unwrap();
169 ColumnOrder::new(*idx, order.order_type)
170 })
171 .collect(),
172 );
173 self.i2o_col_mapping().rewrite_provided_order(&order)
174 }
175
176 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
178 ColIndexMapping::with_remaining_columns(
179 &self.output_col_idx,
180 self.get_table_columns().len(),
181 )
182 }
183
184 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
186 let mut ids = self.output_column_ids();
187 for column_order in self.primary_key() {
188 let id = self.get_table_columns()[column_order.column_index].column_id;
189 if !ids.contains(&id) {
190 ids.push(id);
191 }
192 }
193 ids
194 }
195
196 pub fn to_index_scan(
199 &self,
200 index_name: &str,
201 index_table_catalog: Arc<TableCatalog>,
202 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
203 function_mapping: &HashMap<FunctionCall, usize>,
204 ) -> Self {
205 let new_output_col_idx = self
206 .output_col_idx
207 .iter()
208 .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
209 .collect();
210
211 struct Rewriter<'a> {
212 primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
213 function_mapping: &'a HashMap<FunctionCall, usize>,
214 }
215 impl ExprRewriter for Rewriter<'_> {
216 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
217 InputRef::new(
218 *self
219 .primary_to_secondary_mapping
220 .get(&input_ref.index)
221 .unwrap(),
222 input_ref.return_type(),
223 )
224 .into()
225 }
226
227 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
228 if let Some(index) = self.function_mapping.get(&func_call) {
229 return InputRef::new(*index, func_call.return_type()).into();
230 }
231
232 let (func_type, inputs, ret) = func_call.decompose();
233 let inputs = inputs
234 .into_iter()
235 .map(|expr| self.rewrite_expr(expr))
236 .collect();
237 FunctionCall::new_unchecked(func_type, inputs, ret).into()
238 }
239 }
240 let mut rewriter = Rewriter {
241 primary_to_secondary_mapping,
242 function_mapping,
243 };
244
245 let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
246
247 Self::new(
248 index_name.to_owned(),
249 new_output_col_idx,
250 index_table_catalog,
251 vec![],
252 self.ctx.clone(),
253 new_predicate,
254 self.as_of.clone(),
255 self.table_cardinality,
256 )
257 }
258
259 #[allow(clippy::too_many_arguments)]
261 pub(crate) fn new(
262 table_name: String,
263 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
265 indexes: Vec<Rc<IndexCatalog>>,
266 ctx: OptimizerContextRef,
267 predicate: Condition, as_of: Option<AsOf>,
269 table_cardinality: Cardinality,
270 ) -> Self {
271 Self::new_inner(
272 table_name,
273 output_col_idx,
274 table_catalog,
275 indexes,
276 ctx,
277 predicate,
278 as_of,
279 table_cardinality,
280 )
281 }
282
283 #[allow(clippy::too_many_arguments)]
284 pub(crate) fn new_inner(
285 table_name: String,
286 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
288 indexes: Vec<Rc<IndexCatalog>>,
289 ctx: OptimizerContextRef,
290 predicate: Condition, as_of: Option<AsOf>,
292 table_cardinality: Cardinality,
293 ) -> Self {
294 let mut required_col_idx = output_col_idx.clone();
303 let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
304 predicate_col_idx.ones().for_each(|idx| {
305 if !required_col_idx.contains(&idx) {
306 required_col_idx.push(idx);
307 }
308 });
309
310 let table_desc = Rc::new(table_catalog.table_desc());
311
312 Self {
313 table_name,
314 required_col_idx,
315 output_col_idx,
316 table_catalog,
317 table_desc,
318 indexes,
319 predicate,
320 as_of,
321 table_cardinality,
322 ctx,
323 }
324 }
325
326 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
327 Pretty::Array(
328 match verbose {
329 true => self.column_names_with_table_prefix(),
330 false => self.column_names(),
331 }
332 .into_iter()
333 .map(Pretty::from)
334 .collect(),
335 )
336 }
337
338 pub(crate) fn fields_pretty_schema(&self) -> Schema {
339 let fields = self
340 .table_desc
341 .columns
342 .iter()
343 .map(|col| Field::from_with_table_name_prefix(col, &self.table_name))
344 .collect();
345 Schema { fields }
346 }
347}
348
349impl GenericPlanNode for TableScan {
350 fn schema(&self) -> Schema {
351 let fields = self
352 .output_col_idx
353 .iter()
354 .map(|tb_idx| {
355 let col = &self.get_table_columns()[*tb_idx];
356 Field::from_with_table_name_prefix(col, &self.table_name)
357 })
358 .collect();
359 Schema { fields }
360 }
361
362 fn stream_key(&self) -> Option<Vec<usize>> {
363 if matches!(self.table_catalog.table_type, TableType::Internal) {
364 return None;
365 }
366 let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
367 self.table_desc
368 .stream_key
369 .iter()
370 .map(|&c| {
371 id_to_op_idx
372 .get(&self.table_desc.columns[c].column_id)
373 .copied()
374 })
375 .collect::<Option<Vec<_>>>()
376 }
377
378 fn ctx(&self) -> OptimizerContextRef {
379 self.ctx.clone()
380 }
381
382 fn functional_dependency(&self) -> FunctionalDependencySet {
383 let pk_indices = self.stream_key();
384 let col_num = self.output_col_idx.len();
385 match &pk_indices {
386 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
387 None => FunctionalDependencySet::new(col_num),
388 }
389 }
390}
391
392impl TableScan {
393 pub fn get_table_columns(&self) -> &[ColumnDesc] {
394 &self.table_desc.columns
395 }
396
397 pub fn append_only(&self) -> bool {
398 self.table_desc.append_only
399 }
400
401 pub fn column_descs(&self) -> Vec<ColumnDesc> {
403 self.output_col_idx
404 .iter()
405 .map(|&i| self.get_table_columns()[i].clone())
406 .collect()
407 }
408
409 pub fn get_id_to_op_idx_mapping(
411 output_col_idx: &[usize],
412 table_desc: &Rc<TableDesc>,
413 ) -> HashMap<ColumnId, usize> {
414 let mut id_to_op_idx = HashMap::new();
415 output_col_idx
416 .iter()
417 .enumerate()
418 .for_each(|(op_idx, tb_idx)| {
419 let col = &table_desc.columns[*tb_idx];
420 id_to_op_idx.insert(col.column_id, op_idx);
421 });
422 id_to_op_idx
423 }
424}