risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs
1use std::collections::{BTreeMap, HashMap};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use educe::Educe;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::table_catalog::TableType;
29use crate::catalog::{ColumnId, IndexCatalog};
30use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
31use crate::optimizer::optimizer_context::OptimizerContextRef;
32use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
33use crate::utils::{ColIndexMappingRewriteExt, Condition};
34
/// Generic (see [`GenericPlanNode`]) description of a scan over a single table.
///
/// Equality and hashing deliberately ignore `ctx` (see the `educe` attributes on that field).
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableScan {
    /// Name of the scanned table; used as the `"{table_name}."` prefix for qualified column
    /// names and schema fields.
    pub table_name: String,
    /// Table column indices the scan has to read: the output columns, followed by any further
    /// columns referenced by `predicate` (computed in `new_inner`).
    pub required_col_idx: Vec<usize>,
    /// Table column index of each output column, in output order.
    pub output_col_idx: Vec<usize>,
    /// Catalog of the scanned table.
    pub table_catalog: Arc<TableCatalog>,
    /// Descriptor obtained from `table_catalog.table_desc()`; holds the columns, keys and other
    /// metadata read by most methods of this type.
    pub table_desc: Rc<TableDesc>,
    /// Index catalogs attached to this scan (`to_index_scan` creates scans with none).
    /// NOTE(review): presumably the candidate indexes of the table — confirm.
    pub indexes: Vec<Rc<IndexCatalog>>,
    /// Predicate evaluated by the scan; its `InputRef`s index table columns, not output columns.
    pub predicate: Condition,
    /// Optional `AS OF` clause attached to the scan.
    pub as_of: Option<AsOf>,
    /// Cardinality information for the scanned table.
    /// NOTE(review): presumably an estimate/bound supplied by the caller — confirm.
    pub table_cardinality: Cardinality,
    /// Optimizer context; excluded from `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
67
68impl TableScan {
69 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
70 self.predicate = self.predicate.clone().rewrite_expr(r);
71 }
72
73 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
74 self.predicate.visit_expr(v);
75 }
76
77 pub fn distribution_key(&self) -> Option<Vec<usize>> {
84 let tb_idx_to_op_idx = self
85 .output_col_idx
86 .iter()
87 .enumerate()
88 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
89 .collect::<HashMap<_, _>>();
90 self.table_desc
91 .distribution_key
92 .iter()
93 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
94 .collect()
95 }
96
97 pub fn output_column_ids(&self) -> Vec<ColumnId> {
99 self.output_col_idx
100 .iter()
101 .map(|i| self.get_table_columns()[*i].column_id)
102 .collect()
103 }
104
105 pub fn primary_key(&self) -> &[ColumnOrder] {
106 &self.table_desc.pk
107 }
108
109 pub fn watermark_columns(&self) -> WatermarkColumns {
110 let mut watermark_columns = WatermarkColumns::new();
115 for idx in self.table_desc.watermark_columns.ones() {
116 watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
117 }
118 watermark_columns.map_clone(&self.i2o_col_mapping())
119 }
120
121 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
122 self.output_col_idx
123 .iter()
124 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
125 .collect()
126 }
127
128 pub(crate) fn column_names(&self) -> Vec<String> {
129 self.output_col_idx
130 .iter()
131 .map(|&i| self.get_table_columns()[i].name.clone())
132 .collect()
133 }
134
135 pub(crate) fn order_names(&self) -> Vec<String> {
136 self.table_desc
137 .order_column_indices()
138 .iter()
139 .map(|&i| self.get_table_columns()[i].name.clone())
140 .collect()
141 }
142
143 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
144 self.table_desc
145 .order_column_indices()
146 .iter()
147 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
148 .collect()
149 }
150
151 pub fn get_out_column_index_order(&self) -> Order {
154 let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
155 let order = Order::new(
156 self.table_desc
157 .pk
158 .iter()
159 .map(|order| {
160 let idx = id_to_tb_idx
161 .get(&self.table_desc.columns[order.column_index].column_id)
162 .unwrap();
163 ColumnOrder::new(*idx, order.order_type)
164 })
165 .collect(),
166 );
167 self.i2o_col_mapping().rewrite_provided_order(&order)
168 }
169
170 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
172 ColIndexMapping::with_remaining_columns(
173 &self.output_col_idx,
174 self.get_table_columns().len(),
175 )
176 }
177
178 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
180 let mut ids = self.output_column_ids();
181 for column_order in self.primary_key() {
182 let id = self.get_table_columns()[column_order.column_index].column_id;
183 if !ids.contains(&id) {
184 ids.push(id);
185 }
186 }
187 ids
188 }
189
190 pub fn to_index_scan(
193 &self,
194 index_name: &str,
195 index_table_catalog: Arc<TableCatalog>,
196 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
197 function_mapping: &HashMap<FunctionCall, usize>,
198 ) -> Self {
199 let new_output_col_idx = self
200 .output_col_idx
201 .iter()
202 .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
203 .collect();
204
205 struct Rewriter<'a> {
206 primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
207 function_mapping: &'a HashMap<FunctionCall, usize>,
208 }
209 impl ExprRewriter for Rewriter<'_> {
210 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
211 InputRef::new(
212 *self
213 .primary_to_secondary_mapping
214 .get(&input_ref.index)
215 .unwrap(),
216 input_ref.return_type(),
217 )
218 .into()
219 }
220
221 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
222 if let Some(index) = self.function_mapping.get(&func_call) {
223 return InputRef::new(*index, func_call.return_type()).into();
224 }
225
226 let (func_type, inputs, ret) = func_call.decompose();
227 let inputs = inputs
228 .into_iter()
229 .map(|expr| self.rewrite_expr(expr))
230 .collect();
231 FunctionCall::new_unchecked(func_type, inputs, ret).into()
232 }
233 }
234 let mut rewriter = Rewriter {
235 primary_to_secondary_mapping,
236 function_mapping,
237 };
238
239 let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
240
241 Self::new(
242 index_name.to_owned(),
243 new_output_col_idx,
244 index_table_catalog,
245 vec![],
246 self.ctx.clone(),
247 new_predicate,
248 self.as_of.clone(),
249 self.table_cardinality,
250 )
251 }
252
253 #[allow(clippy::too_many_arguments)]
255 pub(crate) fn new(
256 table_name: String,
257 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
259 indexes: Vec<Rc<IndexCatalog>>,
260 ctx: OptimizerContextRef,
261 predicate: Condition, as_of: Option<AsOf>,
263 table_cardinality: Cardinality,
264 ) -> Self {
265 Self::new_inner(
266 table_name,
267 output_col_idx,
268 table_catalog,
269 indexes,
270 ctx,
271 predicate,
272 as_of,
273 table_cardinality,
274 )
275 }
276
277 #[allow(clippy::too_many_arguments)]
278 pub(crate) fn new_inner(
279 table_name: String,
280 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
282 indexes: Vec<Rc<IndexCatalog>>,
283 ctx: OptimizerContextRef,
284 predicate: Condition, as_of: Option<AsOf>,
286 table_cardinality: Cardinality,
287 ) -> Self {
288 let mut required_col_idx = output_col_idx.clone();
297 let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
298 predicate_col_idx.ones().for_each(|idx| {
299 if !required_col_idx.contains(&idx) {
300 required_col_idx.push(idx);
301 }
302 });
303
304 let table_desc = Rc::new(table_catalog.table_desc());
305
306 Self {
307 table_name,
308 required_col_idx,
309 output_col_idx,
310 table_catalog,
311 table_desc,
312 indexes,
313 predicate,
314 as_of,
315 table_cardinality,
316 ctx,
317 }
318 }
319
320 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
321 Pretty::Array(
322 match verbose {
323 true => self.column_names_with_table_prefix(),
324 false => self.column_names(),
325 }
326 .into_iter()
327 .map(Pretty::from)
328 .collect(),
329 )
330 }
331
332 pub(crate) fn fields_pretty_schema(&self) -> Schema {
333 let fields = self
334 .table_desc
335 .columns
336 .iter()
337 .map(|col| Field::from_with_table_name_prefix(col, &self.table_name))
338 .collect();
339 Schema { fields }
340 }
341}
342
343impl GenericPlanNode for TableScan {
344 fn schema(&self) -> Schema {
345 let fields = self
346 .output_col_idx
347 .iter()
348 .map(|tb_idx| {
349 let col = &self.get_table_columns()[*tb_idx];
350 Field::from_with_table_name_prefix(col, &self.table_name)
351 })
352 .collect();
353 Schema { fields }
354 }
355
356 fn stream_key(&self) -> Option<Vec<usize>> {
357 if matches!(self.table_catalog.table_type, TableType::Internal) {
358 return None;
359 }
360 let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
361 self.table_desc
362 .stream_key
363 .iter()
364 .map(|&c| {
365 id_to_op_idx
366 .get(&self.table_desc.columns[c].column_id)
367 .copied()
368 })
369 .collect::<Option<Vec<_>>>()
370 }
371
372 fn ctx(&self) -> OptimizerContextRef {
373 self.ctx.clone()
374 }
375
376 fn functional_dependency(&self) -> FunctionalDependencySet {
377 let pk_indices = self.stream_key();
378 let col_num = self.output_col_idx.len();
379 match &pk_indices {
380 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
381 None => FunctionalDependencySet::new(col_num),
382 }
383 }
384}
385
386impl TableScan {
387 pub fn get_table_columns(&self) -> &[ColumnDesc] {
388 &self.table_desc.columns
389 }
390
391 pub fn append_only(&self) -> bool {
392 self.table_desc.append_only
393 }
394
395 pub fn column_descs(&self) -> Vec<ColumnDesc> {
397 self.output_col_idx
398 .iter()
399 .map(|&i| self.get_table_columns()[i].clone())
400 .collect()
401 }
402
403 pub fn get_id_to_op_idx_mapping(
405 output_col_idx: &[usize],
406 table_desc: &Rc<TableDesc>,
407 ) -> HashMap<ColumnId, usize> {
408 let mut id_to_op_idx = HashMap::new();
409 output_col_idx
410 .iter()
411 .enumerate()
412 .for_each(|(op_idx, tb_idx)| {
413 let col = &table_desc.columns[*tb_idx];
414 id_to_op_idx.insert(col.column_id, op_idx);
415 });
416 id_to_op_idx
417 }
418}