risingwave_frontend/optimizer/plan_node/
batch_seq_scan.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::{ScanRange, is_full_range};
17use risingwave_pb::batch_plan::RowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_sqlparser::ast::AsOf;
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs, to_pb_time_travel_as_of};
23use super::{ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, generic};
24use crate::catalog::ColumnId;
25use crate::error::Result;
26use crate::expr::{ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{ToLocalBatch, TryToBatchPb};
29use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
30use crate::scheduler::SchedulerResult;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchSeqScan {
35 pub base: PlanBase<Batch>,
36 core: generic::TableScan,
37 scan_ranges: Vec<ScanRange>,
38 limit: Option<u64>,
39 as_of: Option<AsOf>,
40}
41
42impl BatchSeqScan {
43 fn new_inner(
44 core: generic::TableScan,
45 dist: Distribution,
46 scan_ranges: Vec<ScanRange>,
47 limit: Option<u64>,
48 ) -> Self {
49 let order = if scan_ranges.len() > 1 {
50 Order::any()
51 } else {
52 core.get_out_column_index_order()
53 };
54 let base = PlanBase::new_batch_with_core(&core, dist, order);
55
56 {
57 scan_ranges.iter().for_each(|scan_range| {
59 assert!(!scan_range.is_full_table_scan());
60 let scan_pk_prefix_len = scan_range.eq_conds.len();
61 let order_len = core.table_desc.order_column_indices().len();
62 assert!(
63 scan_pk_prefix_len < order_len
64 || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
65 "invalid scan_range",
66 );
67 })
68 }
69 let as_of = core.as_of.clone();
70
71 Self {
72 base,
73 core,
74 scan_ranges,
75 limit,
76 as_of,
77 }
78 }
79
80 pub fn new(core: generic::TableScan, scan_ranges: Vec<ScanRange>, limit: Option<u64>) -> Self {
81 Self::new_inner(core, Distribution::Single, scan_ranges, limit)
83 }
84
85 pub fn new_with_dist(
86 core: generic::TableScan,
87 dist: Distribution,
88 scan_ranges: Vec<ScanRange>,
89 limit: Option<u64>,
90 ) -> Self {
91 Self::new_inner(core, dist, scan_ranges, limit)
92 }
93
94 fn clone_with_dist(&self) -> Self {
95 Self::new_inner(
96 self.core.clone(),
97 match self.core.distribution_key() {
98 None => Distribution::SomeShard,
99 Some(distribution_key) => {
100 if distribution_key.is_empty() {
101 Distribution::Single
102 } else {
103 Distribution::UpstreamHashShard(
113 distribution_key,
114 self.core.table_desc.table_id,
115 )
116 }
117 }
118 },
119 self.scan_ranges.clone(),
120 self.limit,
121 )
122 }
123
124 #[must_use]
126 pub fn core(&self) -> &generic::TableScan {
127 &self.core
128 }
129
130 pub fn scan_ranges(&self) -> &[ScanRange] {
131 &self.scan_ranges
132 }
133
134 pub fn limit(&self) -> &Option<u64> {
135 &self.limit
136 }
137}
138
139impl_plan_tree_node_for_leaf! { BatchSeqScan }
140
141impl Distill for BatchSeqScan {
142 fn distill<'a>(&self) -> XmlNode<'a> {
143 let verbose = self.base.ctx().is_explain_verbose();
144 let mut vec = Vec::with_capacity(4);
145 vec.push(("table", Pretty::from(self.core.table_name.clone())));
146 vec.push(("columns", self.core.columns_pretty(verbose)));
147
148 if !self.scan_ranges.is_empty() {
149 let order_names = match verbose {
150 true => self.core.order_names_with_table_prefix(),
151 false => self.core.order_names(),
152 };
153 let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
154 vec.push((
155 "scan_ranges",
156 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
157 ));
158 }
159
160 if let Some(limit) = &self.limit {
161 vec.push(("limit", Pretty::display(limit)));
162 }
163
164 if verbose {
165 let dist = Pretty::display(&DistributionDisplay {
166 distribution: self.distribution(),
167 input_schema: self.base.schema(),
168 });
169 vec.push(("distribution", dist));
170 }
171
172 childless_record("BatchScan", vec)
173 }
174}
175
176impl ToDistributedBatch for BatchSeqScan {
177 fn to_distributed(&self) -> Result<PlanRef> {
178 Ok(self.clone_with_dist().into())
179 }
180}
181
182impl TryToBatchPb for BatchSeqScan {
183 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
184 Ok(NodeBody::RowSeqScan(RowSeqScanNode {
185 table_desc: Some(self.core.table_desc.try_to_protobuf()?),
186 column_ids: self
187 .core
188 .output_column_ids()
189 .iter()
190 .map(ColumnId::get_id)
191 .collect(),
192 scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(),
193 vnode_bitmap: None,
195 ordered: !self.order().is_any(),
196 limit: *self.limit(),
197 as_of: to_pb_time_travel_as_of(&self.as_of)?,
198 }))
199 }
200}
201
202impl ToLocalBatch for BatchSeqScan {
203 fn to_local(&self) -> Result<PlanRef> {
204 let dist = if let Some(distribution_key) = self.core.distribution_key()
205 && !distribution_key.is_empty()
206 {
207 Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id)
208 } else {
209 Distribution::SomeShard
212 };
213 Ok(Self::new_inner(
214 self.core.clone(),
215 dist,
216 self.scan_ranges.clone(),
217 self.limit,
218 )
219 .into())
220 }
221}
222
223impl ExprRewritable for BatchSeqScan {
224 fn has_rewritable_expr(&self) -> bool {
225 true
226 }
227
228 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
229 let mut core = self.core.clone();
230 core.rewrite_exprs(r);
231 Self::new(core, self.scan_ranges.clone(), self.limit).into()
232 }
233}
234
235impl ExprVisitable for BatchSeqScan {
236 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
237 self.core.visit_exprs(v);
238 }
239}