risingwave_frontend/optimizer/plan_node/
batch_seq_scan.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::{ScanRange, is_full_range};
17use risingwave_pb::batch_plan::RowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_sqlparser::ast::AsOf;
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs, to_batch_query_epoch};
23use super::{BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToDistributedBatch, generic};
24use crate::catalog::ColumnId;
25use crate::error::Result;
26use crate::expr::{ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{ToLocalBatch, TryToBatchPb};
29use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
30use crate::scheduler::SchedulerResult;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchSeqScan {
35 pub base: PlanBase<Batch>,
36 core: generic::TableScan,
37 scan_ranges: Vec<ScanRange>,
38 limit: Option<u64>,
39 as_of: Option<AsOf>,
40}
41
42impl BatchSeqScan {
43 fn new_inner(
44 core: generic::TableScan,
45 dist: Distribution,
46 scan_ranges: Vec<ScanRange>,
47 limit: Option<u64>,
48 ) -> Self {
49 let orders = if scan_ranges.len() > 1 {
50 vec![Order::any()]
51 } else {
52 let base_order = core.get_out_column_index_order();
53 if scan_ranges.len() == 1 && !base_order.is_any() {
54 let eq_prefix_len = scan_ranges[0].eq_conds.len();
55 if eq_prefix_len > 0 {
56 let mut orders = vec![base_order.clone()];
57 let max_trim = eq_prefix_len.min(base_order.column_orders.len());
58 for trim in 1..=max_trim {
59 orders.push(Order {
60 column_orders: base_order.column_orders[trim..].to_vec(),
61 });
62 }
63 orders
64 } else {
65 vec![base_order]
66 }
67 } else {
68 vec![base_order]
69 }
70 };
71 let base = PlanBase::new_batch_with_core_and_orders(&core, dist, orders);
72
73 {
74 scan_ranges.iter().for_each(|scan_range| {
76 assert!(!scan_range.is_full_table_scan());
77 let scan_pk_prefix_len = scan_range.eq_conds.len();
78 let order_len = core.table_catalog.pk.len();
79 assert!(
80 scan_pk_prefix_len < order_len
81 || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
82 "invalid scan_range",
83 );
84 })
85 }
86 let as_of = core.as_of.clone();
87
88 Self {
89 base,
90 core,
91 scan_ranges,
92 limit,
93 as_of,
94 }
95 }
96
97 pub fn new(core: generic::TableScan, scan_ranges: Vec<ScanRange>, limit: Option<u64>) -> Self {
98 Self::new_inner(core, Distribution::Single, scan_ranges, limit)
100 }
101
102 pub fn new_with_dist(
103 core: generic::TableScan,
104 dist: Distribution,
105 scan_ranges: Vec<ScanRange>,
106 limit: Option<u64>,
107 ) -> Self {
108 Self::new_inner(core, dist, scan_ranges, limit)
109 }
110
111 fn clone_with_dist(&self) -> Self {
112 Self::new_inner(
113 self.core.clone(),
114 match self.core.distribution_key() {
115 None => Distribution::SomeShard,
116 Some(distribution_key) => {
117 if distribution_key.is_empty() {
118 Distribution::Single
119 } else {
120 Distribution::UpstreamHashShard(
130 distribution_key,
131 self.core.table_catalog.id,
132 )
133 }
134 }
135 },
136 self.scan_ranges.clone(),
137 self.limit,
138 )
139 }
140
141 #[must_use]
143 pub fn core(&self) -> &generic::TableScan {
144 &self.core
145 }
146
147 pub fn scan_ranges(&self) -> &[ScanRange] {
148 &self.scan_ranges
149 }
150
151 pub fn limit(&self) -> &Option<u64> {
152 &self.limit
153 }
154}
155
156impl_plan_tree_node_for_leaf! { Batch, BatchSeqScan }
157
158impl Distill for BatchSeqScan {
159 fn distill<'a>(&self) -> XmlNode<'a> {
160 let verbose = self.base.ctx().is_explain_verbose();
161 let mut vec = Vec::with_capacity(4);
162 vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
163 vec.push(("columns", self.core.columns_pretty(verbose)));
164
165 if !self.scan_ranges.is_empty() {
166 let order_names = match verbose {
167 true => self.core.order_names_with_table_prefix(),
168 false => self.core.order_names(),
169 };
170 let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
171 vec.push((
172 "scan_ranges",
173 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
174 ));
175 }
176
177 if let Some(limit) = &self.limit {
178 vec.push(("limit", Pretty::display(limit)));
179 }
180
181 if verbose {
182 let dist = Pretty::display(&DistributionDisplay {
183 distribution: self.distribution(),
184 input_schema: self.base.schema(),
185 });
186 vec.push(("distribution", dist));
187 }
188
189 childless_record("BatchScan", vec)
190 }
191}
192
193impl ToDistributedBatch for BatchSeqScan {
194 fn to_distributed(&self) -> Result<PlanRef> {
195 Ok(self.clone_with_dist().into())
196 }
197}
198
199impl TryToBatchPb for BatchSeqScan {
200 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
201 Ok(NodeBody::RowSeqScan(RowSeqScanNode {
202 table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
203 column_ids: self
204 .core
205 .output_column_ids()
206 .iter()
207 .map(ColumnId::get_id)
208 .collect(),
209 scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(),
210 vnode_bitmap: None,
212 ordered: !self.order().is_any(),
213 limit: *self.limit(),
214 query_epoch: to_batch_query_epoch(&self.as_of)?,
215 }))
216 }
217}
218
219impl ToLocalBatch for BatchSeqScan {
220 fn to_local(&self) -> Result<PlanRef> {
221 let dist = if let Some(distribution_key) = self.core.distribution_key()
222 && !distribution_key.is_empty()
223 {
224 Distribution::UpstreamHashShard(distribution_key, self.core.table_catalog.id)
225 } else {
226 Distribution::SomeShard
229 };
230 Ok(Self::new_inner(
231 self.core.clone(),
232 dist,
233 self.scan_ranges.clone(),
234 self.limit,
235 )
236 .into())
237 }
238}
239
240impl ExprRewritable<Batch> for BatchSeqScan {
241 fn has_rewritable_expr(&self) -> bool {
242 true
243 }
244
245 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
246 let mut core = self.core.clone();
247 core.rewrite_exprs(r);
248 Self::new(core, self.scan_ranges.clone(), self.limit).into()
249 }
250}
251
252impl ExprVisitable for BatchSeqScan {
253 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
254 self.core.visit_exprs(v);
255 }
256}