1use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{ColumnDesc, Schema};
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_pb::stream_plan::StreamScanType;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::generic::{GenericPlanNode, GenericPlanRef};
26use super::utils::{Distill, childless_record};
27use super::{
28 BatchFilter, BatchPlanRef, BatchProject, ColPrunable, ExprRewritable, Logical,
29 LogicalPlanRef as PlanRef, PlanBase, PlanNodeId, PredicatePushdown, StreamTableScan, ToBatch,
30 ToStream, generic,
31};
32use crate::TableCatalog;
33use crate::binder::BoundBaseTable;
34use crate::catalog::ColumnId;
35use crate::catalog::index_catalog::{IndexType, TableIndex, VectorIndex};
36use crate::error::{ErrorCode, Result};
37use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
38use crate::optimizer::ApplyResult;
39use crate::optimizer::optimizer_context::OptimizerContextRef;
40use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
41use crate::optimizer::plan_node::plan_node_meta::AnyPlanNodeMeta;
42use crate::optimizer::plan_node::{
43 BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
44 PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
45};
46use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order, WatermarkColumns};
47use crate::optimizer::rule::IndexSelectionRule;
48use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
49
50#[derive(Debug, Clone, PartialEq, Eq, Hash)]
52pub struct LogicalScan {
53 pub base: PlanBase<Logical>,
54 core: generic::TableScan,
55}
56
57impl From<generic::TableScan> for LogicalScan {
58 fn from(core: generic::TableScan) -> Self {
59 let base = PlanBase::new_logical_with_core(&core);
60 Self { base, core }
61 }
62}
63
64impl From<generic::TableScan> for PlanRef {
65 fn from(core: generic::TableScan) -> Self {
66 LogicalScan::from(core).into()
67 }
68}
69
70impl GenericPlanRef for LogicalScan {
71 fn id(&self) -> PlanNodeId {
72 self.plan_base().id()
73 }
74
75 fn schema(&self) -> &Schema {
76 self.plan_base().schema()
77 }
78
79 fn stream_key(&self) -> Option<&[usize]> {
80 self.plan_base().stream_key()
81 }
82
83 fn ctx(&self) -> OptimizerContextRef {
84 self.plan_base().ctx()
85 }
86
87 fn functional_dependency(&self) -> &FunctionalDependencySet {
88 self.plan_base().functional_dependency()
89 }
90}
91
92impl LogicalScan {
93 pub fn create(
95 table_catalog: Arc<TableCatalog>,
96 ctx: OptimizerContextRef,
97 as_of: Option<AsOf>,
98 ) -> Self {
99 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
100 generic::TableScan::new(
101 output_col_idx,
102 table_catalog,
103 vec![],
104 vec![],
105 ctx,
106 Condition::true_cond(),
107 as_of,
108 )
109 .into()
110 }
111
112 pub fn from_base_table(
113 base_table: &BoundBaseTable,
114 ctx: OptimizerContextRef,
115 as_of: Option<AsOf>,
116 ) -> Self {
117 let table_catalog = base_table.table_catalog.clone();
118 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
119 let mut table_indexes = vec![];
120 let mut vector_indexes = vec![];
121 for index in &base_table.table_indexes {
122 match &index.index_type {
123 IndexType::Table(index) => table_indexes.push(index.clone()),
124 IndexType::Vector(index) => vector_indexes.push(index.clone()),
125 }
126 }
127 generic::TableScan::new(
128 output_col_idx,
129 table_catalog,
130 table_indexes,
131 vector_indexes,
132 ctx,
133 Condition::true_cond(),
134 as_of,
135 )
136 .into()
137 }
138
139 pub fn table_name(&self) -> &str {
140 &self.core.table_catalog.name
141 }
142
143 pub fn as_of(&self) -> Option<AsOf> {
144 self.core.as_of.clone()
145 }
146
147 pub fn table_cardinality(&self) -> Cardinality {
149 self.core.table_catalog.cardinality
150 }
151
152 pub fn table(&self) -> &Arc<TableCatalog> {
153 &self.core.table_catalog
154 }
155
156 pub fn column_descs(&self) -> Vec<ColumnDesc> {
158 self.core.column_descs()
159 }
160
161 pub fn output_column_ids(&self) -> Vec<ColumnId> {
163 self.core.output_column_ids()
164 }
165
166 pub fn table_indexes(&self) -> &[Arc<TableIndex>] {
168 &self.core.table_indexes
169 }
170
171 pub fn vector_indexes(&self) -> &[Arc<VectorIndex>] {
173 &self.core.vector_indexes
174 }
175
176 pub fn predicate(&self) -> &Condition {
178 &self.core.predicate
179 }
180
181 pub fn get_out_column_index_order(&self) -> Order {
184 self.core.get_out_column_index_order()
185 }
186
187 pub fn distribution_key(&self) -> Option<Vec<usize>> {
188 self.core.distribution_key()
189 }
190
191 pub fn watermark_columns(&self) -> WatermarkColumns {
192 self.core.watermark_columns()
193 }
194
195 pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Arc<TableIndex>> {
197 self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
198 .into_iter()
199 .map(|(index, _)| index)
200 .collect()
201 }
202
203 pub fn indexes_satisfy_order_with_prefix(
208 &self,
209 required_order: &Order,
210 prefix: &HashSet<ColumnOrder>,
211 ) -> Vec<(&Arc<TableIndex>, Order)> {
212 let output_col_map = self
213 .output_col_idx()
214 .iter()
215 .cloned()
216 .enumerate()
217 .map(|(id, col)| (col, id))
218 .collect::<BTreeMap<_, _>>();
219 let unmatched_idx = output_col_map.len();
220 let mut index_catalog_and_orders = vec![];
221 for index in self.table_indexes() {
222 let s2p_mapping = index.secondary_to_primary_mapping();
223 let index_orders: Vec<ColumnOrder> = index
224 .index_table
225 .pk()
226 .iter()
227 .map(|idx_item| {
228 let idx = match s2p_mapping.get(&idx_item.column_index) {
229 Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
230 None => unmatched_idx,
232 };
233 ColumnOrder::new(idx, idx_item.order_type)
234 })
235 .collect();
236
237 let mut index_orders_iter = index_orders.into_iter().peekable();
238
239 let fixed_prefix = {
241 let mut fixed_prefix = vec![];
242 loop {
243 match index_orders_iter.peek() {
244 Some(index_col_order) if prefix.contains(index_col_order) => {
245 let index_col_order = index_orders_iter.next().unwrap();
246 fixed_prefix.push(index_col_order);
247 }
248 _ => break,
249 }
250 }
251 Order {
252 column_orders: fixed_prefix,
253 }
254 };
255
256 let remaining_orders = Order {
257 column_orders: index_orders_iter.collect(),
258 };
259 if remaining_orders.satisfies(required_order) {
260 index_catalog_and_orders.push((index, fixed_prefix));
261 }
262 }
263 index_catalog_and_orders
264 }
265
266 pub fn to_index_scan_if_index_covered(&self, index: &Arc<TableIndex>) -> Option<LogicalScan> {
268 let p2s_mapping = index.primary_to_secondary_mapping();
269 if self
270 .required_col_idx()
271 .iter()
272 .all(|x| p2s_mapping.contains_key(x))
273 {
274 let index_scan = self.core.to_index_scan(
275 index.index_table.clone(),
276 p2s_mapping,
277 index.function_mapping(),
278 );
279 Some(index_scan.into())
280 } else {
281 None
282 }
283 }
284
285 pub fn primary_key(&self) -> &[ColumnOrder] {
286 self.core.primary_key()
287 }
288
289 fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
291 self.output_col_idx()
292 .iter()
293 .enumerate()
294 .map(|(i, &col_idx)| {
295 InputRef::new(
296 i,
297 self.table().columns[col_idx].column_desc.data_type.clone(),
298 )
299 .into()
300 })
301 .collect_vec()
302 }
303
304 pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
306 let mut predicate = self.predicate().clone();
307 if predicate.always_true() {
308 return (self.core.clone(), Condition::true_cond(), None);
309 }
310
311 let mut inverse_mapping = {
312 let mapping = ColIndexMapping::new(
313 self.required_col_idx().iter().map(|i| Some(*i)).collect(),
314 self.table().columns.len(),
315 );
316 let mut inverse_map = vec![None; mapping.target_size()];
318 for (src, dst) in mapping.mapping_pairs() {
319 inverse_map[dst] = Some(src);
320 }
321 ColIndexMapping::new(inverse_map, mapping.source_size())
322 };
323
324 predicate = predicate.rewrite_expr(&mut inverse_mapping);
325
326 let scan_without_predicate = generic::TableScan::new(
327 self.required_col_idx().to_vec(),
328 self.core.table_catalog.clone(),
329 self.table_indexes().to_vec(),
330 self.vector_indexes().to_vec(),
331 self.ctx(),
332 Condition::true_cond(),
333 self.as_of(),
334 );
335 let project_expr = if self.required_col_idx() != self.output_col_idx() {
336 Some(self.output_idx_to_input_ref())
337 } else {
338 None
339 };
340 (scan_without_predicate, predicate, project_expr)
341 }
342
343 fn clone_with_predicate(&self, predicate: Condition) -> Self {
344 generic::TableScan::new_inner(
345 self.output_col_idx().to_vec(),
346 self.table().clone(),
347 self.table_indexes().to_vec(),
348 self.vector_indexes().to_vec(),
349 self.base.ctx().clone(),
350 predicate,
351 self.as_of(),
352 )
353 .into()
354 }
355
356 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
357 generic::TableScan::new_inner(
358 output_col_idx,
359 self.core.table_catalog.clone(),
360 self.table_indexes().to_vec(),
361 self.vector_indexes().to_vec(),
362 self.base.ctx().clone(),
363 self.predicate().clone(),
364 self.as_of(),
365 )
366 .into()
367 }
368
369 pub fn output_col_idx(&self) -> &Vec<usize> {
370 &self.core.output_col_idx
371 }
372
373 pub fn required_col_idx(&self) -> &Vec<usize> {
374 &self.core.required_col_idx
375 }
376}
377
378impl_plan_tree_node_for_leaf! { Logical, LogicalScan}
379
380impl Distill for LogicalScan {
381 fn distill<'a>(&self) -> XmlNode<'a> {
382 let verbose = self.base.ctx().is_explain_verbose();
383 let mut vec = Vec::with_capacity(5);
384 vec.push(("table", Pretty::from(self.table_name().to_owned())));
385 let key_is_columns =
386 self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
387 let key = if key_is_columns {
388 "columns"
389 } else {
390 "output_columns"
391 };
392 vec.push((key, self.core.columns_pretty(verbose)));
393 if !key_is_columns {
394 vec.push((
395 "required_columns",
396 Pretty::Array(
397 self.required_col_idx()
398 .iter()
399 .map(|i| {
400 let col_name = &self.table().columns[*i].name;
401 Pretty::from(if verbose {
402 format!("{}.{}", self.table_name(), col_name)
403 } else {
404 col_name.clone()
405 })
406 })
407 .collect(),
408 ),
409 ));
410 }
411
412 if !self.predicate().always_true() {
413 let input_schema = self.core.fields_pretty_schema();
414 vec.push((
415 "predicate",
416 Pretty::display(&ConditionDisplay {
417 condition: self.predicate(),
418 input_schema: &input_schema,
419 }),
420 ))
421 }
422
423 if self.table_cardinality() != Cardinality::unknown() {
424 vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
425 }
426
427 childless_record("LogicalScan", vec)
428 }
429}
430
431impl ColPrunable for LogicalScan {
432 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
433 let output_col_idx: Vec<usize> = required_cols
434 .iter()
435 .map(|i| self.required_col_idx()[*i])
436 .collect();
437 assert!(
438 output_col_idx
439 .iter()
440 .all(|i| self.output_col_idx().contains(i))
441 );
442
443 self.clone_with_output_indices(output_col_idx).into()
444 }
445}
446
447impl ExprRewritable<Logical> for LogicalScan {
448 fn has_rewritable_expr(&self) -> bool {
449 true
450 }
451
452 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
453 let mut core = self.core.clone();
454 core.rewrite_exprs(r);
455 Self {
456 base: self.base.clone_with_new_plan_id(),
457 core,
458 }
459 .into()
460 }
461}
462
463impl ExprVisitable for LogicalScan {
464 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
465 self.core.visit_exprs(v);
466 }
467}
468
469impl PredicatePushdown for LogicalScan {
470 fn predicate_pushdown(
471 &self,
472 mut predicate: Condition,
473 _ctx: &mut PredicatePushdownContext,
474 ) -> PlanRef {
475 struct HasCorrelated {
478 has: bool,
479 }
480 impl ExprVisitor for HasCorrelated {
481 fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
482 self.has = true;
483 }
484 }
485 let non_pushable_predicate: Vec<_> = predicate
486 .conjunctions
487 .extract_if(.., |expr| {
488 if expr.count_nows() > 0 {
489 true
490 } else {
491 let mut visitor = HasCorrelated { has: false };
492 visitor.visit_expr(expr);
493 visitor.has
494 }
495 })
496 .collect();
497 let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
498 self.output_col_idx().iter().map(|i| Some(*i)).collect(),
499 self.table().columns.len(),
500 ));
501 if non_pushable_predicate.is_empty() {
502 self.clone_with_predicate(predicate.and(self.predicate().clone()))
503 .into()
504 } else {
505 LogicalFilter::create(
506 self.clone_with_predicate(predicate.and(self.predicate().clone()))
507 .into(),
508 Condition {
509 conjunctions: non_pushable_predicate,
510 },
511 )
512 }
513 }
514}
515
516impl LogicalScan {
517 fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
518 if self.predicate().always_true() {
519 required_order
520 .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
521 } else {
522 let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
523 self.table(),
524 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
525 )?;
526 let mut scan = self.clone();
527 scan.core.predicate = predicate; let plan: BatchPlanRef = if scan.core.predicate.always_false() {
530 LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
531 } else {
532 let (scan, predicate, project_expr) = scan.predicate_pull_up();
533
534 let mut plan: BatchPlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
535 if !predicate.always_true() {
536 plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
537 }
538 if let Some(exprs) = project_expr {
539 plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
540 }
541 plan
542 };
543
544 assert_eq!(plan.schema(), self.schema());
545 required_order.enforce_if_not_satisfies(plan)
546 }
547 }
548
549 fn use_index_scan_if_order_is_satisfied(
552 &self,
553 required_order: &Order,
554 ) -> Option<Result<BatchPlanRef>> {
555 if required_order.column_orders.is_empty() {
556 return None;
557 }
558
559 let order_satisfied_index = self.indexes_satisfy_order(required_order);
560 for index in order_satisfied_index {
561 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
562 return Some(index_scan.to_batch());
563 }
564 }
565
566 None
567 }
568}
569
570impl ToBatch for LogicalScan {
571 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
572 self.to_batch_with_order_required(&Order::any())
573 }
574
575 fn to_batch_with_order_required(
576 &self,
577 required_order: &Order,
578 ) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
579 let new = self.clone_with_predicate(self.predicate().clone());
580
581 if !new.table_indexes().is_empty()
582 && self
583 .base
584 .ctx()
585 .session_ctx()
586 .config()
587 .enable_index_selection()
588 {
589 let index_selection_rule = IndexSelectionRule::create();
590 if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
591 if let Some(scan) = applied.as_logical_scan() {
592 return required_order.enforce_if_not_satisfies(scan.to_batch()?);
594 } else if let Some(join) = applied.as_logical_join() {
595 return required_order
597 .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
598 } else {
599 unreachable!();
600 }
601 } else {
602 if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
604 return plan_ref;
605 }
606 }
607 }
608 new.to_batch_inner_with_required(required_order)
609 }
610}
611
612impl ToStream for LogicalScan {
613 fn to_stream(
614 &self,
615 ctx: &mut ToStreamContext,
616 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
617 if self.predicate().always_true() {
618 if self.core.cross_database() && ctx.stream_scan_type() == StreamScanType::UpstreamOnly
619 {
620 return Err(ErrorCode::NotSupported(
621 "We currently do not support cross database scan in upstream only mode."
622 .to_owned(),
623 "Please ensure the source table is in the same database.".to_owned(),
624 )
625 .into());
626 }
627
628 Ok(StreamTableScan::new_with_stream_scan_type(
629 self.core.clone(),
630 ctx.stream_scan_type(),
631 )
632 .into())
633 } else {
634 let (scan, predicate, project_expr) = self.predicate_pull_up();
635 let mut plan = LogicalFilter::create(scan.into(), predicate);
636 if let Some(exprs) = project_expr {
637 plan = LogicalProject::create(plan, exprs)
638 }
639 plan.to_stream(ctx)
640 }
641 }
642
643 fn logical_rewrite_for_stream(
644 &self,
645 _ctx: &mut RewriteStreamContext,
646 ) -> Result<(PlanRef, ColIndexMapping)> {
647 match self.base.stream_key().is_none() {
648 true => {
649 let mut col_ids = HashSet::new();
650
651 for &idx in self.output_col_idx() {
652 col_ids.insert(self.table().columns[idx].column_id);
653 }
654 let col_need_to_add = self
655 .table()
656 .pk
657 .iter()
658 .filter_map(|c| {
659 if !col_ids.contains(&self.table().columns[c.column_index].column_id) {
660 Some(c.column_index)
661 } else {
662 None
663 }
664 })
665 .collect_vec();
666
667 let mut output_col_idx = self.output_col_idx().clone();
668 output_col_idx.extend(col_need_to_add);
669 let new_len = output_col_idx.len();
670 Ok((
671 self.clone_with_output_indices(output_col_idx).into(),
672 ColIndexMapping::identity_or_none(self.schema().len(), new_len),
673 ))
674 }
675 false => Ok((
676 self.clone().into(),
677 ColIndexMapping::identity(self.schema().len()),
678 )),
679 }
680 }
681
682 fn try_better_locality(&self, columns: &[usize]) -> Option<PlanRef> {
683 if !self
684 .core
685 .ctx()
686 .session_ctx()
687 .config()
688 .enable_index_selection()
689 {
690 return None;
691 }
692 if columns.is_empty() {
693 return None;
694 }
695 if self.table_indexes().is_empty() {
696 return None;
697 }
698 let orders = if columns.len() <= 3 {
699 OrderType::all()
700 } else {
701 vec![OrderType::ascending_nulls_last(), OrderType::descending()]
706 };
707 for order_type_combo in columns
708 .iter()
709 .map(|&col| orders.iter().map(move |ot| ColumnOrder::new(col, *ot)))
710 .multi_cartesian_product()
711 .take(256)
712 {
714 let required_order = Order {
715 column_orders: order_type_combo,
716 };
717
718 let order_satisfied_index = self.indexes_satisfy_order(&required_order);
719 for index in order_satisfied_index {
720 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
721 return Some(index_scan.into());
722 }
723 }
724 }
725 None
726 }
727}