1use std::collections::{BTreeMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{ColumnDesc, TableDesc};
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_pb::stream_plan::StreamScanType;
24use risingwave_sqlparser::ast::AsOf;
25
26use super::generic::{GenericPlanNode, GenericPlanRef};
27use super::utils::{Distill, childless_record};
28use super::{
29 BatchFilter, BatchProject, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef,
30 PredicatePushdown, StreamTableScan, ToBatch, ToStream, generic,
31};
32use crate::TableCatalog;
33use crate::catalog::{ColumnId, IndexCatalog};
34use crate::error::Result;
35use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
36use crate::optimizer::ApplyResult;
37use crate::optimizer::optimizer_context::OptimizerContextRef;
38use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
39use crate::optimizer::plan_node::{
40 BatchSeqScan, ColumnPruningContext, LogicalFilter, LogicalProject, LogicalValues,
41 PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
42};
43use crate::optimizer::property::{Cardinality, Order, WatermarkColumns};
44use crate::optimizer::rule::IndexSelectionRule;
45use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
46
/// Logical plan node that scans a table.
///
/// Wraps a [`generic::TableScan`] core together with the logical [`PlanBase`] derived from it
/// (see the `From<generic::TableScan>` impl).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalScan {
    /// Plan-node metadata, derived from `core` when the node is built.
    pub base: PlanBase<Logical>,
    /// The generic table-scan definition: table, output columns, predicate, indexes, etc.
    core: generic::TableScan,
}
53
54impl From<generic::TableScan> for LogicalScan {
55 fn from(core: generic::TableScan) -> Self {
56 let base = PlanBase::new_logical_with_core(&core);
57 Self { base, core }
58 }
59}
60
61impl From<generic::TableScan> for PlanRef {
62 fn from(core: generic::TableScan) -> Self {
63 LogicalScan::from(core).into()
64 }
65}
66
67impl LogicalScan {
68 pub fn create(
70 table_name: String, table_catalog: Arc<TableCatalog>,
72 indexes: Vec<Rc<IndexCatalog>>,
73 ctx: OptimizerContextRef,
74 as_of: Option<AsOf>,
75 table_cardinality: Cardinality,
76 ) -> Self {
77 let output_col_idx: Vec<usize> = (0..table_catalog.columns().len()).collect();
78 generic::TableScan::new(
79 table_name,
80 output_col_idx,
81 table_catalog,
82 indexes,
83 ctx,
84 Condition::true_cond(),
85 as_of,
86 table_cardinality,
87 )
88 .into()
89 }
90
91 pub fn table_name(&self) -> &str {
92 &self.core.table_name
93 }
94
95 pub fn as_of(&self) -> Option<AsOf> {
96 self.core.as_of.clone()
97 }
98
99 pub fn table_cardinality(&self) -> Cardinality {
101 self.core.table_cardinality
102 }
103
104 pub fn table_desc(&self) -> &TableDesc {
107 self.core.table_desc.as_ref()
108 }
109
110 pub fn table_catalog(&self) -> Arc<TableCatalog> {
111 self.core.table_catalog.clone()
112 }
113
114 pub fn column_descs(&self) -> Vec<ColumnDesc> {
116 self.core.column_descs()
117 }
118
119 pub fn output_column_ids(&self) -> Vec<ColumnId> {
121 self.core.output_column_ids()
122 }
123
124 pub fn indexes(&self) -> &[Rc<IndexCatalog>] {
126 &self.core.indexes
127 }
128
129 pub fn predicate(&self) -> &Condition {
131 &self.core.predicate
132 }
133
134 pub fn get_out_column_index_order(&self) -> Order {
137 self.core.get_out_column_index_order()
138 }
139
140 pub fn distribution_key(&self) -> Option<Vec<usize>> {
141 self.core.distribution_key()
142 }
143
144 pub fn watermark_columns(&self) -> WatermarkColumns {
145 self.core.watermark_columns()
146 }
147
148 pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Rc<IndexCatalog>> {
150 self.indexes_satisfy_order_with_prefix(required_order, &HashSet::new())
151 .into_iter()
152 .map(|(index, _)| index)
153 .collect()
154 }
155
156 pub fn indexes_satisfy_order_with_prefix(
161 &self,
162 required_order: &Order,
163 prefix: &HashSet<ColumnOrder>,
164 ) -> Vec<(&Rc<IndexCatalog>, Order)> {
165 let output_col_map = self
166 .output_col_idx()
167 .iter()
168 .cloned()
169 .enumerate()
170 .map(|(id, col)| (col, id))
171 .collect::<BTreeMap<_, _>>();
172 let unmatched_idx = output_col_map.len();
173 let mut index_catalog_and_orders = vec![];
174 for index in self.indexes() {
175 let s2p_mapping = index.secondary_to_primary_mapping();
176 let index_orders: Vec<ColumnOrder> = index
177 .index_table
178 .pk()
179 .iter()
180 .map(|idx_item| {
181 let idx = match s2p_mapping.get(&idx_item.column_index) {
182 Some(col_idx) => *output_col_map.get(col_idx).unwrap_or(&unmatched_idx),
183 None => unmatched_idx,
185 };
186 ColumnOrder::new(idx, idx_item.order_type)
187 })
188 .collect();
189
190 let mut index_orders_iter = index_orders.into_iter().peekable();
191
192 let fixed_prefix = {
194 let mut fixed_prefix = vec![];
195 loop {
196 match index_orders_iter.peek() {
197 Some(index_col_order) if prefix.contains(index_col_order) => {
198 let index_col_order = index_orders_iter.next().unwrap();
199 fixed_prefix.push(index_col_order);
200 }
201 _ => break,
202 }
203 }
204 Order {
205 column_orders: fixed_prefix,
206 }
207 };
208
209 let remaining_orders = Order {
210 column_orders: index_orders_iter.collect(),
211 };
212 if remaining_orders.satisfies(required_order) {
213 index_catalog_and_orders.push((index, fixed_prefix));
214 }
215 }
216 index_catalog_and_orders
217 }
218
219 pub fn to_index_scan_if_index_covered(&self, index: &Rc<IndexCatalog>) -> Option<LogicalScan> {
221 let p2s_mapping = index.primary_to_secondary_mapping();
222 if self
223 .required_col_idx()
224 .iter()
225 .all(|x| p2s_mapping.contains_key(x))
226 {
227 let index_scan = self.core.to_index_scan(
228 &index.name,
229 index.index_table.clone(),
230 p2s_mapping,
231 index.function_mapping(),
232 );
233 Some(index_scan.into())
234 } else {
235 None
236 }
237 }
238
239 pub fn primary_key(&self) -> &[ColumnOrder] {
240 self.core.primary_key()
241 }
242
243 fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
245 self.output_col_idx()
246 .iter()
247 .enumerate()
248 .map(|(i, &col_idx)| {
249 InputRef::new(i, self.table_desc().columns[col_idx].data_type.clone()).into()
250 })
251 .collect_vec()
252 }
253
254 pub fn predicate_pull_up(&self) -> (generic::TableScan, Condition, Option<Vec<ExprImpl>>) {
256 let mut predicate = self.predicate().clone();
257 if predicate.always_true() {
258 return (self.core.clone(), Condition::true_cond(), None);
259 }
260
261 let mut inverse_mapping = {
262 let mapping = ColIndexMapping::new(
263 self.required_col_idx().iter().map(|i| Some(*i)).collect(),
264 self.table_desc().columns.len(),
265 );
266 let mut inverse_map = vec![None; mapping.target_size()];
268 for (src, dst) in mapping.mapping_pairs() {
269 inverse_map[dst] = Some(src);
270 }
271 ColIndexMapping::new(inverse_map, mapping.source_size())
272 };
273
274 predicate = predicate.rewrite_expr(&mut inverse_mapping);
275
276 let scan_without_predicate = generic::TableScan::new(
277 self.table_name().to_owned(),
278 self.required_col_idx().to_vec(),
279 self.core.table_catalog.clone(),
280 self.indexes().to_vec(),
281 self.ctx(),
282 Condition::true_cond(),
283 self.as_of(),
284 self.table_cardinality(),
285 );
286 let project_expr = if self.required_col_idx() != self.output_col_idx() {
287 Some(self.output_idx_to_input_ref())
288 } else {
289 None
290 };
291 (scan_without_predicate, predicate, project_expr)
292 }
293
294 fn clone_with_predicate(&self, predicate: Condition) -> Self {
295 generic::TableScan::new_inner(
296 self.table_name().to_owned(),
297 self.output_col_idx().to_vec(),
298 self.table_catalog(),
299 self.indexes().to_vec(),
300 self.base.ctx().clone(),
301 predicate,
302 self.as_of(),
303 self.table_cardinality(),
304 )
305 .into()
306 }
307
308 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
309 generic::TableScan::new_inner(
310 self.table_name().to_owned(),
311 output_col_idx,
312 self.core.table_catalog.clone(),
313 self.indexes().to_vec(),
314 self.base.ctx().clone(),
315 self.predicate().clone(),
316 self.as_of(),
317 self.table_cardinality(),
318 )
319 .into()
320 }
321
322 pub fn output_col_idx(&self) -> &Vec<usize> {
323 &self.core.output_col_idx
324 }
325
326 pub fn required_col_idx(&self) -> &Vec<usize> {
327 &self.core.required_col_idx
328 }
329}
330
// Generates the plan-tree node boilerplate for `LogicalScan` as a leaf node.
impl_plan_tree_node_for_leaf! {LogicalScan}
332
333impl Distill for LogicalScan {
334 fn distill<'a>(&self) -> XmlNode<'a> {
335 let verbose = self.base.ctx().is_explain_verbose();
336 let mut vec = Vec::with_capacity(5);
337 vec.push(("table", Pretty::from(self.table_name().to_owned())));
338 let key_is_columns =
339 self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
340 let key = if key_is_columns {
341 "columns"
342 } else {
343 "output_columns"
344 };
345 vec.push((key, self.core.columns_pretty(verbose)));
346 if !key_is_columns {
347 vec.push((
348 "required_columns",
349 Pretty::Array(
350 self.required_col_idx()
351 .iter()
352 .map(|i| {
353 let col_name = &self.table_desc().columns[*i].name;
354 Pretty::from(if verbose {
355 format!("{}.{}", self.table_name(), col_name)
356 } else {
357 col_name.to_string()
358 })
359 })
360 .collect(),
361 ),
362 ));
363 }
364
365 if !self.predicate().always_true() {
366 let input_schema = self.core.fields_pretty_schema();
367 vec.push((
368 "predicate",
369 Pretty::display(&ConditionDisplay {
370 condition: self.predicate(),
371 input_schema: &input_schema,
372 }),
373 ))
374 }
375
376 if self.table_cardinality() != Cardinality::unknown() {
377 vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
378 }
379
380 childless_record("LogicalScan", vec)
381 }
382}
383
384impl ColPrunable for LogicalScan {
385 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
386 let output_col_idx: Vec<usize> = required_cols
387 .iter()
388 .map(|i| self.required_col_idx()[*i])
389 .collect();
390 assert!(
391 output_col_idx
392 .iter()
393 .all(|i| self.output_col_idx().contains(i))
394 );
395
396 self.clone_with_output_indices(output_col_idx).into()
397 }
398}
399
400impl ExprRewritable for LogicalScan {
401 fn has_rewritable_expr(&self) -> bool {
402 true
403 }
404
405 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
406 let mut core = self.core.clone();
407 core.rewrite_exprs(r);
408 Self {
409 base: self.base.clone_with_new_plan_id(),
410 core,
411 }
412 .into()
413 }
414}
415
416impl ExprVisitable for LogicalScan {
417 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
418 self.core.visit_exprs(v);
419 }
420}
421
422impl PredicatePushdown for LogicalScan {
423 fn predicate_pushdown(
424 &self,
425 mut predicate: Condition,
426 _ctx: &mut PredicatePushdownContext,
427 ) -> PlanRef {
428 struct HasCorrelated {
431 has: bool,
432 }
433 impl ExprVisitor for HasCorrelated {
434 fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
435 self.has = true;
436 }
437 }
438 let non_pushable_predicate: Vec<_> = predicate
439 .conjunctions
440 .extract_if(.., |expr| {
441 if expr.count_nows() > 0 {
442 true
443 } else {
444 let mut visitor = HasCorrelated { has: false };
445 visitor.visit_expr(expr);
446 visitor.has
447 }
448 })
449 .collect();
450 let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
451 self.output_col_idx().iter().map(|i| Some(*i)).collect(),
452 self.table_desc().columns.len(),
453 ));
454 if non_pushable_predicate.is_empty() {
455 self.clone_with_predicate(predicate.and(self.predicate().clone()))
456 .into()
457 } else {
458 LogicalFilter::create(
459 self.clone_with_predicate(predicate.and(self.predicate().clone()))
460 .into(),
461 Condition {
462 conjunctions: non_pushable_predicate,
463 },
464 )
465 }
466 }
467}
468
469impl LogicalScan {
470 fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<PlanRef> {
471 if self.predicate().always_true() {
472 required_order
473 .enforce_if_not_satisfies(BatchSeqScan::new(self.core.clone(), vec![], None).into())
474 } else {
475 let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
476 self.core.table_desc.clone(),
477 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
478 )?;
479 let mut scan = self.clone();
480 scan.core.predicate = predicate; let plan: PlanRef = if scan.core.predicate.always_false() {
483 LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
484 } else {
485 let (scan, predicate, project_expr) = scan.predicate_pull_up();
486
487 let mut plan: PlanRef = BatchSeqScan::new(scan, scan_ranges, None).into();
488 if !predicate.always_true() {
489 plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
490 }
491 if let Some(exprs) = project_expr {
492 plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
493 }
494 plan
495 };
496
497 assert_eq!(plan.schema(), self.schema());
498 required_order.enforce_if_not_satisfies(plan)
499 }
500 }
501
502 fn use_index_scan_if_order_is_satisfied(
505 &self,
506 required_order: &Order,
507 ) -> Option<Result<PlanRef>> {
508 if required_order.column_orders.is_empty() {
509 return None;
510 }
511
512 let order_satisfied_index = self.indexes_satisfy_order(required_order);
513 for index in order_satisfied_index {
514 if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
515 return Some(index_scan.to_batch());
516 }
517 }
518
519 None
520 }
521}
522
523impl ToBatch for LogicalScan {
524 fn to_batch(&self) -> Result<PlanRef> {
525 self.to_batch_with_order_required(&Order::any())
526 }
527
528 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
529 let new = self.clone_with_predicate(self.predicate().clone());
530
531 if !new.indexes().is_empty() {
532 let index_selection_rule = IndexSelectionRule::create();
533 if let ApplyResult::Ok(applied) = index_selection_rule.apply(new.clone().into()) {
534 if let Some(scan) = applied.as_logical_scan() {
535 return required_order.enforce_if_not_satisfies(scan.to_batch()?);
537 } else if let Some(join) = applied.as_logical_join() {
538 return required_order
540 .enforce_if_not_satisfies(join.index_lookup_join_to_batch_lookup_join()?);
541 } else {
542 unreachable!();
543 }
544 } else {
545 if let Some(plan_ref) = new.use_index_scan_if_order_is_satisfied(required_order) {
547 return plan_ref;
548 }
549 }
550 }
551 new.to_batch_inner_with_required(required_order)
552 }
553}
554
555impl ToStream for LogicalScan {
556 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
557 if self.predicate().always_true() {
558 if self.core.table_catalog.database_id != self.base.ctx().session_ctx().database_id() {
560 Ok(StreamTableScan::new_with_stream_scan_type(
561 self.core.clone(),
562 StreamScanType::CrossDbSnapshotBackfill,
563 )
564 .into())
565 } else {
566 Ok(StreamTableScan::new_with_stream_scan_type(
567 self.core.clone(),
568 ctx.stream_scan_type(),
569 )
570 .into())
571 }
572 } else {
573 let (scan, predicate, project_expr) = self.predicate_pull_up();
574 let mut plan = LogicalFilter::create(scan.into(), predicate);
575 if let Some(exprs) = project_expr {
576 plan = LogicalProject::create(plan, exprs)
577 }
578 plan.to_stream(ctx)
579 }
580 }
581
582 fn logical_rewrite_for_stream(
583 &self,
584 _ctx: &mut RewriteStreamContext,
585 ) -> Result<(PlanRef, ColIndexMapping)> {
586 match self.base.stream_key().is_none() {
587 true => {
588 let mut col_ids = HashSet::new();
589
590 for &idx in self.output_col_idx() {
591 col_ids.insert(self.table_desc().columns[idx].column_id);
592 }
593 let col_need_to_add = self
594 .table_desc()
595 .pk
596 .iter()
597 .filter_map(|c| {
598 if !col_ids.contains(&self.table_desc().columns[c.column_index].column_id) {
599 Some(c.column_index)
600 } else {
601 None
602 }
603 })
604 .collect_vec();
605
606 let mut output_col_idx = self.output_col_idx().clone();
607 output_col_idx.extend(col_need_to_add);
608 let new_len = output_col_idx.len();
609 Ok((
610 self.clone_with_output_indices(output_col_idx).into(),
611 ColIndexMapping::identity_or_none(self.schema().len(), new_len),
612 ))
613 }
614 false => Ok((
615 self.clone().into(),
616 ColIndexMapping::identity(self.schema().len()),
617 )),
618 }
619 }
620}