risingwave_frontend/optimizer/plan_node/
batch_sys_seq_scan.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::{ScanRange, is_full_range};
17use risingwave_pb::batch_plan::SysRowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_pb::plan_common::PbColumnDesc;
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs};
23use super::{ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, generic};
24use crate::error::Result;
25use crate::expr::{ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchSysSeqScan {
33 pub base: PlanBase<Batch>,
34 core: generic::SysScan,
35 scan_ranges: Vec<ScanRange>,
36}
37
38impl BatchSysSeqScan {
39 fn new_inner(core: generic::SysScan, dist: Distribution, scan_ranges: Vec<ScanRange>) -> Self {
40 let order = if scan_ranges.len() > 1 {
41 Order::any()
42 } else {
43 core.get_out_column_index_order()
44 };
45 let base = PlanBase::new_batch_with_core(&core, dist, order);
46
47 {
48 scan_ranges.iter().for_each(|scan_range| {
50 assert!(!scan_range.is_full_table_scan());
51 let scan_pk_prefix_len = scan_range.eq_conds.len();
52 let order_len = core.table_desc.order_column_indices().len();
53 assert!(
54 scan_pk_prefix_len < order_len
55 || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
56 "invalid scan_range",
57 );
58 })
59 }
60
61 Self {
62 base,
63 core,
64 scan_ranges,
65 }
66 }
67
68 pub fn new(core: generic::SysScan, scan_ranges: Vec<ScanRange>) -> Self {
69 Self::new_inner(core, Distribution::Single, scan_ranges)
71 }
72
73 fn clone_with_dist(&self) -> Self {
74 Self::new_inner(
75 self.core.clone(),
76 Distribution::Single,
77 self.scan_ranges.clone(),
78 )
79 }
80
81 #[must_use]
83 pub fn core(&self) -> &generic::SysScan {
84 &self.core
85 }
86
87 pub fn scan_ranges(&self) -> &[ScanRange] {
88 &self.scan_ranges
89 }
90}
91
92impl_plan_tree_node_for_leaf! { BatchSysSeqScan }
93
94impl Distill for BatchSysSeqScan {
95 fn distill<'a>(&self) -> XmlNode<'a> {
96 let verbose = self.base.ctx().is_explain_verbose();
97 let mut vec = Vec::with_capacity(4);
98 vec.push(("table", Pretty::from(self.core.table_name.clone())));
99 vec.push(("columns", self.core.columns_pretty(verbose)));
100
101 if !self.scan_ranges.is_empty() {
102 let order_names = match verbose {
103 true => self.core.order_names_with_table_prefix(),
104 false => self.core.order_names(),
105 };
106 let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
107 vec.push((
108 "scan_ranges",
109 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
110 ));
111 }
112
113 if verbose {
114 let dist = Pretty::display(&DistributionDisplay {
115 distribution: self.distribution(),
116 input_schema: self.base.schema(),
117 });
118 vec.push(("distribution", dist));
119 }
120
121 childless_record("BatchScan", vec)
122 }
123}
124
125impl ToDistributedBatch for BatchSysSeqScan {
126 fn to_distributed(&self) -> Result<PlanRef> {
127 Ok(self.clone_with_dist().into())
128 }
129}
130
131impl ToBatchPb for BatchSysSeqScan {
132 fn to_batch_prost_body(&self) -> NodeBody {
133 let column_descs = self
134 .core
135 .column_descs()
136 .iter()
137 .map(PbColumnDesc::from)
138 .collect();
139 NodeBody::SysRowSeqScan(SysRowSeqScanNode {
140 table_id: self.core.table_desc.table_id.table_id,
141 column_descs,
142 })
143 }
144}
145
146impl ToLocalBatch for BatchSysSeqScan {
147 fn to_local(&self) -> Result<PlanRef> {
148 Ok(Self::new_inner(
149 self.core.clone(),
150 Distribution::Single,
151 self.scan_ranges.clone(),
152 )
153 .into())
154 }
155}
156
157impl ExprRewritable for BatchSysSeqScan {
158 fn has_rewritable_expr(&self) -> bool {
159 true
160 }
161
162 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
163 let mut core = self.core.clone();
164 core.rewrite_exprs(r);
165 Self::new(core, self.scan_ranges.clone()).into()
166 }
167}
168
169impl ExprVisitable for BatchSysSeqScan {
170 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
171 self.core.visit_exprs(v);
172 }
173}