1use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::AsOf;
23
24use super::generic::{GenericPlanNode, GenericPlanRef};
25use super::utils::{Distill, childless_record};
26use super::{
27 BackfillType, BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
28 LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamFilter,
29 StreamProject, StreamTableScan, ToBatch, ToStream, generic,
30};
31use crate::TableCatalog;
32use crate::binder::BoundBaseTable;
33use crate::catalog::ColumnId;
34use crate::catalog::index_catalog::{IndexType, TableIndex, VectorIndex};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
37use crate::optimizer::ApplyResult;
38use crate::optimizer::optimizer_context::OptimizerContextRef;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
41use crate::optimizer::plan_node::{
42 BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
43 PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
44};
45use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
46use crate::optimizer::rule::IndexSelectionRule;
47use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
48
/// Logical plan node that scans a table.
///
/// It pairs a [`generic::TableScan`] core with the logical [`PlanBase`] derived from it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Plan-node metadata; constructed from `core` via `PlanBase::new_logical_with_core`.
    pub base: PlanBase<Logical>,
    /// The generic table-scan description that most accessors of this node delegate to.
    core: generic::TableScan,
}
55
56impl From<generic::TableScan> for LogicalScan {
57 fn from(core: generic::TableScan) -> Self {
58 let base = PlanBase::new_logical_with_core(&core);
59 Self { base, core }
60 }
61}
62
63impl From<generic::TableScan> for PlanRef {
64 fn from(core: generic::TableScan) -> Self {
65 LogicalScan::from(core).into()
66 }
67}
68
69impl GenericPlanRef for LogicalScan {
70 fn id(&self) -> PlanNodeId {
71 self.plan_base().id()
72 }
73
74 fn schema(&self) -> &Schema {
75 self.plan_base().schema()
76 }
77
78 fn stream_key(&self) -> Option<&[usize]> {
79 self.plan_base().stream_key()
80 }
81
82 fn ctx(&self) -> OptimizerContextRef {
83 self.plan_base().ctx()
84 }
85
86 fn functional_dependency(&self) -> &FunctionalDependencySet {
87 self.plan_base().functional_dependency()
88 }
89}
90
91impl LogicalScan {
92 pub fn create(
94 table_catalog: Arc<TableCatalog>,
95 ctx: OptimizerContextRef,
96 as_of: Option<AsOf>,
97 ) -> Self {
98 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
99 generic::TableScan::new(
100 output_col_idx,
101 table_catalog,
102 vec![],
103 vec![],
104 ctx,
105 Condition::true_cond(),
106 as_of,
107 )
108 .into()
109 }
110
111 pub fn from_base_table(
112 base_table: &BoundBaseTable,
113 ctx: OptimizerContextRef,
114 as_of: Option<AsOf>,
115 ) -> Self {
116 let table_catalog = base_table.table_catalog.clone();
117 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
118 let mut table_indexes = vec![];
119 let mut vector_indexes = vec![];
120 for index in &base_table.table_indexes {
121 match &index.index_type {
122 IndexType::Table(index) => table_indexes.push(index.clone()),
123 IndexType::Vector(index) => vector_indexes.push(index.clone()),
124 }
125 }
126 generic::TableScan::new(
127 output_col_idx,
128 table_catalog,
129 table_indexes,
130 vector_indexes,
131 ctx,
132 Condition::true_cond(),
133 as_of,
134 )
135 .into()
136 }
137
138 pub fn table_name(&self) -> &str {
139 &self.core.table_catalog.name
140 }
141
142 pub fn as_of(&self) -> Option<AsOf> {
143 self.core.as_of.clone()
144 }
145
146 pub fn table_cardinality(&self) -> Cardinality {
148 self.core.table_catalog.cardinality
149 }
150
151 pub fn table(&self) -> &Arc<TableCatalog> {
152 &self.core.table_catalog
153 }
154
155 pub fn column_descs(&self) -> Vec<ColumnDesc> {
157 self.core.column_descs()
158 }
159
160 pub fn output_column_ids(&self) -> Vec<ColumnId> {
162 self.core.output_column_ids()
163 }
164
165 pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
167 &self.core.table_indexes
168 }
169
170 pub fn vector_indexes(&self) -> &[Arc<VectorIndex>] {
172 &self.core.vector_indexes
173 }
174
175 pub fn predicate(&self) -> &Condition {
177 &self.core.predicate
178 }
179
180 pub fn get_out_column_index_order(&self) -> Order {
183 self.core.get_out_column_index_order()
184 }
185
186 pub fn distribution_key(&self) -> Option<Vec<usize>> {
187 self.core.distribution_key()
188 }
189
190 pub fn watermark_columns(&self) -> WatermarkColumns {
191 self.core.watermark_columns()
192 }
193
194 pub fn cross_database(&self) -> bool {
195 self.core.cross_database()
196 }
197
198 pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
200 self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
201 .into_iter()
202 .map(|(index, _)| index)
203 .collect()
204 }
205
206 pub fn indexes_satisfy_order_with_prefix(
211 &self,
212 required_order: &Order,
213 prefix: &HashSet<ColumnOrder>,
214 ) -> Vec<(&Arc<TableIndex>, Order)> {
215 let output_col_map = self
216 .output_col_idx()
217 .iter()
218 .cloned()
219 .enumerate()
220 .map(|(id, col)| (col, id))
221 .collect::<BTreeMap<_, _>>();
222 let unmatched_idx = output_col_map.len();
223 let mut index_catalog_and_orders = vec![];
224 for index in self.table_indexes() {
225 let s2p_mapping = index.secondary_to_primary_mapping();
226 let index_orders: Vec<ColumnOrder> = index
227 .index_table
228 .pk()
229 .iter()
230 .map(|idx_item| {
231 let idx = match s2p_mapping.get(&idx_item.column_index) {
232 Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
233 None => unmatched_idx,
235 };
236 ColumnOrder::new(idx, idx_item.order_type)
237 })
238 .collect();
239
240 let mut index_orders_iter = index_orders.into_iter().peekable();
241
242 let fixed_prefix = {
244 let mut fixed_prefix = vec![];
245 loop {
246 match index_orders_iter.peek() {
247 Some(index_col_order) if prefix.contains(index_col_order) => {
248 let index_col_order = index_orders_iter.next().unwrap();
249 fixed_prefix.push(index_col_order);
250 }
251 _ => break,
252 }
253 }
254 Order {
255 column_orders: fixed_prefix,
256 }
257 };
258
259 let remaining_orders = Order {
260 column_orders: index_orders_iter.collect(),
261 };
262 if remaining_orders.satisfies(required_order) {
263 index_catalog_and_orders.push((index, fixed_prefix));
264 }
265 }
266 index_catalog_and_orders
267 }
268
269 pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
271 let p2s_mapping = index.primary_to_secondary_mapping();
272 if self
273 .required_col_idx()
274 .iter()
275 .all(|x| p2s_mapping.contains_key(x))
276 {
277 let index_scan = self.core.to_index_scan(
278 index.index_table.clone(),
279 p2s_mapping,
280 index.function_mapping(),
281 );
282 Some(index_scan.into())
283 } else {
284 None
285 }
286 }
287
288 pub fn primary_key(&self) -> &[ColumnOrder] {
289 self.core.primary_key()
290 }
291
292 fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
294 self.output_col_idx()
295 .iter()
296 .enumerate()
297 .map(|(i, &col_idx)| {
298 InputRef::new(
299 i,
300 self.table().columns[col_idx].column_desc.data_type.clone(),
301 )
302 .into()
303 })
304 .collect_vec()
305 }
306
307 pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
309 let mut predicate = self.predicate().clone();
310 if predicate.always_true() {
311 return (self.core.clone(), Condition::true_cond(), None);
312 }
313
314 let mut inverse_mapping = {
315 let mapping = ColIndexMapping::new(
316 self.required_col_idx().iter().map(|i| Some(*i)).collect(),
317 self.table().columns.len(),
318 );
319 let mut inverse_map = vec![None; mapping.target_size()];
321 for (src, dst) in mapping.mapping_pairs() {
322 inverse_map[dst] = Some(src);
323 }
324 ColIndexMapping::new(inverse_map, mapping.source_size())
325 };
326
327 predicate = predicate.rewrite_expr(&mut inverse_mapping);
328
329 let scan_without_predicate = generic::TableScan::new(
330 self.required_col_idx().clone(),
331 self.core.table_catalog.clone(),
332 self.table_indexes().to_vec(),
333 self.vector_indexes().to_vec(),
334 self.ctx(),
335 Condition::true_cond(),
336 self.as_of(),
337 );
338 let project_expr = if self.required_col_idx() != self.output_col_idx() {
339 Some(self.output_idx_to_input_ref())
340 } else {
341 None
342 };
343 (scan_without_predicate, predicate, project_expr)
344 }
345
346 fn clone_with_predicate(&self, predicate: Condition) -> Self {
347 generic::TableScan::new_inner(
348 self.output_col_idx().clone(),
349 self.table().clone(),
350 self.table_indexes().to_vec(),
351 self.vector_indexes().to_vec(),
352 self.base.ctx(),
353 predicate,
354 self.as_of(),
355 self.core.table_scan_stream_key().map(ToOwned::to_owned),
356 )
357 .into()
358 }
359
360 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
361 generic::TableScan::new_inner(
362 output_col_idx,
363 self.core.table_catalog.clone(),
364 self.table_indexes().to_vec(),
365 self.vector_indexes().to_vec(),
366 self.base.ctx(),
367 self.predicate().clone(),
368 self.as_of(),
369 self.core.table_scan_stream_key().map(ToOwned::to_owned),
370 )
371 .into()
372 }
373
374 pub fn output_col_idx(&self) -> &Vec<usize> {
375 &self.core.output_col_idx
376 }
377
378 pub fn required_col_idx(&self) -> &Vec<usize> {
379 &self.core.required_col_idx
380 }
381}
382
// NOTE(review): the macro is defined outside this file; presumably it generates the plan-tree
// impl for a node without children (a leaf) under the `Logical` convention — confirm.
impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
384
385impl Distill for LogicalScan {
386 fn distill<'a>(&self) -> XmlNode<'a> {
387 let verbose = self.base.ctx().is_explain_verbose();
388 let mut vec = Vec::with_capacity(5);
389 vec.push(("table", Pretty::from(self.table_name().to_owned())));
390 let key_is_columns =
391 self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
392 let key = if key_is_columns {
393 "columns"
394 } else {
395 "output_columns"
396 };
397 vec.push((key, self.core.columns_pretty(verbose)));
398 if !key_is_columns {
399 vec.push((
400 "required_columns",
401 Pretty::Array(
402 self.required_col_idx()
403 .iter()
404 .map(|i| {
405 let col_name = &self.table().columns[*i].name;
406 Pretty::from(if verbose {
407 format!("{}.{}", self.table_name(), col_name)
408 } else {
409 col_name.clone()
410 })
411 })
412 .collect(),
413 ),
414 ));
415 }
416
417 if !self.predicate().always_true() {
418 let input_schema = self.core.fields_pretty_schema();
419 vec.push((
420 "predicate",
421 Pretty::display(&ConditionDisplay {
422 condition: self.predicate(),
423 input_schema: &input_schema,
424 }),
425 ))
426 }
427
428 if self.table_cardinality() != Cardinality::unknown() {
429 vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
430 }
431
432 childless_record("LogicalScan", vec)
433 }
434}
435
436impl ColPrunable for LogicalScan {
437 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
438 let output_col_idx: Vec<usize> = required_cols
439 .iter()
440 .map(|i| self.required_col_idx()[*i])
441 .collect();
442 assert!(
443 output_col_idx
444 .iter()
445 .all(|i| self.output_col_idx().contains(i))
446 );
447
448 self.clone_with_output_indices(output_col_idx).into()
449 }
450}
451
452impl ExprRewritable<Logical> for LogicalScan {
453 fn has_rewritable_expr(&self) -> bool {
454 true
455 }
456
457 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
458 let mut core = self.core.clone();
459 core.rewrite_exprs(r);
460 Self {
461 base: self.base.clone_with_new_plan_id(),
462 core,
463 }
464 .into()
465 }
466}
467
468impl ExprVisitable for LogicalScan {
469 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
470 self.core.visit_exprs(v);
471 }
472}
473
474impl PredicatePushdown for LogicalScan {
475 fn predicate_pushdown(
476 &self,
477 mut predicate: Condition,
478 _ctx: &mut PredicatePushdownContext,
479 ) -> PlanRef {
480 struct HasCorrelated {
483 has: bool,
484 }
485 impl ExprVisitor for HasCorrelated {
486 fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
487 self.has = true;
488 }
489 }
490 let non_pushable_predicate: Vec<_> = predicate
491 .conjunctions
492 .extract_if(.., |expr| {
493 if expr.count_nows() > 0 {
494 true
495 } else {
496 let mut visitor = HasCorrelated { has: false };
497 visitor.visit_expr(expr);
498 visitor.has
499 }
500 })
501 .collect();
502 let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
503 self.output_col_idx().iter().map(|i| Some(*i)).collect(),
504 self.table().columns.len(),
505 ));
506 if non_pushable_predicate.is_empty() {
507 self.clone_with_predicate(predicate.and(self.predicate().clone()))
508 .into()
509 } else {
510 LogicalFilter::create(
511 self.clone_with_predicate(predicate.and(self.predicate().clone()))
512 .into(),
513 Condition {
514 conjunctions: non_pushable_predicate,
515 },
516 )
517 }
518 }
519}
520
521impl LogicalScan {
522 fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
523 if self.predicate().always_true() {
524 required_order
525 .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
526 } else {
527 let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
528 self.table(),
529 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
530 )?;
531 let mut scan = self.clone();
532 scan.core.predicate = predicate; let plan: BatchPlanRef = if scan.core.predicate.always_false() {
535 LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
536 } else {
537 let (scan, predicate, project_expr) = scan.predicate_pull_up();
538
539 let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
540 if !predicate.always_true() {
541 plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
542 }
543 if let Some(exprs) = project_expr {
544 plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
545 }
546 plan
547 };
548
549 assert_eq!(plan.schema(), self.schema());
550 required_order.enforce_if_not_satisfies(plan)
551 }
552 }
553
554 fn use_index_scan_if_order_is_satisfied(
557 &self,
558 required_order: &Order,
559 ) -> Option<Result<BatchPlanRef>> {
560 if required_order.column_orders.is_empty() {
561 return None;
562 }
563
564 let order_satisfied_index = self.indexes_satisfy_order(required_order);
565 for index in order_satisfied_index {
566 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
567 return Some(index_scan.to_batch());
568 }
569 }
570
571 None
572 }
573}
574
575impl ToBatch for LogicalScan {
576 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
577 self.to_batch_with_order_required(&Order::any())
578 }
579
580 fn to_batch_with_order_required(
581 &self,
582 required_order: &Order,
583 ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
584 let new = self.clone_with_predicate(self.predicate().clone());
585
586 if !new.table_indexes().is_empty()
587 && self
588 .base
589 .ctx()
590 .session_ctx()
591 .config()
592 .enable_index_selection()
593 {
594 let index_selection_rule = IndexSelectionRule::create();
595 if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
596 if let Some(scan) = applied.as_logical_scan() {
597 return required_order.enforce_if_not_satisfies(scan.to_batch()?);
599 } else if let Some(join) = applied.as_logical_join() {
600 return required_order
602 .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
603 } else {
604 unreachable!();
605 }
606 } else {
607 if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
609 return plan_ref;
610 }
611 }
612 }
613 new.to_batch_inner_with_required(required_order)
614 }
615}
616
617impl ToStream for LogicalScan {
618 fn to_stream(
619 &self,
620 ctx: &mut ToStreamContext,
621 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
622 if self.predicate().always_true() {
623 if self.core.cross_database() && ctx.backfill_type() == BackfillType::UpstreamOnly {
624 return Err(ErrorCode::NotSupported(
625 "We currently do not support cross database scan in upstream only mode."
626 .to_owned(),
627 "Please ensure the source table is in the same database.".to_owned(),
628 )
629 .into());
630 }
631
632 Ok(StreamTableScan::new_with_stream_scan_type(
633 self.core.clone(),
634 ctx.backfill_type().to_stream_scan_type(),
635 )
636 .into())
637 } else {
638 if ctx.backfill_type() == BackfillType::SnapshotBackfill {
639 let (scan_ranges, _residual) = self.predicate().clone().split_to_scan_ranges(
640 self.table(),
641 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
642 )?;
643
644 if scan_ranges.len() == 1 && !scan_ranges[0].is_full_table_scan() {
645 let (core, predicate, project_expr) = self.predicate_pull_up();
646 let scan = StreamTableScan::new_with_scan_range(
647 core,
648 ctx.backfill_type().to_stream_scan_type(),
649 Some(scan_ranges.into_iter().next().unwrap()),
650 );
651 LogicalFilter::check_stream_predicate(&predicate)?;
652
653 let mut plan: crate::optimizer::plan_node::StreamPlanRef =
654 StreamFilter::new(generic::Filter::new(predicate, scan.into())).into();
655 if let Some(exprs) = project_expr {
656 plan = StreamProject::new(generic::Project::new(exprs, plan)).into();
657 }
658 return Ok(plan);
659 }
660 }
661
662 let (scan, predicate, project_expr) = self.predicate_pull_up();
663 let mut plan = LogicalFilter::create(scan.into(), predicate);
664 if let Some(exprs) = project_expr {
665 plan = LogicalProject::create(plan, exprs)
666 }
667 plan.to_stream(ctx)
668 }
669 }
670
671 fn logical_rewrite_for_stream(
672 &self,
673 ctx: &mut RewriteStreamContext,
674 ) -> Result<(PlanRef, ColIndexMapping)> {
675 let mut output_col_idx = self.output_col_idx().clone();
676 let original_len = output_col_idx.len();
677
678 for idx in self.table().pk.iter().map(|c| c.column_index) {
680 if !output_col_idx.contains(&idx) {
681 output_col_idx.push(idx);
682 }
683 }
684 for idx in self.table().stream_key() {
686 if !output_col_idx.contains(&idx) {
687 output_col_idx.push(idx);
688 }
689 }
690
691 let (mut scan, col_index_mapping) = if output_col_idx.len() == original_len {
692 (self.clone(), ColIndexMapping::identity(self.schema().len()))
693 } else {
694 let new_len = output_col_idx.len();
695 (
696 self.clone_with_output_indices(output_col_idx),
697 ColIndexMapping::identity_or_none(self.schema().len(), new_len),
698 )
699 };
700
701 if matches!(ctx.backfill_type(), BackfillType::SnapshotBackfill)
702 || self.core.cross_database()
703 {
704 scan.core.extend_table_scan_stream_key_with_primary_key();
709 scan.base = PlanBase::new_logical_with_core(&scan.core);
710 }
711 Ok((scan.into(), col_index_mapping))
712 }
713
714 fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
715 if columns.is_empty() {
716 return None;
717 }
718 let enable_index_selection = self
719 .core
720 .ctx()
721 .session_ctx()
722 .config()
723 .enable_index_selection();
724 let has_indexes = !self.table_indexes().is_empty();
725 let primary_order = self.get_out_column_index_order();
726 let primary_dist_key_satisfied = self
727 .distribution_key()
728 .is_some_and(|dist_key| dist_key.iter().all(|k| columns.contains(k)));
729 let orders = if columns.len() <= 3 {
730 OrderType::all()
731 } else {
732 vec![OrderType::ascending_nulls_last(), OrderType::descending()]
737 };
738 for order_type_combo in columns
739 .iter()
740 .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot)))
741 .multi_cartesian_product()
742 .take(256)
743 {
745 let required_order = Order {
746 column_orders: order_type_combo,
747 };
748
749 if primary_dist_key_satisfied && primary_order.satisfies(&required_order) {
750 return Some(self.clone().into());
751 }
752
753 if !enable_index_selection || !has_indexes {
754 continue;
755 }
756
757 let order_satisfied_index = self.indexes_satisfy_order(&required_order);
758 for index in order_satisfied_index {
759 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
760 if let Some(dist_key) = index_scan.distribution_key()
766 && dist_key.iter().all(|k| columns.contains(k))
767 {
768 return Some(index_scan.into());
769 }
770 }
771 }
772 }
773 None
774 }
775}