1use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::AsOf;
23
24use super::generic::{GenericPlanNode, GenericPlanRef};
25use super::utils::{Distill, childless_record};
26use super::{
27 BackfillType, BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
28 LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch,
29 ToStream, generic,
30};
31use crate::TableCatalog;
32use crate::binder::BoundBaseTable;
33use crate::catalog::ColumnId;
34use crate::catalog::index_catalog::{IndexType, TableIndex, VectorIndex};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
37use crate::optimizer::ApplyResult;
38use crate::optimizer::optimizer_context::OptimizerContextRef;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
41use crate::optimizer::plan_node::{
42 BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
43 PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
44};
45use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
46use crate::optimizer::rule::IndexSelectionRule;
47use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
48
/// Logical plan node that scans a table.
///
/// The node is a thin wrapper: all scan state lives in the [`generic::TableScan`] core, and the
/// logical [`PlanBase`] is derived from that core when the node is built through
/// `From<generic::TableScan>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Logical plan-node base, derived from `core` via `PlanBase::new_logical_with_core`.
    pub base: PlanBase<Logical>,
    /// Scan description: the table catalog, output/required column indices, attached table and
    /// vector indexes, the pushed-down predicate, the optional `AS OF` clause and the optimizer
    /// context.
    core: generic::TableScan,
}
55
56impl From<generic::TableScan> for LogicalScan {
57 fn from(core: generic::TableScan) -> Self {
58 let base = PlanBase::new_logical_with_core(&core);
59 Self { base, core }
60 }
61}
62
63impl From<generic::TableScan> for PlanRef {
64 fn from(core: generic::TableScan) -> Self {
65 LogicalScan::from(core).into()
66 }
67}
68
/// Every accessor below simply forwards to the node's plan base (`self.plan_base()`).
impl GenericPlanRef for LogicalScan {
    /// Returns the plan node id stored in the plan base.
    fn id(&self) -> PlanNodeId {
        self.plan_base().id()
    }

    /// Returns the output schema stored in the plan base.
    fn schema(&self) -> &Schema {
        self.plan_base().schema()
    }

    /// Returns the stream key stored in the plan base, or `None` if the base has none.
    fn stream_key(&self) -> Option<&[usize]> {
        self.plan_base().stream_key()
    }

    /// Returns the optimizer context held by the plan base.
    fn ctx(&self) -> OptimizerContextRef {
        self.plan_base().ctx()
    }

