risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::catalog::index_catalog::{TableIndex, VectorIndex};
30use crate::catalog::table_catalog::TableType;
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
36#[derive(Debug, Clone, Educe)]
38#[educe(PartialEq, Eq, Hash)]
39pub struct TableScan {
40 pub required_col_idx: Vec<usize>,
42 pub output_col_idx: Vec<usize>,
43 pub table_catalog: Arc<TableCatalog>,
45 pub table_indexes: Vec<Arc<TableIndex>>,
53 pub vector_indexes: Vec<Arc<VectorIndex>>,
54 table_scan_stream_key: Option<Vec<usize>>,
56 pub predicate: Condition,
58 pub as_of: Option<AsOf>,
63 #[educe(PartialEq(ignore))]
64 #[educe(Hash(ignore))]
65 pub ctx: OptimizerContextRef,
66}
67
68impl TableScan {
69 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
70 self.predicate = self.predicate.clone().rewrite_expr(r);
71 }
72
73 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
74 self.predicate.visit_expr(v);
75 }
76
77 pub fn table_name(&self) -> &str {
78 self.table_catalog.name()
79 }
80
81 pub fn distribution_key(&self) -> Option<Vec<usize>> {
88 let tb_idx_to_op_idx = self
89 .output_col_idx
90 .iter()
91 .enumerate()
92 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
93 .collect::<HashMap<_, _>>();
94 self.table_catalog
95 .distribution_key
96 .iter()
97 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
98 .collect()
99 }
100
101 pub fn output_column_ids(&self) -> Vec<ColumnId> {
103 self.output_col_idx
104 .iter()
105 .map(|i| self.get_table_columns()[*i].column_id)
106 .collect()
107 }
108
109 pub fn primary_key(&self) -> &[ColumnOrder] {
110 &self.table_catalog.pk
111 }
112
113 pub(crate) fn table_scan_stream_key(&self) -> Option<&[usize]> {
114 self.table_scan_stream_key.as_deref()
115 }
116
117 pub(crate) fn extend_table_scan_stream_key_with_primary_key(&mut self) {
118 let primary_key_indices = self
119 .primary_key()
120 .iter()
121 .map(|c| c.column_index)
122 .collect::<Vec<_>>();
123 let Some(table_scan_stream_key) = &mut self.table_scan_stream_key else {
124 #[cfg(debug_assertions)]
125 {
126 panic!(
127 "table scan stream key should exist before extending with primary key: table: {}, output_col_idx: {:?}, pk: {:?}",
128 self.table_name(),
129 self.output_col_idx,
130 primary_key_indices
131 );
132 }
133 #[cfg(not(debug_assertions))]
134 {
135 tracing::warn!(
136 table = self.table_name(),
137 output_col_idx = ?self.output_col_idx,
138 pk = ?primary_key_indices,
139 "table scan stream key should exist before extending with primary key"
140 );
141 return;
142 }
143 };
144
145 for primary_key in primary_key_indices {
146 if !table_scan_stream_key.contains(&primary_key) {
147 table_scan_stream_key.push(primary_key);
148 }
149 }
150 }
151
152 pub fn watermark_columns(&self) -> WatermarkColumns {
153 let mut watermark_columns = WatermarkColumns::new();
158 for idx in self.table_catalog.watermark_columns.ones() {
159 watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
160 }
161 watermark_columns.map_clone(&self.i2o_col_mapping())
162 }
163
164 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
165 self.output_col_idx
166 .iter()
167 .map(|&i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
168 .collect()
169 }
170
171 pub(crate) fn column_names(&self) -> Vec<String> {
172 self.output_col_idx
173 .iter()
174 .map(|&i| self.get_table_columns()[i].name.clone())
175 .collect()
176 }
177
178 pub(crate) fn out_fields(&self) -> FixedBitSet {
179 let out_fields_vec = self.output_col_idx.clone();
180 FixedBitSet::from_iter(out_fields_vec)
181 }
182
183 pub(crate) fn order_names(&self) -> Vec<String> {
184 self.table_catalog
185 .order_column_indices()
186 .map(|i| self.get_table_columns()[i].name.clone())
187 .collect()
188 }
189
190 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
191 self.table_catalog
192 .order_column_indices()
193 .map(|i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
194 .collect()
195 }
196
197 pub fn get_out_column_index_order(&self) -> Order {
200 let id_to_tb_idx = self.table_catalog.get_id_to_op_idx_mapping();
201 let order = Order::new(
202 self.table_catalog
203 .pk
204 .iter()
205 .map(|order| {
206 let idx = id_to_tb_idx
207 .get(&self.table_catalog.columns[order.column_index].column_id)
208 .unwrap();
209 ColumnOrder::new(*idx, order.order_type)
210 })
211 .collect(),
212 );
213 self.i2o_col_mapping().rewrite_provided_order(&order)
214 }
215
216 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
218 ColIndexMapping::with_remaining_columns(
219 &self.output_col_idx,
220 self.get_table_columns().len(),
221 )
222 }
223
224 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
226 let mut ids = self.output_column_ids();
227 for column_order in self.primary_key() {
228 let id = self.get_table_columns()[column_order.column_index].column_id;
229 if !ids.contains(&id) {
230 ids.push(id);
231 }
232 }
233 ids
234 }
235
236 pub fn to_index_scan(
239 &self,
240 index_table_catalog: Arc<TableCatalog>,
241 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
242 function_mapping: &HashMap<FunctionCall, usize>,
243 ) -> Self {
244 let new_output_col_idx = self
245 .output_col_idx
246 .iter()
247 .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
248 .collect();
249
250 struct Rewriter<'a> {
251 primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
252 function_mapping: &'a HashMap<FunctionCall, usize>,
253 }
254 impl ExprRewriter for Rewriter<'_> {
255 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
256 InputRef::new(
257 *self
258 .primary_to_secondary_mapping
259 .get(&input_ref.index)
260 .unwrap(),
261 input_ref.return_type(),
262 )
263 .into()
264 }
265
266 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
267 if let Some(index) = self.function_mapping.get(&func_call) {
268 return InputRef::new(*index, func_call.return_type()).into();
269 }
270
271 let (func_type, inputs, ret) = func_call.decompose();
272 let inputs = inputs
273 .into_iter()
274 .map(|expr| self.rewrite_expr(expr))
275 .collect();
276 FunctionCall::new_unchecked(func_type, inputs, ret).into()
277 }
278 }
279 let mut rewriter = Rewriter {
280 primary_to_secondary_mapping,
281 function_mapping,
282 };
283
284 let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
285
286 Self::new(
287 new_output_col_idx,
288 index_table_catalog,
289 vec![],
290 vec![],
291 self.ctx.clone(),
292 new_predicate,
293 self.as_of.clone(),
294 )
295 }
296
297 #[allow(clippy::too_many_arguments)]
299 pub(crate) fn new(
300 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
302 table_indexes: Vec<Arc<TableIndex>>,
303 vector_indexes: Vec<Arc<VectorIndex>>,
304 ctx: OptimizerContextRef,
305 predicate: Condition, as_of: Option<AsOf>,
307 ) -> Self {
308 let table_scan_stream_key = (!matches!(table_catalog.table_type, TableType::Internal))
309 .then(|| table_catalog.stream_key());
310 Self::new_inner(
311 output_col_idx,
312 table_catalog,
313 table_indexes,
314 vector_indexes,
315 ctx,
316 predicate,
317 as_of,
318 table_scan_stream_key,
319 )
320 }
321
322 pub(crate) fn new_inner(
323 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
325 table_indexes: Vec<Arc<TableIndex>>,
326 vector_indexes: Vec<Arc<VectorIndex>>,
327 ctx: OptimizerContextRef,
328 predicate: Condition, as_of: Option<AsOf>,
330 table_scan_stream_key: Option<Vec<usize>>,
331 ) -> Self {
332 let mut required_col_idx = output_col_idx.clone();
341 let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
342 predicate_col_idx.ones().for_each(|idx| {
343 if !required_col_idx.contains(&idx) {
344 required_col_idx.push(idx);
345 }
346 });
347
348 Self {
349 required_col_idx,
350 output_col_idx,
351 table_catalog,
352 table_indexes,
353 vector_indexes,
354 table_scan_stream_key,
355 predicate,
356 as_of,
357 ctx,
358 }
359 }
360
361 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
362 Pretty::Array(
363 match verbose {
364 true => self.column_names_with_table_prefix(),
365 false => self.column_names(),
366 }
367 .into_iter()
368 .map(Pretty::from)
369 .collect(),
370 )
371 }
372
373 pub(crate) fn fields_pretty_schema(&self) -> Schema {
374 let fields = self
375 .table_catalog
376 .columns
377 .iter()
378 .map(|col| Field::from_with_table_name_prefix(&col.column_desc, self.table_name()))
379 .collect();
380 Schema { fields }
381 }
382
383 pub(crate) fn cross_database(&self) -> bool {
385 self.table_catalog.database_id != self.ctx().session_ctx().database_id()
386 }
387}
388
389impl GenericPlanNode for TableScan {
390 fn schema(&self) -> Schema {
391 let fields = self
392 .output_col_idx
393 .iter()
394 .map(|tb_idx| {
395 let col = &self.get_table_columns()[*tb_idx];
396 Field::from_with_table_name_prefix(&col.column_desc, self.table_name())
397 })
398 .collect();
399 Schema { fields }
400 }
401
402 fn stream_key(&self) -> Option<Vec<usize>> {
403 let stream_key = self.table_scan_stream_key.as_deref()?;
404 let id_to_op_idx =
405 Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_catalog);
406 stream_key
407 .iter()
408 .map(|&c| {
409 id_to_op_idx
410 .get(&self.table_catalog.columns[c].column_id)
411 .copied()
412 })
413 .collect()
414 }
415
416 fn ctx(&self) -> OptimizerContextRef {
417 self.ctx.clone()
418 }
419
420 fn functional_dependency(&self) -> FunctionalDependencySet {
421 let pk_indices = self.stream_key();
422 let col_num = self.output_col_idx.len();
423 match &pk_indices {
424 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
425 None => FunctionalDependencySet::new(col_num),
426 }
427 }
428}
429
430impl TableScan {
431 pub fn get_table_columns(&self) -> &[ColumnCatalog] {
432 &self.table_catalog.columns
433 }
434
435 pub fn append_only(&self) -> bool {
436 self.table_catalog.append_only
437 }
438
439 pub fn column_descs(&self) -> Vec<ColumnDesc> {
441 self.output_col_idx
442 .iter()
443 .map(|&i| self.get_table_columns()[i].column_desc.clone())
444 .collect()
445 }
446
447 pub fn get_id_to_op_idx_mapping(
449 output_col_idx: &[usize],
450 table_catalog: &TableCatalog,
451 ) -> HashMap<ColumnId, usize> {
452 ColumnDesc::get_id_to_op_idx_mapping(&table_catalog.columns, Some(output_col_idx))
453 }
454}