risingwave_frontend/optimizer/plan_node/
logical_sys_scan.rs1use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::bail_not_implemented;
20use risingwave_common::catalog::{ColumnDesc, TableDesc};
21
22use super::generic::{GenericPlanNode, GenericPlanRef};
23use super::utils::{Distill, childless_record};
24use super::{
25 BatchFilter, BatchProject, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef,
26 PredicatePushdown, ToBatch, ToStream, generic,
27};
28use crate::error::Result;
29use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
30use crate::optimizer::optimizer_context::OptimizerContextRef;
31use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
32use crate::optimizer::plan_node::{
33 BatchSysSeqScan, ColumnPruningContext, LogicalFilter, LogicalValues, PredicatePushdownContext,
34 RewriteStreamContext, ToStreamContext,
35};
36use crate::optimizer::property::{Cardinality, Order};
37use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct LogicalSysScan {
42 pub base: PlanBase<Logical>,
43 core: generic::SysScan,
44}
45
46impl From<generic::SysScan> for LogicalSysScan {
47 fn from(core: generic::SysScan) -> Self {
48 let base = PlanBase::new_logical_with_core(&core);
49 Self { base, core }
50 }
51}
52
53impl From<generic::SysScan> for PlanRef {
54 fn from(core: generic::SysScan) -> Self {
55 LogicalSysScan::from(core).into()
56 }
57}
58
59impl LogicalSysScan {
60 pub fn create(
62 table_name: String, table_desc: Rc<TableDesc>,
64 ctx: OptimizerContextRef,
65 table_cardinality: Cardinality,
66 ) -> Self {
67 generic::SysScan::new(
68 table_name,
69 (0..table_desc.columns.len()).collect(),
70 table_desc,
71 ctx,
72 Condition::true_cond(),
73 table_cardinality,
74 )
75 .into()
76 }
77
78 pub fn table_name(&self) -> &str {
79 &self.core.table_name
80 }
81
82 pub fn table_cardinality(&self) -> Cardinality {
84 self.core.table_cardinality
85 }
86
87 pub fn table_desc(&self) -> &TableDesc {
89 self.core.table_desc.as_ref()
90 }
91
92 pub fn column_descs(&self) -> Vec<ColumnDesc> {
94 self.core.column_descs()
95 }
96
97 pub fn predicate(&self) -> &Condition {
99 &self.core.predicate
100 }
101
102 fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
104 self.output_col_idx()
105 .iter()
106 .enumerate()
107 .map(|(i, &col_idx)| {
108 InputRef::new(i, self.table_desc().columns[col_idx].data_type.clone()).into()
109 })
110 .collect_vec()
111 }
112
113 pub fn predicate_pull_up(&self) -> (generic::SysScan, Condition, Option<Vec<ExprImpl>>) {
115 let mut predicate = self.predicate().clone();
116 if predicate.always_true() {
117 return (self.core.clone(), Condition::true_cond(), None);
118 }
119
120 let mut inverse_mapping = {
121 let mapping = ColIndexMapping::new(
122 self.required_col_idx().iter().map(|i| Some(*i)).collect(),
123 self.table_desc().columns.len(),
124 );
125 let mut inverse_map = vec![None; mapping.target_size()];
127 for (src, dst) in mapping.mapping_pairs() {
128 inverse_map[dst] = Some(src);
129 }
130 ColIndexMapping::new(inverse_map, mapping.source_size())
131 };
132
133 predicate = predicate.rewrite_expr(&mut inverse_mapping);
134
135 let scan_without_predicate = generic::SysScan::new(
136 self.table_name().to_owned(),
137 self.required_col_idx().to_vec(),
138 self.core.table_desc.clone(),
139 self.ctx(),
140 Condition::true_cond(),
141 self.table_cardinality(),
142 );
143 let project_expr = if self.required_col_idx() != self.output_col_idx() {
144 Some(self.output_idx_to_input_ref())
145 } else {
146 None
147 };
148 (scan_without_predicate, predicate, project_expr)
149 }
150
151 fn clone_with_predicate(&self, predicate: Condition) -> Self {
152 generic::SysScan::new_inner(
153 self.table_name().to_owned(),
154 self.output_col_idx().to_vec(),
155 self.core.table_desc.clone(),
156 self.base.ctx().clone(),
157 predicate,
158 self.table_cardinality(),
159 )
160 .into()
161 }
162
163 pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
164 generic::SysScan::new_inner(
165 self.table_name().to_owned(),
166 output_col_idx,
167 self.core.table_desc.clone(),
168 self.base.ctx().clone(),
169 self.predicate().clone(),
170 self.table_cardinality(),
171 )
172 .into()
173 }
174
175 pub fn output_col_idx(&self) -> &Vec<usize> {
176 &self.core.output_col_idx
177 }
178
179 pub fn required_col_idx(&self) -> &Vec<usize> {
180 &self.core.required_col_idx
181 }
182}
183
184impl_plan_tree_node_for_leaf! {LogicalSysScan}
185
186impl Distill for LogicalSysScan {
187 fn distill<'a>(&self) -> XmlNode<'a> {
188 let verbose = self.base.ctx().is_explain_verbose();
189 let mut vec = Vec::with_capacity(5);
190 vec.push(("table", Pretty::from(self.table_name().to_owned())));
191 let key_is_columns =
192 self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
193 let key = if key_is_columns {
194 "columns"
195 } else {
196 "output_columns"
197 };
198 vec.push((key, self.core.columns_pretty(verbose)));
199 if !key_is_columns {
200 vec.push((
201 "required_columns",
202 Pretty::Array(
203 self.required_col_idx()
204 .iter()
205 .map(|i| {
206 let col_name = &self.table_desc().columns[*i].name;
207 Pretty::from(if verbose {
208 format!("{}.{}", self.table_name(), col_name)
209 } else {
210 col_name.to_string()
211 })
212 })
213 .collect(),
214 ),
215 ));
216 }
217
218 if !self.predicate().always_true() {
219 let input_schema = self.core.fields_pretty_schema();
220 vec.push((
221 "predicate",
222 Pretty::display(&ConditionDisplay {
223 condition: self.predicate(),
224 input_schema: &input_schema,
225 }),
226 ))
227 }
228
229 if self.table_cardinality() != Cardinality::unknown() {
230 vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
231 }
232
233 childless_record("LogicalSysScan", vec)
234 }
235}
236
237impl ColPrunable for LogicalSysScan {
238 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
239 let output_col_idx: Vec<usize> = required_cols
240 .iter()
241 .map(|i| self.required_col_idx()[*i])
242 .collect();
243 assert!(
244 output_col_idx
245 .iter()
246 .all(|i| self.output_col_idx().contains(i))
247 );
248
249 self.clone_with_output_indices(output_col_idx).into()
250 }
251}
252
253impl ExprRewritable for LogicalSysScan {
254 fn has_rewritable_expr(&self) -> bool {
255 true
256 }
257
258 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
259 let mut core = self.core.clone();
260 core.rewrite_exprs(r);
261 Self {
262 base: self.base.clone_with_new_plan_id(),
263 core,
264 }
265 .into()
266 }
267}
268
269impl ExprVisitable for LogicalSysScan {
270 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
271 self.core.visit_exprs(v);
272 }
273}
274
275impl PredicatePushdown for LogicalSysScan {
276 fn predicate_pushdown(
278 &self,
279 mut predicate: Condition,
280 _ctx: &mut PredicatePushdownContext,
281 ) -> PlanRef {
282 struct HasCorrelated {
285 has: bool,
286 }
287 impl ExprVisitor for HasCorrelated {
288 fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
289 self.has = true;
290 }
291 }
292 let non_pushable_predicate: Vec<_> = predicate
293 .conjunctions
294 .extract_if(.., |expr| {
295 if expr.count_nows() > 0 {
296 true
297 } else {
298 let mut visitor = HasCorrelated { has: false };
299 visitor.visit_expr(expr);
300 visitor.has
301 }
302 })
303 .collect();
304 let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
305 self.output_col_idx().iter().map(|i| Some(*i)).collect(),
306 self.table_desc().columns.len(),
307 ));
308 if non_pushable_predicate.is_empty() {
309 self.clone_with_predicate(predicate.and(self.predicate().clone()))
310 .into()
311 } else {
312 LogicalFilter::create(
313 self.clone_with_predicate(predicate.and(self.predicate().clone()))
314 .into(),
315 Condition {
316 conjunctions: non_pushable_predicate,
317 },
318 )
319 }
320 }
321}
322
323impl LogicalSysScan {
324 fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<PlanRef> {
326 if self.predicate().always_true() {
327 required_order
328 .enforce_if_not_satisfies(BatchSysSeqScan::new(self.core.clone(), vec![]).into())
329 } else {
330 let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
331 self.core.table_desc.clone(),
332 self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
333 )?;
334 let mut scan = self.clone();
335 scan.core.predicate = predicate; let plan: PlanRef = if scan.core.predicate.always_false() {
338 LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
339 } else {
340 let (scan, predicate, project_expr) = scan.predicate_pull_up();
341
342 let mut plan: PlanRef = BatchSysSeqScan::new(scan, scan_ranges).into();
343 if !predicate.always_true() {
344 plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
345 }
346 if let Some(exprs) = project_expr {
347 plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
348 }
349 plan
350 };
351
352 assert_eq!(plan.schema(), self.schema());
353 required_order.enforce_if_not_satisfies(plan)
354 }
355 }
356}
357
358impl ToBatch for LogicalSysScan {
359 fn to_batch(&self) -> Result<PlanRef> {
360 self.to_batch_with_order_required(&Order::any())
361 }
362
363 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
364 let new = self.clone_with_predicate(self.predicate().clone());
365 new.to_batch_inner_with_required(required_order)
366 }
367}
368
369impl ToStream for LogicalSysScan {
370 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
371 bail_not_implemented!("streaming on system table");
372 }
373
374 fn logical_rewrite_for_stream(
375 &self,
376 _ctx: &mut RewriteStreamContext,
377 ) -> Result<(PlanRef, ColIndexMapping)> {
378 bail_not_implemented!("streaming on system table");
379 }
380}