risingwave_frontend/optimizer/plan_node/generic/
table_scan.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use educe::Educe;
19use fixedbitset::FixedBitSet;
20use pretty_xmlish::Pretty;
21use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::GenericPlanNode;
27use crate::TableCatalog;
28use crate::catalog::ColumnId;
29use crate::catalog::index_catalog::TableIndex;
30use crate::catalog::table_catalog::TableType;
31use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef};
32use crate::optimizer::optimizer_context::OptimizerContextRef;
33use crate::optimizer::property::{FunctionalDependencySet, Order, WatermarkColumns};
34use crate::utils::{ColIndexMappingRewriteExt, Condition};
35
/// Generic plan-node description of a scan over a single RisingWave table
/// (see the `GenericPlanNode` impl below).
///
/// Three kinds of column numbering appear in this type:
/// * column id (`ColumnId`), stored in the catalog;
/// * table index: position of a column in `table_catalog.columns`;
/// * output (operator) index: position of a column in this scan's output schema.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableScan {
    /// Table indices of every column the scan has to read: the entries of
    /// `output_col_idx`, followed by any additional columns referenced by `predicate`
    /// (built in `new_inner`).
    pub required_col_idx: Vec<usize>,
    /// Table indices of the output columns; the i-th entry is the table index of the
    /// i-th output column.
    pub output_col_idx: Vec<usize>,
    /// Catalog of the scanned table.
    pub table_catalog: Arc<TableCatalog>,
    /// Index descriptors carried by this scan. NOTE(review): presumably the indexes
    /// defined on the scanned table; `to_index_scan` builds its result with an empty list.
    pub table_indexes: Vec<Arc<TableIndex>>,
    /// Pushed-down filter. Its input refs are table indices, not output indices.
    pub predicate: Condition,
    /// Optional `AS OF` clause from the query, carried unchanged into index scans.
    pub as_of: Option<AsOf>,
    /// Optimizer context. It is ignored by the derived `PartialEq` and `Hash`, so two
    /// scans that differ only in context compare equal.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
