risingwave_frontend/optimizer/plan_node/
logical_sys_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::bail_not_implemented;
20use risingwave_common::catalog::{ColumnDesc, TableDesc};
21
22use super::generic::{GenericPlanNode, GenericPlanRef};
23use super::utils::{Distill, childless_record};
24use super::{
25    BatchFilter, BatchProject, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef,
26    PredicatePushdown, ToBatch, ToStream, generic,
27};
28use crate::error::Result;
29use crate::expr::{CorrelatedInputRef, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
30use crate::optimizer::optimizer_context::OptimizerContextRef;
31use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
32use crate::optimizer::plan_node::{
33    BatchSysSeqScan, ColumnPruningContext, LogicalFilter, LogicalValues, PredicatePushdownContext,
34    RewriteStreamContext, ToStreamContext,
35};
36use crate::optimizer::property::{Cardinality, Order};
37use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
38
/// `LogicalSysScan` returns contents of a table or other equivalent object.
///
/// It is a thin logical wrapper around a [`generic::SysScan`] core: all scan state (table
/// descriptor, output/required column indices, predicate, cardinality) is stored in the core.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalSysScan {
    /// Common logical plan-node information, derived from `core` when the node is constructed.
    pub base: PlanBase<Logical>,
    /// The generic scan description this node delegates to.
    core: generic::SysScan,
}
45
46impl From<generic::SysScan> for LogicalSysScan {
47    fn from(core: generic::SysScan) -> Self {
48        let base = PlanBase::new_logical_with_core(&core);
49        Self { base, core }
50    }
51}
52
53impl From<generic::SysScan> for PlanRef {
54    fn from(core: generic::SysScan) -> Self {
55        LogicalSysScan::from(core).into()
56    }
57}
58
59impl LogicalSysScan {
60    /// Create a [`LogicalSysScan`] node. Used by planner.
61    pub fn create(
62        table_name: String, // explain-only
63        table_desc: Rc<TableDesc>,
64        ctx: OptimizerContextRef,
65        table_cardinality: Cardinality,
66    ) -> Self {
67        generic::SysScan::new(
68            table_name,
69            (0..table_desc.columns.len()).collect(),
70            table_desc,
71            ctx,
72            Condition::true_cond(),
73            table_cardinality,
74        )
75        .into()
76    }
77
78    pub fn table_name(&self) -> &str {
79        &self.core.table_name
80    }
81
82    /// The cardinality of the table **without** applying the predicate.
83    pub fn table_cardinality(&self) -> Cardinality {
84        self.core.table_cardinality
85    }
86
87    /// Get a reference to the logical scan's table desc.
88    pub fn table_desc(&self) -> &TableDesc {
89        self.core.table_desc.as_ref()
90    }
91
92    /// Get the descs of the output columns.
93    pub fn column_descs(&self) -> Vec<ColumnDesc> {
94        self.core.column_descs()
95    }
96
97    /// Get the logical scan's filter predicate
98    pub fn predicate(&self) -> &Condition {
99        &self.core.predicate
100    }
101
102    /// a vec of `InputRef` corresponding to `output_col_idx`, which can represent a pulled project.
103    fn output_idx_to_input_ref(&self) -> Vec<ExprImpl> {
104        self.output_col_idx()
105            .iter()
106            .enumerate()
107            .map(|(i, &col_idx)| {
108                InputRef::new(i, self.table_desc().columns[col_idx].data_type.clone()).into()
109            })
110            .collect_vec()
111    }
112
113    /// Undo predicate push down when predicate in scan is not supported.
114    pub fn predicate_pull_up(&self) -> (generic::SysScan, Condition, Option<Vec<ExprImpl>>) {
115        let mut predicate = self.predicate().clone();
116        if predicate.always_true() {
117            return (self.core.clone(), Condition::true_cond(), None);
118        }
119
120        let mut inverse_mapping = {
121            let mapping = ColIndexMapping::new(
122                self.required_col_idx().iter().map(|i| Some(*i)).collect(),
123                self.table_desc().columns.len(),
124            );
125            // Since `required_col_idx` mapping is not invertible, we need to inverse manually.
126            let mut inverse_map = vec![None; mapping.target_size()];
127            for (src, dst) in mapping.mapping_pairs() {
128                inverse_map[dst] = Some(src);
129            }
130            ColIndexMapping::new(inverse_map, mapping.source_size())
131        };
132
133        predicate = predicate.rewrite_expr(&mut inverse_mapping);
134
135        let scan_without_predicate = generic::SysScan::new(
136            self.table_name().to_owned(),
137            self.required_col_idx().to_vec(),
138            self.core.table_desc.clone(),
139            self.ctx(),
140            Condition::true_cond(),
141            self.table_cardinality(),
142        );
143        let project_expr = if self.required_col_idx() != self.output_col_idx() {
144            Some(self.output_idx_to_input_ref())
145        } else {
146            None
147        };
148        (scan_without_predicate, predicate, project_expr)
149    }
150
151    fn clone_with_predicate(&self, predicate: Condition) -> Self {
152        generic::SysScan::new_inner(
153            self.table_name().to_owned(),
154            self.output_col_idx().to_vec(),
155            self.core.table_desc.clone(),
156            self.base.ctx().clone(),
157            predicate,
158            self.table_cardinality(),
159        )
160        .into()
161    }
162
163    pub fn clone_with_output_indices(&self, output_col_idx: Vec<usize>) -> Self {
164        generic::SysScan::new_inner(
165            self.table_name().to_owned(),
166            output_col_idx,
167            self.core.table_desc.clone(),
168            self.base.ctx().clone(),
169            self.predicate().clone(),
170            self.table_cardinality(),
171        )
172        .into()
173    }
174
175    pub fn output_col_idx(&self) -> &Vec<usize> {
176        &self.core.output_col_idx
177    }
178
179    pub fn required_col_idx(&self) -> &Vec<usize> {
180        &self.core.required_col_idx
181    }
182}
183
// NOTE(review): macro is defined elsewhere; presumably it implements the plan-tree-node
// boilerplate for a node without inputs (a leaf) — confirm against the macro definition.
impl_plan_tree_node_for_leaf! {LogicalSysScan}
185
186impl Distill for LogicalSysScan {
187    fn distill<'a>(&self) -> XmlNode<'a> {
188        let verbose = self.base.ctx().is_explain_verbose();
189        let mut vec = Vec::with_capacity(5);
190        vec.push(("table", Pretty::from(self.table_name().to_owned())));
191        let key_is_columns =
192            self.predicate().always_true() || self.output_col_idx() == self.required_col_idx();
193        let key = if key_is_columns {
194            "columns"
195        } else {
196            "output_columns"
197        };
198        vec.push((key, self.core.columns_pretty(verbose)));
199        if !key_is_columns {
200            vec.push((
201                "required_columns",
202                Pretty::Array(
203                    self.required_col_idx()
204                        .iter()
205                        .map(|i| {
206                            let col_name = &self.table_desc().columns[*i].name;
207                            Pretty::from(if verbose {
208                                format!("{}.{}", self.table_name(), col_name)
209                            } else {
210                                col_name.to_string()
211                            })
212                        })
213                        .collect(),
214                ),
215            ));
216        }
217
218        if !self.predicate().always_true() {
219            let input_schema = self.core.fields_pretty_schema();
220            vec.push((
221                "predicate",
222                Pretty::display(&ConditionDisplay {
223                    condition: self.predicate(),
224                    input_schema: &input_schema,
225                }),
226            ))
227        }
228
229        if self.table_cardinality() != Cardinality::unknown() {
230            vec.push(("cardinality", Pretty::display(&self.table_cardinality())));
231        }
232
233        childless_record("LogicalSysScan", vec)
234    }
235}
236
237impl ColPrunable for LogicalSysScan {
238    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
239        let output_col_idx: Vec<usize> = required_cols
240            .iter()
241            .map(|i| self.required_col_idx()[*i])
242            .collect();
243        assert!(
244            output_col_idx
245                .iter()
246                .all(|i| self.output_col_idx().contains(i))
247        );
248
249        self.clone_with_output_indices(output_col_idx).into()
250    }
251}
252
253impl ExprRewritable for LogicalSysScan {
254    fn has_rewritable_expr(&self) -> bool {
255        true
256    }
257
258    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
259        let mut core = self.core.clone();
260        core.rewrite_exprs(r);
261        Self {
262            base: self.base.clone_with_new_plan_id(),
263            core,
264        }
265        .into()
266    }
267}
268
269impl ExprVisitable for LogicalSysScan {
270    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
271        self.core.visit_exprs(v);
272    }
273}
274
275impl PredicatePushdown for LogicalSysScan {
276    // TODO(kwannoel): Unify this with logical_scan.
277    fn predicate_pushdown(
278        &self,
279        mut predicate: Condition,
280        _ctx: &mut PredicatePushdownContext,
281    ) -> PlanRef {
282        // If the predicate contains `CorrelatedInputRef` or `now()`. We don't push down.
283        // This case could come from the predicate push down before the subquery unnesting.
284        struct HasCorrelated {
285            has: bool,
286        }
287        impl ExprVisitor for HasCorrelated {
288            fn visit_correlated_input_ref(&mut self, _: &CorrelatedInputRef) {
289                self.has = true;
290            }
291        }
292        let non_pushable_predicate: Vec<_> = predicate
293            .conjunctions
294            .extract_if(.., |expr| {
295                if expr.count_nows() > 0 {
296                    true
297                } else {
298                    let mut visitor = HasCorrelated { has: false };
299                    visitor.visit_expr(expr);
300                    visitor.has
301                }
302            })
303            .collect();
304        let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
305            self.output_col_idx().iter().map(|i| Some(*i)).collect(),
306            self.table_desc().columns.len(),
307        ));
308        if non_pushable_predicate.is_empty() {
309            self.clone_with_predicate(predicate.and(self.predicate().clone()))
310                .into()
311        } else {
312            LogicalFilter::create(
313                self.clone_with_predicate(predicate.and(self.predicate().clone()))
314                    .into(),
315                Condition {
316                    conjunctions: non_pushable_predicate,
317                },
318            )
319        }
320    }
321}
322
323impl LogicalSysScan {
324    // TODO(kwannoel): Unify this with logical_scan.
325    fn to_batch_inner_with_required(&self, required_order: &Order) -> Result<PlanRef> {
326        if self.predicate().always_true() {
327            required_order
328                .enforce_if_not_satisfies(BatchSysSeqScan::new(self.core.clone(), vec![]).into())
329        } else {
330            let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
331                self.core.table_desc.clone(),
332                self.base.ctx().session_ctx().config().max_split_range_gap() as u64,
333            )?;
334            let mut scan = self.clone();
335            scan.core.predicate = predicate; // We want to keep `required_col_idx` unchanged, so do not call `clone_with_predicate`.
336
337            let plan: PlanRef = if scan.core.predicate.always_false() {
338                LogicalValues::create(vec![], scan.core.schema(), scan.core.ctx).to_batch()?
339            } else {
340                let (scan, predicate, project_expr) = scan.predicate_pull_up();
341
342                let mut plan: PlanRef = BatchSysSeqScan::new(scan, scan_ranges).into();
343                if !predicate.always_true() {
344                    plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into();
345                }
346                if let Some(exprs) = project_expr {
347                    plan = BatchProject::new(generic::Project::new(exprs, plan)).into()
348                }
349                plan
350            };
351
352            assert_eq!(plan.schema(), self.schema());
353            required_order.enforce_if_not_satisfies(plan)
354        }
355    }
356}
357
358impl ToBatch for LogicalSysScan {
359    fn to_batch(&self) -> Result<PlanRef> {
360        self.to_batch_with_order_required(&Order::any())
361    }
362
363    fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
364        let new = self.clone_with_predicate(self.predicate().clone());
365        new.to_batch_inner_with_required(required_order)
366    }
367}
368
369impl ToStream for LogicalSysScan {
370    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
371        bail_not_implemented!("streaming on system table");
372    }
373
374    fn logical_rewrite_for_stream(
375        &self,
376        _ctx: &mut RewriteStreamContext,
377    ) -> Result<(PlanRef, ColIndexMapping)> {
378        bail_not_implemented!("streaming on system table");
379    }
380}