1use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::ColumnOrder;
22use risingwave_pb::stream_plan::StreamScanType;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::generic::{GenericPlanNode, GenericPlanRef};
26use super::utils::{Distill, childless_record};
27use super::{
28 BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
29 LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch,
30 ToStream, generic,
31};
32use crate::TableCatalog;
33use crate::binder::BoundBaseTable;
34use crate::catalog::ColumnId;
35use crate::catalog::index_catalog::{IndexType, TableIndex};
36use crate::error::Result;
37use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
38use crate::optimizer::ApplyResult;
39use crate::optimizer::optimizer_context::OptimizerContextRef;
40use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
41use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
42use crate::optimizer::plan_node::{
43 BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
44 PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
45};
46use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
47use crate::optimizer::rule::IndexSelectionRule;
48use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
49
/// Logical plan node that reads rows from a table (`core.table_catalog`).
///
/// Besides the table itself, the node carries the subset of columns it outputs, the secondary
/// indexes that may be used to serve it, an optional pushed-down predicate and an optional
/// `AS OF` clause (all held by `core`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Plan-node metadata (id, schema, stream key, context, functional dependencies), derived
    /// from `core` via `PlanBase::new_logical_with_core` when the node is constructed.
    pub base: PlanBase<Logical>,
    /// Convention-agnostic scan description shared with the batch/stream lowering.
    core: generic::TableScan,
}
56
57impl From<generic::TableScan> for LogicalScan {
58 fn from(core: generic::TableScan) -> Self {
59 let base = PlanBase::new_logical_with_core(&core);
60 Self { base, core }
61 }
62}
63
64impl From<generic::TableScan> for PlanRef {
65 fn from(core: generic::TableScan) -> Self {
66 LogicalScan::from(core).into()
67 }
68}
69
70impl GenericPlanRef for LogicalScan {
71 fn id(&self) -> PlanNodeId {
72 self.plan_base().id()
73 }
74
75 fn schema(&self) -> &Schema {
76 self.plan_base().schema()
77 }
78
79 fn stream_key(&self) -> Option<&[usize]> {
80 self.plan_base().stream_key()
81 }
82
83 fn ctx(&self) -> OptimizerContextRef {
84 self.plan_base().ctx()
85 }
86
87 fn functional_dependency(&self) -> &FunctionalDependencySet {
88 self.plan_base().functional_dependency()
89 }
90}
91
92impl LogicalScan {
93 pub fn create(
95 table_catalog: Arc<TableCatalog>,
96 ctx: OptimizerContextRef,
97 as_of: Option<AsOf>,
98 ) -> Self {
99 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
100 generic::TableScan::new(
101 output_col_idx,
102 table_catalog,
103 vec![],
104 ctx,
105 Condition::true_cond(),
106 as_of,
107 )
108 .into()
109 }
110
111 pub fn from_base_table(
112 base_table: &BoundBaseTable,
113 ctx: OptimizerContextRef,
114 as_of: Option<AsOf>,
115 ) -> Self {
116 let table_catalog = base_table.table_catalog.clone();
117 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
118 let mut table_indexes = vec![];
119 for index in &base_table.table_indexes {
120 match &index.index_type {
121 IndexType::Table(index) => table_indexes.push(index.clone()),
122 }
123 }
124 generic::TableScan::new(
125 output_col_idx,
126 table_catalog,
127 table_indexes,
128 ctx,
129 Condition::true_cond(),
130 as_of,
131 )
132 .into()
133 }
134
135 pub fn table_name(&self) -> &str {
136 &self.core.table_catalog.name
137 }
138
139 pub fn as_of(&self) -> Option<AsOf> {
140 self.core.as_of.clone()
141 }
142
143 pub fn table_cardinality(&self) -> Cardinality {
145 self.core.table_catalog.cardinality
146 }
147
148 pub fn table(&self) -> &Arc<TableCatalog> {
149 &self.core.table_catalog
150 }
151
152 pub fn column_descs(&self) -> Vec<ColumnDesc> {
154 self.core.column_descs()
155 }
156
157 pub fn output_column_ids(&self) -> Vec<ColumnId> {
159 self.core.output_column_ids()
160 }
161
162 pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
164 &self.core.table_indexes
165 }
166
167 pub fn predicate(&self) -> &Condition {
169 &self.core.predicate
170 }
171
172 pub fn get_out_column_index_order(&self) -> Order {
175 self.core.get_out_column_index_order()
176 }
177
178 pub fn distribution_key(&self) -> Option<Vec<usize>> {
179 self.core.distribution_key()
180 }
181
182 pub fn watermark_columns(&self) -> WatermarkColumns {
183 self.core.watermark_columns()
184 }
185
186 pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
188 self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
189 .into_iter()
190 .map(|(index, _)| index)
191 .collect()
192 }
193
194 pub fn indexes_satisfy_order_with_prefix(
199 &self,
200 required_order: &Order,
201 prefix: &HashSet<ColumnOrder>,
202 ) -> Vec<(&Arc<TableIndex>, Order)> {
203 let output_col_map = self
204 .output_col_idx()
205 .iter()
206 .cloned()
207 .enumerate()
208 .map(|(id, col)| (col, id))
209 .collect::<BTreeMap<_, _>>();
210 let unmatched_idx = output_col_map.len();
211 let mut index_catalog_and_orders = vec![];
212 for index in self.table_indexes() {
213 let s2p_mapping = index.secondary_to_primary_mapping();
214 let index_orders: Vec<ColumnOrder> = index
215 .index_table
216 .pk()
217 .iter()
218 .map(|idx_item| {
219 let idx = match s2p_mapping.get(&idx_item.column_index) {
220 Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
221 None => unmatched_idx,
223 };
224 ColumnOrder::new(idx, idx_item.order_type)
225 })
226 .collect();
227
228 let mut index_orders_iter = index_orders.into_iter().peekable();
229
230 let fixed_prefix = {
232 let mut fixed_prefix = vec![];
233 loop {
234 match index_orders_iter.peek() {
235 Some(index_col_order) if prefix.contains(index_col_order) => {
236 let index_col_order = index_orders_iter.next().unwrap();
237 fixed_prefix.push(index_col_order);
238 }
239 _ => break,
240 }
241 }
242 Order {
243 column_orders: fixed_prefix,
244 }
245 };
246
247 let remaining_orders = Order {
248 column_orders: index_orders_iter.collect(),
249 };
250 if remaining_orders.satisfies(required_order) {
251 index_catalog_and_orders.push((index, fixed_prefix));
252 }
253 }
254 index_catalog_and_orders
255 }
256
257 pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
259 let p2s_mapping = index.primary_to_secondary_mapping();
260 if self
261 .required_col_idx()
262 .iter()
263 .all(|x| p2s_mapping.contains_key(x))
264 {
265 let index_scan = self.core.to_index_scan(
266 index.index_table.clone(),
267 p2s_mapping,
268 index.function_mapping(),
269 );
270 Some(index_scan.into())
271 } else {
272 None
273 }
274 }
275
276 pub fn primary_key(&self) -> &[ColumnOrder] {
277 self.core.primary_key()
278 }
279
280 fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
282 self.output_col_idx()
283 .iter()
284 .enumerate()
285 .map(|(i, &col_idx)| {
286 InputRef::new(
287 i,
288 self.table().columns[col_idx].column_desc.data_type.clone(),
289 )
290 .into()
291 })
292 .collect_vec()
293 }
294
295 pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
297 let mut predicate = self.predicate().clone();
298 if predicate.always_true() {
299 return (self.core.clone(), Condition::true_cond(), None);
300 }
301
302 let mut inverse_mapping = {
303 let mapping = ColIndexMapping::new(
304 self.required_col_idx().iter().map(|i| Some(*i)).collect(),
305 self.table().columns.len(),
306 );
307 let mut inverse_map = vec![None; mapping.target_size()];
309 for (src, dst) in mapping.mapping_pairs() {
310 inverse_map[dst] = Some(src);
311 }
312 ColIndexMapping::new(inverse_map, mapping.source_size())
313 };
314
315 predicate = predicate.rewrite_expr(&mut inverse_mapping);
316
317 let scan_without_predicate = generic::TableScan::new(
318 self.required_col_idx().to_vec(),
319 self.core.table_catalog.clone(),
320 self.table_indexes().to_vec(),
321 self.ctx(),
322 Condition::true_cond(),
323 self.as_of(),
324 );
325 let project_expr = if self.required_col_idx() != self.output_col_idx() {
326 Some(self.output_idx_to_input_ref())
327 } else {
328 None
329 };
330 (scan_without_predicate, predicate, project_expr)
331 }
332
333 fn clone_with_predicate(&self, predicate: Condition) -> Self {
334 generic::TableScan::new_inner(
335 self.output_col_idx().to_vec(),
336 self.table().clone(),
337 self.table_indexes().to_vec(),
338 self.base.ctx().clone(),
339 predicate,
340 self.as_of(),
341 )
342 .into()
343 }
344
345 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
346 generic::TableScan::new_inner(
347 output_col_idx,
348 self.core.table_catalog.clone(),
349 self.table_indexes().to_vec(),
350 self.base.ctx().clone(),
351 self.predicate().clone(),
352 self.as_of(),
353 )
354 .into()
355 }
356
357 pub fn output_col_idx(&self) -> &Vec<usize> {
358 &self.core.output_col_idx
359 }
360
361 pub fn required_col_idx(&self) -> &Vec<usize> {
362 &self.core.required_col_idx
363 }
364}
365
// Plan-tree-node plumbing for `LogicalScan` in the `Logical` convention. The macro is defined
// elsewhere; judging by its name it covers a node with no inputs (a leaf) — TODO confirm
// against the macro definition for the exact traits it implements.
impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
367
368impl Distill for LogicalScan {
369 fn distill<'a>(&self) -> XmlNode<'a> {
370 let verbose = self.base.ctx().is_explain_verbose();
371 let mut vec = Vec::with_capacity(5);
372 vec.push(("table", Pretty::from(self.table_name().to_owned())));
373 let key_is_columns =
374 self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
375 let key = if key_is_columns {
376 "columns"
377 } else {
378 "output_columns"
379 };
380 vec.push((key, self.core.columns_pretty(verbose)));
381 if !key_is_columns {
382 vec.push((
383 "required_columns",
384 Pretty::Array(
385 self.required_col_idx()
386 .iter()
387 .map(|i| {
388 let col_name = &self.table().columns[*i].name;
389 Pretty::from(if verbose {
390 format!("{}.{}", self.table_name(), col_name)
391 } else {
392 col_name.clone()
393 })
394 })
395 .collect(),
396 ),
397 ));
398 }
399
400 if !self.predicate().always_true() {
401 let input_schema = self.core.fields_pretty_schema();
402 vec.push((
403 "predicate",
404 Pretty::display(&ConditionDisplay {
405 condition: self.predicate(),
406 input_schema: &input_schema,
407 }),
408 ))
409 }
410
411 if self.table_cardinality() != Cardinality::unknown() {
412 vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
413 }
414
415 childless_record("LogicalScan", vec)
416 }
417}
418
419impl ColPrunable for LogicalScan {
420 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
421 let output_col_idx: Vec<usize> = required_cols
422 .iter()
423 .map(|i| self.required_col_idx()[*i])
424 .collect();
425 assert!(
426 output_col_idx
427 .iter()
428 .all(|i| self.output_col_idx().contains(i))
429 );
430
431 self.clone_with_output_indices(output_col_idx).into()
432 }
433}
434
435impl ExprRewritable<Logical> for LogicalScan {
436 fn has_rewritable_expr(&self) -> bool {
437 true
438 }
439
440 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
441 let mut core = self.core.clone();
442 core.rewrite_exprs(r);
443 Self {
444 base: self.base.clone_with_new_plan_id(),
445 core,
446 }
447 .into()
448 }
449}
450
451impl ExprVisitable for LogicalScan {
452 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
453 self.core.visit_exprs(v);
454 }
455}
456
457impl PredicatePushdown for LogicalScan {
458 fn predicate_pushdown(
459 &self,
460 mut predicate: Condition,
461 _ctx: &mut PredicatePushdownContext,
462 ) -> PlanRef {
463 struct HasCorrelated {
466 has: bool,
467 }
468 impl ExprVisitor for HasCorrelated {
469 fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
470 self.has = true;
471 }
472 }
473 let non_pushable_predicate: Vec<_> = predicate
474 .conjunctions
475 .extract_if(.., |expr| {
476 if expr.count_nows() > 0 {
477 true
478 } else {
479 let mut visitor = HasCorrelated { has: false };
480 visitor.visit_expr(expr);
481 visitor.has
482 }
483 })
484 .collect();
485 let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
486 self.output_col_idx().iter().map(|i| Some(*i)).collect(),
487 self.table().columns.len(),
488 ));
489 if non_pushable_predicate.is_empty() {
490 self.clone_with_predicate(predicate.and(self.predicate().clone()))
491 .into()
492 } else {
493 LogicalFilter::create(
494 self.clone_with_predicate(predicate.and(self.predicate().clone()))
495 .into(),
496 Condition {
497 conjunctions: non_pushable_predicate,
498 },
499 )
500 }
501 }
502}
503
504impl LogicalScan {
505 fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
506 if self.predicate().always_true() {
507 required_order
508 .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
509 } else {
510 let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
511 self.table(),
512 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
513 )?;
514 let mut scan = self.clone();
515 scan.core.predicate = predicate; let plan: BatchPlanRef = if scan.core.predicate.always_false() {
518 LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
519 } else {
520 let (scan, predicate, project_expr) = scan.predicate_pull_up();
521
522 let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
523 if !predicate.always_true() {
524 plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
525 }
526 if let Some(exprs) = project_expr {
527 plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
528 }
529 plan
530 };
531
532 assert_eq!(plan.schema(), self.schema());
533 required_order.enforce_if_not_satisfies(plan)
534 }
535 }
536
537 fn use_index_scan_if_order_is_satisfied(
540 &self,
541 required_order: &Order,
542 ) -> Option<Result<BatchPlanRef>> {
543 if required_order.column_orders.is_empty() {
544 return None;
545 }
546
547 let order_satisfied_index = self.indexes_satisfy_order(required_order);
548 for index in order_satisfied_index {
549 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
550 return Some(index_scan.to_batch());
551 }
552 }
553
554 None
555 }
556}
557
558impl ToBatch for LogicalScan {
559 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
560 self.to_batch_with_order_required(&Order::any())
561 }
562
563 fn to_batch_with_order_required(
564 &self,
565 required_order: &Order,
566 ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
567 let new = self.clone_with_predicate(self.predicate().clone());
568
569 if !new.table_indexes().is_empty() {
570 let index_selection_rule = IndexSelectionRule::create();
571 if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
572 if let Some(scan) = applied.as_logical_scan() {
573 return required_order.enforce_if_not_satisfies(scan.to_batch()?);
575 } else if let Some(join) = applied.as_logical_join() {
576 return required_order
578 .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
579 } else {
580 unreachable!();
581 }
582 } else {
583 if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
585 return plan_ref;
586 }
587 }
588 }
589 new.to_batch_inner_with_required(required_order)
590 }
591}
592
593impl ToStream for LogicalScan {
594 fn to_stream(
595 &self,
596 ctx: &mut ToStreamContext,
597 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
598 if self.predicate().always_true() {
599 if self.core.table_catalog.database_id != self.base.ctx().session_ctx().database_id() {
601 Ok(StreamTableScan::new_with_stream_scan_type(
602 self.core.clone(),
603 StreamScanType::CrossDbSnapshotBackfill,
604 )
605 .into())
606 } else {
607 Ok(StreamTableScan::new_with_stream_scan_type(
608 self.core.clone(),
609 ctx.stream_scan_type(),
610 )
611 .into())
612 }
613 } else {
614 let (scan, predicate, project_expr) = self.predicate_pull_up();
615 let mut plan = LogicalFilter::create(scan.into(), predicate);
616 if let Some(exprs) = project_expr {
617 plan = LogicalProject::create(plan, exprs)
618 }
619 plan.to_stream(ctx)
620 }
621 }
622
623 fn logical_rewrite_for_stream(
624 &self,
625 _ctx: &mut RewriteStreamContext,
626 ) -> Result<(PlanRef, ColIndexMapping)> {
627 match self.base.stream_key().is_none() {
628 true => {
629 let mut col_ids = HashSet::new();
630
631 for &idx in self.output_col_idx() {
632 col_ids.insert(self.table().columns[idx].column_id);
633 }
634 let col_need_to_add = self
635 .table()
636 .pk
637 .iter()
638 .filter_map(|c| {
639 if !col_ids.contains(&self.table().columns[c.column_index].column_id) {
640 Some(c.column_index)
641 } else {
642 None
643 }
644 })
645 .collect_vec();
646
647 let mut output_col_idx = self.output_col_idx().clone();
648 output_col_idx.extend(col_need_to_add);
649 let new_len = output_col_idx.len();
650 Ok((
651 self.clone_with_output_indices(output_col_idx).into(),
652 ColIndexMapping::identity_or_none(self.schema().len(), new_len),
653 ))
654 }
655 false => Ok((
656 self.clone().into(),
657 ColIndexMapping::identity(self.schema().len()),
658 )),
659 }
660 }
661}