64
65impl TableScan {
66 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
67 self.predicate = self.predicate.clone().rewrite_expr(r);
68 }
69
70 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
71 self.predicate.visit_expr(v);
72 }
73
74 pub fn table_name(&self) -> &str {
75 self.table_catalog.name()
76 }
77
78 pub fn distribution_key(&self) -> Option<Vec<usize>> {
85 let tb_idx_to_op_idx = self
86 .output_col_idx
87 .iter()
88 .enumerate()
89 .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
90 .collect::<HashMap<_, _>>();
91 self.table_catalog
92 .distribution_key
93 .iter()
94 .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned())
95 .collect()
96 }
97
98 pub fn output_column_ids(&self) -> Vec<ColumnId> {
100 self.output_col_idx
101 .iter()
102 .map(|i| self.get_table_columns()[*i].column_id)
103 .collect()
104 }
105
106 pub fn primary_key(&self) -> &[ColumnOrder] {
107 &self.table_catalog.pk
108 }
109
110 pub fn watermark_columns(&self) -> WatermarkColumns {
111 let mut watermark_columns = WatermarkColumns::new();
116 for idx in self.table_catalog.watermark_columns.ones() {
117 watermark_columns.insert(idx, self.ctx.next_watermark_group_id());
118 }
119 watermark_columns.map_clone(&self.i2o_col_mapping())
120 }
121
122 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
123 self.output_col_idx
124 .iter()
125 .map(|&i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
126 .collect()
127 }
128
129 pub(crate) fn column_names(&self) -> Vec<String> {
130 self.output_col_idx
131 .iter()
132 .map(|&i| self.get_table_columns()[i].name.clone())
133 .collect()
134 }
135
136 pub(crate) fn out_fields(&self) -> FixedBitSet {
137 let out_fields_vec = self.output_col_idx.clone();
138 FixedBitSet::from_iter(out_fields_vec)
139 }
140
141 pub(crate) fn order_names(&self) -> Vec<String> {
142 self.table_catalog
143 .order_column_indices()
144 .map(|i| self.get_table_columns()[i].name.clone())
145 .collect()
146 }
147
148 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
149 self.table_catalog
150 .order_column_indices()
151 .map(|i| format!("{}.{}", self.table_name(), self.get_table_columns()[i].name))
152 .collect()
153 }
154
155 pub fn get_out_column_index_order(&self) -> Order {
158 let id_to_tb_idx = self.table_catalog.get_id_to_op_idx_mapping();
159 let order = Order::new(
160 self.table_catalog
161 .pk
162 .iter()
163 .map(|order| {
164 let idx = id_to_tb_idx
165 .get(&self.table_catalog.columns[order.column_index].column_id)
166 .unwrap();
167 ColumnOrder::new(*idx, order.order_type)
168 })
169 .collect(),
170 );
171 self.i2o_col_mapping().rewrite_provided_order(&order)
172 }
173
174 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
176 ColIndexMapping::with_remaining_columns(
177 &self.output_col_idx,
178 self.get_table_columns().len(),
179 )
180 }
181
182 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
184 let mut ids = self.output_column_ids();
185 for column_order in self.primary_key() {
186 let id = self.get_table_columns()[column_order.column_index].column_id;
187 if !ids.contains(&id) {
188 ids.push(id);
189 }
190 }
191 ids
192 }
193
194 pub fn to_index_scan(
197 &self,
198 index_table_catalog: Arc<TableCatalog>,
199 primary_to_secondary_mapping: &BTreeMap<usize, usize>,
200 function_mapping: &HashMap<FunctionCall, usize>,
201 ) -> Self {
202 let new_output_col_idx = self
203 .output_col_idx
204 .iter()
205 .map(|col_idx| *primary_to_secondary_mapping.get(col_idx).unwrap())
206 .collect();
207
208 struct Rewriter<'a> {
209 primary_to_secondary_mapping: &'a BTreeMap<usize, usize>,
210 function_mapping: &'a HashMap<FunctionCall, usize>,
211 }
212 impl ExprRewriter for Rewriter<'_> {
213 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
214 InputRef::new(
215 *self
216 .primary_to_secondary_mapping
217 .get(&input_ref.index)
218 .unwrap(),
219 input_ref.return_type(),
220 )
221 .into()
222 }
223
224 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
225 if let Some(index) = self.function_mapping.get(&func_call) {
226 return InputRef::new(*index, func_call.return_type()).into();
227 }
228
229 let (func_type, inputs, ret) = func_call.decompose();
230 let inputs = inputs
231 .into_iter()
232 .map(|expr| self.rewrite_expr(expr))
233 .collect();
234 FunctionCall::new_unchecked(func_type, inputs, ret).into()
235 }
236 }
237 let mut rewriter = Rewriter {
238 primary_to_secondary_mapping,
239 function_mapping,
240 };
241
242 let new_predicate = self.predicate.clone().rewrite_expr(&mut rewriter);
243
244 Self::new(
245 new_output_col_idx,
246 index_table_catalog,
247 vec![],
248 self.ctx.clone(),
249 new_predicate,
250 self.as_of.clone(),
251 )
252 }
253
254 #[allow(clippy::too_many_arguments)]
256 pub(crate) fn new(
257 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
259 table_indexes: Vec<Arc<TableIndex>>,
260 ctx: OptimizerContextRef,
261 predicate: Condition, as_of: Option<AsOf>,
263 ) -> Self {
264 Self::new_inner(
265 output_col_idx,
266 table_catalog,
267 table_indexes,
268 ctx,
269 predicate,
270 as_of,
271 )
272 }
273
274 pub(crate) fn new_inner(
275 output_col_idx: Vec<usize>, table_catalog: Arc<TableCatalog>,
277 table_indexes: Vec<Arc<TableIndex>>,
278 ctx: OptimizerContextRef,
279 predicate: Condition, as_of: Option<AsOf>,
281 ) -> Self {
282 let mut required_col_idx = output_col_idx.clone();
291 let predicate_col_idx = predicate.collect_input_refs(table_catalog.columns().len());
292 predicate_col_idx.ones().for_each(|idx| {
293 if !required_col_idx.contains(&idx) {
294 required_col_idx.push(idx);
295 }
296 });
297
298 Self {
299 required_col_idx,
300 output_col_idx,
301 table_catalog,
302 table_indexes,
303 predicate,
304 as_of,
305 ctx,
306 }
307 }
308
309 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
310 Pretty::Array(
311 match verbose {
312 true => self.column_names_with_table_prefix(),
313 false => self.column_names(),
314 }
315 .into_iter()
316 .map(Pretty::from)
317 .collect(),
318 )
319 }
320
321 pub(crate) fn fields_pretty_schema(&self) -> Schema {
322 let fields = self
323 .table_catalog
324 .columns
325 .iter()
326 .map(|col| Field::from_with_table_name_prefix(&col.column_desc, self.table_name()))
327 .collect();
328 Schema { fields }
329 }
330}
331
332impl GenericPlanNode for TableScan {
333 fn schema(&self) -> Schema {
334 let fields = self
335 .output_col_idx
336 .iter()
337 .map(|tb_idx| {
338 let col = &self.get_table_columns()[*tb_idx];
339 Field::from_with_table_name_prefix(&col.column_desc, self.table_name())
340 })
341 .collect();
342 Schema { fields }
343 }
344
345 fn stream_key(&self) -> Option<Vec<usize>> {
346 if matches!(self.table_catalog.table_type, TableType::Internal) {
347 return None;
348 }
349 let id_to_op_idx =
350 Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_catalog);
351 self.table_catalog
352 .stream_key
353 .iter()
354 .map(|&c| {
355 id_to_op_idx
356 .get(&self.table_catalog.columns[c].column_id)
357 .copied()
358 })
359 .collect::<Option<Vec<_>>>()
360 }
361
362 fn ctx(&self) -> OptimizerContextRef {
363 self.ctx.clone()
364 }
365
366 fn functional_dependency(&self) -> FunctionalDependencySet {
367 let pk_indices = self.stream_key();
368 let col_num = self.output_col_idx.len();
369 match &pk_indices {
370 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
371 None => FunctionalDependencySet::new(col_num),
372 }
373 }
374}
375
376impl TableScan {
377 pub fn get_table_columns(&self) -> &[ColumnCatalog] {
378 &self.table_catalog.columns
379 }
380
381 pub fn append_only(&self) -> bool {
382 self.table_catalog.append_only
383 }
384
385 pub fn column_descs(&self) -> Vec<ColumnDesc> {
387 self.output_col_idx
388 .iter()
389 .map(|&i| self.get_table_columns()[i].column_desc.clone())
390 .collect()
391 }
392
393 pub fn get_id_to_op_idx_mapping(
395 output_col_idx: &[usize],
396 table_catalog: &TableCatalog,
397 ) -> HashMap<ColumnId, usize> {
398 ColumnDesc::get_id_to_op_idx_mapping(&table_catalog.columns, Some(output_col_idx))
399 }
400}