    /// Returns the functional dependencies stored in the plan base.
    fn functional_dependency(&self) -> &FunctionalDependencySet {
        self.plan_base().functional_dependency()
    }
}
90
91impl LogicalScan {
92 pub fn create(
94 table_catalog: Arc<TableCatalog>,
95 ctx: OptimizerContextRef,
96 as_of: Option<AsOf>,
97 ) -> Self {
98 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
99 generic::TableScan::new(
100 output_col_idx,
101 table_catalog,
102 vec![],
103 vec![],
104 ctx,
105 Condition::true_cond(),
106 as_of,
107 )
108 .into()
109 }
110
111 pub fn from_base_table(
112 base_table: &BoundBaseTable,
113 ctx: OptimizerContextRef,
114 as_of: Option<AsOf>,
115 ) -> Self {
116 let table_catalog = base_table.table_catalog.clone();
117 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
118 let mut table_indexes = vec![];
119 let mut vector_indexes = vec![];
120 for index in &base_table.table_indexes {
121 match &index.index_type {
122 IndexType::Table(index) => table_indexes.push(index.clone()),
123 IndexType::Vector(index) => vector_indexes.push(index.clone()),
124 }
125 }
126 generic::TableScan::new(
127 output_col_idx,
128 table_catalog,
129 table_indexes,
130 vector_indexes,
131 ctx,
132 Condition::true_cond(),
133 as_of,
134 )
135 .into()
136 }
137
138 pub fn table_name(&self) -> &str {
139 &self.core.table_catalog.name
140 }
141
142 pub fn as_of(&self) -> Option<AsOf> {
143 self.core.as_of.clone()
144 }
145
146 pub fn table_cardinality(&self) -> Cardinality {
148 self.core.table_catalog.cardinality
149 }
150
151 pub fn table(&self) -> &Arc<TableCatalog> {
152 &self.core.table_catalog
153 }
154
155 pub fn column_descs(&self) -> Vec<ColumnDesc> {
157 self.core.column_descs()
158 }
159
160 pub fn output_column_ids(&self) -> Vec<ColumnId> {
162 self.core.output_column_ids()
163 }
164
165 pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
167 &self.core.table_indexes
168 }
169
170 pub fn vector_indexes(&self) -> &[Arc<VectorIndex>] {
172 &self.core.vector_indexes
173 }
174
175 pub fn predicate(&self) -> &Condition {
177 &self.core.predicate
178 }
179
180 pub fn get_out_column_index_order(&self) -> Order {
183 self.core.get_out_column_index_order()
184 }
185
186 pub fn distribution_key(&self) -> Option<Vec<usize>> {
187 self.core.distribution_key()
188 }
189
190 pub fn watermark_columns(&self) -> WatermarkColumns {
191 self.core.watermark_columns()
192 }
193
194 pub fn cross_database(&self) -> bool {
195 self.core.cross_database()
196 }
197
198 pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
200 self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
201 .into_iter()
202 .map(|(index, _)| index)
203 .collect()
204 }
205
206 pub fn indexes_satisfy_order_with_prefix(
211 &self,
212 required_order: &Order,
213 prefix: &HashSet<ColumnOrder>,
214 ) -> Vec<(&Arc<TableIndex>, Order)> {
215 let output_col_map = self
216 .output_col_idx()
217 .iter()
218 .cloned()
219 .enumerate()
220 .map(|(id, col)| (col, id))
221 .collect::<BTreeMap<_, _>>();
222 let unmatched_idx = output_col_map.len();
223 let mut index_catalog_and_orders = vec![];
224 for index in self.table_indexes() {
225 let s2p_mapping = index.secondary_to_primary_mapping();
226 let index_orders: Vec<ColumnOrder> = index
227 .index_table
228 .pk()
229 .iter()
230 .map(|idx_item| {
231 let idx = match s2p_mapping.get(&idx_item.column_index) {
232 Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
233 None => unmatched_idx,
235 };
236 ColumnOrder::new(idx, idx_item.order_type)
237 })
238 .collect();
239
240 let mut index_orders_iter = index_orders.into_iter().peekable();
241
242 let fixed_prefix = {
244 let mut fixed_prefix = vec![];
245 loop {
246 match index_orders_iter.peek() {
247 Some(index_col_order) if prefix.contains(index_col_order) => {
248 let index_col_order = index_orders_iter.next().unwrap();
249 fixed_prefix.push(index_col_order);
250 }
251 _ => break,
252 }
253 }
254 Order {
255 column_orders: fixed_prefix,
256 }
257 };
258
259 let remaining_orders = Order {
260 column_orders: index_orders_iter.collect(),
261 };
262 if remaining_orders.satisfies(required_order) {
263 index_catalog_and_orders.push((index, fixed_prefix));
264 }
265 }
266 index_catalog_and_orders
267 }
268
269 pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
271 let p2s_mapping = index.primary_to_secondary_mapping();
272 if self
273 .required_col_idx()
274 .iter()
275 .all(|x| p2s_mapping.contains_key(x))
276 {
277 let index_scan = self.core.to_index_scan(
278 index.index_table.clone(),
279 p2s_mapping,
280 index.function_mapping(),
281 );
282 Some(index_scan.into())
283 } else {
284 None
285 }
286 }
287
288 pub fn primary_key(&self) -> &[ColumnOrder] {
289 self.core.primary_key()
290 }
291
292 fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
294 self.output_col_idx()
295 .iter()
296 .enumerate()
297 .map(|(i, &col_idx)| {
298 InputRef::new(
299 i,
300 self.table().columns[col_idx].column_desc.data_type.clone(),
301 )
302 .into()
303 })
304 .collect_vec()
305 }
306
307 pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
309 let mut predicate = self.predicate().clone();
310 if predicate.always_true() {
311 return (self.core.clone(), Condition::true_cond(), None);
312 }
313
314 let mut inverse_mapping = {
315 let mapping = ColIndexMapping::new(
316 self.required_col_idx().iter().map(|i| Some(*i)).collect(),
317 self.table().columns.len(),
318 );
319 let mut inverse_map = vec![None; mapping.target_size()];
321 for (src, dst) in mapping.mapping_pairs() {
322 inverse_map[dst] = Some(src);
323 }
324 ColIndexMapping::new(inverse_map, mapping.source_size())
325 };
326
327 predicate = predicate.rewrite_expr(&mut inverse_mapping);
328
329 let scan_without_predicate = generic::TableScan::new(
330 self.required_col_idx().clone(),
331 self.core.table_catalog.clone(),
332 self.table_indexes().to_vec(),
333 self.vector_indexes().to_vec(),
334 self.ctx(),
335 Condition::true_cond(),
336 self.as_of(),
337 );
338 let project_expr = if self.required_col_idx() != self.output_col_idx() {
339 Some(self.output_idx_to_input_ref())
340 } else {
341 None
342 };
343 (scan_without_predicate, predicate, project_expr)
344 }
345
346 fn clone_with_predicate(&self, predicate: Condition) -> Self {
347 generic::TableScan::new_inner(
348 self.output_col_idx().clone(),
349 self.table().clone(),
350 self.table_indexes().to_vec(),
351 self.vector_indexes().to_vec(),
352 self.base.ctx(),
353 predicate,
354 self.as_of(),
355 )
356 .into()
357 }
358
359 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
360 generic::TableScan::new_inner(
361 output_col_idx,
362 self.core.table_catalog.clone(),
363 self.table_indexes().to_vec(),
364 self.vector_indexes().to_vec(),
365 self.base.ctx(),
366 self.predicate().clone(),
367 self.as_of(),
368 )
369 .into()
370 }
371
372 pub fn output_col_idx(&self) -> &Vec<usize> {
373 &self.core.output_col_idx
374 }
375
376 pub fn required_col_idx(&self) -> &Vec<usize> {
377 &self.core.required_col_idx
378 }
379}
380
// Invokes the project macro `impl_plan_tree_node_for_leaf!` for `LogicalScan` under the
// `Logical` convention. NOTE(review): presumably this generates the plan-tree-node impl for a
// node without inputs; confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
382
383impl Distill for LogicalScan {
384 fn distill<'a>(&self) -> XmlNode<'a> {
385 let verbose = self.base.ctx().is_explain_verbose();
386 let mut vec = Vec::with_capacity(5);
387 vec.push(("table", Pretty::from(self.table_name().to_owned())));
388 let key_is_columns =
389 self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
390 let key = if key_is_columns {
391 "columns"
392 } else {
393 "output_columns"
394 };
395 vec.push((key, self.core.columns_pretty(verbose)));
396 if !key_is_columns {
397 vec.push((
398 "required_columns",
399 Pretty::Array(
400 self.required_col_idx()
401 .iter()
402 .map(|i| {
403 let col_name = &self.table().columns[*i].name;
404 Pretty::from(if verbose {
405 format!("{}.{}", self.table_name(), col_name)
406 } else {
407 col_name.clone()
408 })
409 })
410 .collect(),
411 ),
412 ));
413 }
414
415 if !self.predicate().always_true() {
416 let input_schema = self.core.fields_pretty_schema();
417 vec.push((
418 "predicate",
419 Pretty::display(&ConditionDisplay {
420 condition: self.predicate(),
421 input_schema: &input_schema,
422 }),
423 ))
424 }
425
426 if self.table_cardinality() != Cardinality::unknown() {
427 vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
428 }
429
430 childless_record("LogicalScan", vec)
431 }
432}
433
434impl ColPrunable for LogicalScan {
435 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
436 let output_col_idx: Vec<usize> = required_cols
437 .iter()
438 .map(|i| self.required_col_idx()[*i])
439 .collect();
440 assert!(
441 output_col_idx
442 .iter()
443 .all(|i| self.output_col_idx().contains(i))
444 );
445
446 self.clone_with_output_indices(output_col_idx).into()
447 }
448}
449
450impl ExprRewritable<Logical> for LogicalScan {
451 fn has_rewritable_expr(&self) -> bool {
452 true
453 }
454
455 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
456 let mut core = self.core.clone();
457 core.rewrite_exprs(r);
458 Self {
459 base: self.base.clone_with_new_plan_id(),
460 core,
461 }
462 .into()
463 }
464}
465
impl ExprVisitable for LogicalScan {
    /// Visits the expressions held by the scan core with `v`.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}
471
472impl PredicatePushdown for LogicalScan {
473 fn predicate_pushdown(
474 &self,
475 mut predicate: Condition,
476 _ctx: &mut PredicatePushdownContext,
477 ) -> PlanRef {
478 struct HasCorrelated {
481 has: bool,
482 }
483 impl ExprVisitor for HasCorrelated {
484 fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
485 self.has = true;
486 }
487 }
488 let non_pushable_predicate: Vec<_> = predicate
489 .conjunctions
490 .extract_if(.., |expr| {
491 if expr.count_nows() > 0 {
492 true
493 } else {
494 let mut visitor = HasCorrelated { has: false };
495 visitor.visit_expr(expr);
496 visitor.has
497 }
498 })
499 .collect();
500 let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
501 self.output_col_idx().iter().map(|i| Some(*i)).collect(),
502 self.table().columns.len(),
503 ));
504 if non_pushable_predicate.is_empty() {
505 self.clone_with_predicate(predicate.and(self.predicate().clone()))
506 .into()
507 } else {
508 LogicalFilter::create(
509 self.clone_with_predicate(predicate.and(self.predicate().clone()))
510 .into(),
511 Condition {
512 conjunctions: non_pushable_predicate,
513 },
514 )
515 }
516 }
517}
518
519impl LogicalScan {
520 fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
521 if self.predicate().always_true() {
522 required_order
523 .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
524 } else {
525 let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
526 self.table(),
527 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
528 )?;
529 let mut scan = self.clone();
530 scan.core.predicate = predicate; let plan: BatchPlanRef = if scan.core.predicate.always_false() {
533 LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
534 } else {
535 let (scan, predicate, project_expr) = scan.predicate_pull_up();
536
537 let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
538 if !predicate.always_true() {
539 plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
540 }
541 if let Some(exprs) = project_expr {
542 plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
543 }
544 plan
545 };
546
547 assert_eq!(plan.schema(), self.schema());
548 required_order.enforce_if_not_satisfies(plan)
549 }
550 }
551
552 fn use_index_scan_if_order_is_satisfied(
555 &self,
556 required_order: &Order,
557 ) -> Option<Result<BatchPlanRef>> {
558 if required_order.column_orders.is_empty() {
559 return None;
560 }
561
562 let order_satisfied_index = self.indexes_satisfy_order(required_order);
563 for index in order_satisfied_index {
564 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
565 return Some(index_scan.to_batch());
566 }
567 }
568
569 None
570 }
571}
572
573impl ToBatch for LogicalScan {
574 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
575 self.to_batch_with_order_required(&Order::any())
576 }
577
578 fn to_batch_with_order_required(
579 &self,
580 required_order: &Order,
581 ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
582 let new = self.clone_with_predicate(self.predicate().clone());
583
584 if !new.table_indexes().is_empty()
585 && self
586 .base
587 .ctx()
588 .session_ctx()
589 .config()
590 .enable_index_selection()
591 {
592 let index_selection_rule = IndexSelectionRule::create();
593 if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
594 if let Some(scan) = applied.as_logical_scan() {
595 return required_order.enforce_if_not_satisfies(scan.to_batch()?);
597 } else if let Some(join) = applied.as_logical_join() {
598 return required_order
600 .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
601 } else {
602 unreachable!();
603 }
604 } else {
605 if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
607 return plan_ref;
608 }
609 }
610 }
611 new.to_batch_inner_with_required(required_order)
612 }
613}
614
615impl ToStream for LogicalScan {
616 fn to_stream(
617 &self,
618 ctx: &mut ToStreamContext,
619 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
620 if self.predicate().always_true() {
621 if self.core.cross_database() && ctx.backfill_type() == BackfillType::UpstreamOnly {
622 return Err(ErrorCode::NotSupported(
623 "We currently do not support cross database scan in upstream only mode."
624 .to_owned(),
625 "Please ensure the source table is in the same database.".to_owned(),
626 )
627 .into());
628 }
629
630 Ok(StreamTableScan::new_with_stream_scan_type(
631 self.core.clone(),
632 ctx.backfill_type().to_stream_scan_type(),
633 )
634 .into())
635 } else {
636 let (scan, predicate, project_expr) = self.predicate_pull_up();
637 let mut plan = LogicalFilter::create(scan.into(), predicate);
638 if let Some(exprs) = project_expr {
639 plan = LogicalProject::create(plan, exprs)
640 }
641 plan.to_stream(ctx)
642 }
643 }
644
645 fn logical_rewrite_for_stream(
646 &self,
647 _ctx: &mut RewriteStreamContext,
648 ) -> Result<(PlanRef, ColIndexMapping)> {
649 match self.base.stream_key().is_none() {
650 true => {
651 let mut col_ids = HashSet::new();
652
653 for &idx in self.output_col_idx() {
654 col_ids.insert(self.table().columns[idx].column_id);
655 }
656 let col_need_to_add = self
657 .table()
658 .pk
659 .iter()
660 .filter_map(|c| {
661 if !col_ids.contains(&self.table().columns[c.column_index].column_id) {
662 Some(c.column_index)
663 } else {
664 None
665 }
666 })
667 .collect_vec();
668
669 let mut output_col_idx = self.output_col_idx().clone();
670 output_col_idx.extend(col_need_to_add);
671 let new_len = output_col_idx.len();
672 Ok((
673 self.clone_with_output_indices(output_col_idx).into(),
674 ColIndexMapping::identity_or_none(self.schema().len(), new_len),
675 ))
676 }
677 false => Ok((
678 self.clone().into(),
679 ColIndexMapping::identity(self.schema().len()),
680 )),
681 }
682 }
683
684 fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
685 if !self
686 .core
687 .ctx()
688 .session_ctx()
689 .config()
690 .enable_index_selection()
691 {
692 return None;
693 }
694 if columns.is_empty() {
695 return None;
696 }
697 if self.table_indexes().is_empty() {
698 return None;
699 }
700 let orders = if columns.len() <= 3 {
701 OrderType::all()
702 } else {
703 vec![OrderType::ascending_nulls_last(), OrderType::descending()]
708 };
709 for order_type_combo in columns
710 .iter()
711 .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot)))
712 .multi_cartesian_product()
713 .take(256)
714 {
716 let required_order = Order {
717 column_orders: order_type_combo,
718 };
719
720 let order_satisfied_index = self.indexes_satisfy_order(&required_order);
721 for index in order_satisfied_index {
722 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
723 if let Some(dist_key) = index_scan.distribution_key()
729 && dist_key.iter().all(|k| columns.contains(k))
730 {
731 return Some(index_scan.into());
732 }
733 }
734 }
735 }
736 None
737 }
738}