risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::catalog::index_catalog::{TableIndex, VectorIndex};
30use crate::catalog::table_catalog::TableType;
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
36#[derive(Debug, Clone, Educe)]
38#[educe(PartialEq, Eq, Hash)]
39pub struct TableScan {
40 pub required_col_idx: Vec<usize>,
42 pub output_col_idx: Vec<usize>,
43 pub table_catalog: Arc<TableCatalog>,
45 pub table_indexes: Vec<Arc<TableIndex>>,
53 pub vector_indexes: Vec<Arc<VectorIndex>>,
54 pub predicate: Condition,
56 pub as_of: Option<AsOf>,
61 #[educe(PartialEq(ignore))]
62 #[educe(Hash(ignore))]
63 pub ctx: OptimizerContextRef,
64}
65
66impl TableScan {
67 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
68 self.predicate = self.predicate.clone().rewrite_expr(r);
69 }
70
71 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
72 self.predicate.visit_expr(v);
73 }
74
75 pub fn table_name(&self) -> &str {
76 self.table_catalog.name()
77 }
78
79 pub fn distribution_key(&self) -> Option<Vec<usize>> {
86 let tb_idx_to_op_idx = self
87 .output_col_idx
88 .iter()
89 .enumerate()
90 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
91 .collect::<HashMap<_, _>>();
92 self.table_catalog
93 .distribution_key
94 .iter()
95 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
96 .collect()
97 }
98
99 pub fn output_column_ids(&self) -> Vec<ColumnId> {
101 self.output_col_idx
102 .iter()
103 .map(|i| self.get_table_columns()[*i].column_id)
104 .collect()
105 }
106
107 pub fn primary_key(&self) -> &[ColumnOrder] {
108 &self.table_catalog.pk
109 }
110
111 pub fn watermark_columns(&self) -> WatermarkColumns {
112 let mut watermark_columns = WatermarkColumns::new();
117 for idx in self.table_catalog.watermark_columns.ones() {
118 watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
119 }
120 watermark_columns.map_clone(&self.i2o_col_mapping())
121 }
122
123 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
124 self.output_col_idx
125 .iter()
126 .map(|&i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
127 .collect()
128 }
129
130 pub(crate) fn column_names(&self) -> Vec<String> {
131 self.output_col_idx
132 .iter()
133 .map(|&i| self.get_table_columns()[i].name.clone())
134 .collect()
135 }
136
137 pub(crate) fn out_fields(&self) -> FixedBitSet {
138 let out_fields_vec = self.output_col_idx.clone();
139 FixedBitSet::from_iter(out_fields_vec)
140 }
141
142 pub(crate) fn order_names(&self) -> Vec<String> {
143 self.table_catalog
144 .order_column_indices()
145 .map(|i| self.get_table_columns()[i].name.clone())
146 .collect()
147 }
148
149 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
150 self.table_catalog
151 .order_column_indices()
152 .map(|i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
153 .collect()
154 }
155
156 pub fn get_out_column_index_order(&self) -> Order {
159 let id_to_tb_idx = self.table_catalog.get_id_to_op_idx_mapping();
160 let order = Order::new(
161 self.table_catalog
162 .pk
163 .iter()
164 .map(|order| {
165 let idx = id_to_tb_idx
166 .get(&self.table_catalog.columns[order.column_index].column_id)
167 .unwrap();
168 ColumnOrder::new(*idx, order.order_type)
169 })
170 .collect(),
171 );
172 self.i2o_col_mapping().rewrite_provided_order(&order)
173 }
174
175 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
177 ColIndexMapping::with_remaining_columns(
178 &self.output_col_idx,
179 self.get_table_columns().len(),
180 )
181 }
182
183 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
185 let mut ids = self.output_column_ids();
186 for column_order in self.primary_key() {
187 let id = self.get_table_columns()[column_order.column_index].column_id;
188 if !ids.contains(&id) {
189 ids.push(id);
190 }
191 }
192 ids
193 }
194
195 pub fn to_index_scan(
198 &self,
199 index_table_catalog: Arc<TableCatalog>,
200 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
201 function_mapping: &HashMap<FunctionCall, usize>,
202 ) -> Self {
203 let new_output_col_idx = self
204 .output_col_idx
205 .iter()
206 .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
207 .collect();
208
209 struct Rewriter<'a> {
210 primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
211 function_mapping: &'a HashMap<FunctionCall, usize>,
212 }
213 impl ExprRewriter for Rewriter<'_> {
214 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
215 InputRef::new(
216 *self
217 .primary_to_secondary_mapping
218 .get(&input_ref.index)
219 .unwrap(),
220 input_ref.return_type(),
221 )
222 .into()
223 }
224
225 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
226 if let Some(index) = self.function_mapping.get(&func_call) {
227 return InputRef::new(*index, func_call.return_type()).into();
228 }
229
230 let (func_type, inputs, ret) = func_call.decompose();
231 let inputs = inputs
232 .into_iter()
233 .map(|expr| self.rewrite_expr(expr))
234 .collect();
235 FunctionCall::new_unchecked(func_type, inputs, ret).into()
236 }
237 }
238 let mut rewriter = Rewriter {
239 primary_to_secondary_mapping,
240 function_mapping,
241 };
242
243 let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
244
245 Self::new(
246 new_output_col_idx,
247 index_table_catalog,
248 vec![],
249 vec![],
250 self.ctx.clone(),
251 new_predicate,
252 self.as_of.clone(),
253 )
254 }
255
256 #[allow(clippy::too_many_arguments)]
258 pub(crate) fn new(
259 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
261 table_indexes: Vec<Arc<TableIndex>>,
262 vector_indexes: Vec<Arc<VectorIndex>>,
263 ctx: OptimizerContextRef,
264 predicate: Condition, as_of: Option<AsOf>,
266 ) -> Self {
267 Self::new_inner(
268 output_col_idx,
269 table_catalog,
270 table_indexes,
271 vector_indexes,
272 ctx,
273 predicate,
274 as_of,
275 )
276 }
277
278 pub(crate) fn new_inner(
279 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
281 table_indexes: Vec<Arc<TableIndex>>,
282 vector_indexes: Vec<Arc<VectorIndex>>,
283 ctx: OptimizerContextRef,
284 predicate: Condition, as_of: Option<AsOf>,
286 ) -> Self {
287 let mut required_col_idx = output_col_idx.clone();
296 let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
297 predicate_col_idx.ones().for_each(|idx| {
298 if !required_col_idx.contains(&idx) {
299 required_col_idx.push(idx);
300 }
301 });
302
303 Self {
304 required_col_idx,
305 output_col_idx,
306 table_catalog,
307 table_indexes,
308 vector_indexes,
309 predicate,
310 as_of,
311 ctx,
312 }
313 }
314
315 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
316 Pretty::Array(
317 match verbose {
318 true => self.column_names_with_table_prefix(),
319 false => self.column_names(),
320 }
321 .into_iter()
322 .map(Pretty::from)
323 .collect(),
324 )
325 }
326
327 pub(crate) fn fields_pretty_schema(&self) -> Schema {
328 let fields = self
329 .table_catalog
330 .columns
331 .iter()
332 .map(|col| Field::from_with_table_name_prefix(&col.column_desc, self.table_name()))
333 .collect();
334 Schema { fields }
335 }
336
337 pub(crate) fn cross_database(&self) -> bool {
339 self.table_catalog.database_id != self.ctx().session_ctx().database_id()
340 }
341}
342
343impl GenericPlanNode for TableScan {
344 fn schema(&self) -> Schema {
345 let fields = self
346 .output_col_idx
347 .iter()
348 .map(|tb_idx| {
349 let col = &self.get_table_columns()[*tb_idx];
350 Field::from_with_table_name_prefix(&col.column_desc, self.table_name())
351 })
352 .collect();
353 Schema { fields }
354 }
355
356 fn stream_key(&self) -> Option<Vec<usize>> {
357 if matches!(self.table_catalog.table_type, TableType::Internal) {
358 return None;
359 }
360 let id_to_op_idx =
361 Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_catalog);
362 self.table_catalog
363 .stream_key
364 .iter()
365 .map(|&c| {
366 id_to_op_idx
367 .get(&self.table_catalog.columns[c].column_id)
368 .copied()
369 })
370 .collect::<Option<Vec<_>>>()
371 }
372
373 fn ctx(&self) -> OptimizerContextRef {
374 self.ctx.clone()
375 }
376
377 fn functional_dependency(&self) -> FunctionalDependencySet {
378 let pk_indices = self.stream_key();
379 let col_num = self.output_col_idx.len();
380 match &pk_indices {
381 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
382 None => FunctionalDependencySet::new(col_num),
383 }
384 }
385}
386
387impl TableScan {
388 pub fn get_table_columns(&self) -> &[ColumnCatalog] {
389 &self.table_catalog.columns
390 }
391
392 pub fn append_only(&self) -> bool {
393 self.table_catalog.append_only
394 }
395
396 pub fn column_descs(&self) -> Vec<ColumnDesc> {
398 self.output_col_idx
399 .iter()
400 .map(|&i| self.get_table_columns()[i].column_desc.clone())
401 .collect()
402 }
403
404 pub fn get_id_to_op_idx_mapping(
406 output_col_idx: &[usize],
407 table_catalog: &TableCatalog,
408 ) -> HashMap<ColumnId, usize> {
409 ColumnDesc::get_id_to_op_idx_mapping(&table_catalog.columns, Some(output_col_idx))
410 }
411}