risingwave_frontend/optimizer/plan_node/
batch_sys_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::{ScanRange, is_full_range};
17use risingwave_pb::batch_plan::SysRowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_pb::plan_common::PbColumnDesc;
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs};
23use super::{ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, generic};
24use crate::error::Result;
25use crate::expr::{ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
29
/// `BatchSysSeqScan` implements [`super::LogicalSysScan`] to scan from a row-oriented table
///
/// The node wraps a [`generic::SysScan`] core together with the scan ranges
/// attached to the scan.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSysSeqScan {
    /// Batch plan base, built from `core`, the distribution and the output order.
    pub base: PlanBase<Batch>,
    /// Generic system-scan description (table, columns, ...) wrapped by this node.
    core: generic::SysScan,
    /// Scan ranges attached to this scan; each one is validated on construction.
    scan_ranges: Vec<ScanRange>,
}
37
38impl BatchSysSeqScan {
39    fn new_inner(core: generic::SysScan, dist: Distribution, scan_ranges: Vec<ScanRange>) -> Self {
40        let order = if scan_ranges.len() > 1 {
41            Order::any()
42        } else {
43            core.get_out_column_index_order()
44        };
45        let base = PlanBase::new_batch_with_core(&core, dist, order);
46
47        {
48            // validate scan_range
49            scan_ranges.iter().for_each(|scan_range| {
50                assert!(!scan_range.is_full_table_scan());
51                let scan_pk_prefix_len = scan_range.eq_conds.len();
52                let order_len = core.table_desc.order_column_indices().len();
53                assert!(
54                    scan_pk_prefix_len < order_len
55                        || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
56                    "invalid scan_range",
57                );
58            })
59        }
60
61        Self {
62            base,
63            core,
64            scan_ranges,
65        }
66    }
67
68    pub fn new(core: generic::SysScan, scan_ranges: Vec<ScanRange>) -> Self {
69        // Use `Single` by default, will be updated later with `clone_with_dist`.
70        Self::new_inner(core, Distribution::Single, scan_ranges)
71    }
72
73    fn clone_with_dist(&self) -> Self {
74        Self::new_inner(
75            self.core.clone(),
76            Distribution::Single,
77            self.scan_ranges.clone(),
78        )
79    }
80
81    /// Get a reference to the batch seq scan's logical.
82    #[must_use]
83    pub fn core(&self) -> &generic::SysScan {
84        &self.core
85    }
86
87    pub fn scan_ranges(&self) -> &[ScanRange] {
88        &self.scan_ranges
89    }
90}
91
// NOTE(review): presumably generates the plan-tree-node impls for a node
// without inputs — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { BatchSysSeqScan }
93
94impl Distill for BatchSysSeqScan {
95    fn distill<'a>(&self) -> XmlNode<'a> {
96        let verbose = self.base.ctx().is_explain_verbose();
97        let mut vec = Vec::with_capacity(4);
98        vec.push(("table", Pretty::from(self.core.table_name.clone())));
99        vec.push(("columns", self.core.columns_pretty(verbose)));
100
101        if !self.scan_ranges.is_empty() {
102            let order_names = match verbose {
103                true => self.core.order_names_with_table_prefix(),
104                false => self.core.order_names(),
105            };
106            let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
107            vec.push((
108                "scan_ranges",
109                Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
110            ));
111        }
112
113        if verbose {
114            let dist = Pretty::display(&DistributionDisplay {
115                distribution: self.distribution(),
116                input_schema: self.base.schema(),
117            });
118            vec.push(("distribution", dist));
119        }
120
121        childless_record("BatchScan", vec)
122    }
123}
124
125impl ToDistributedBatch for BatchSysSeqScan {
126    fn to_distributed(&self) -> Result<PlanRef> {
127        Ok(self.clone_with_dist().into())
128    }
129}
130
131impl ToBatchPb for BatchSysSeqScan {
132    fn to_batch_prost_body(&self) -> NodeBody {
133        let column_descs = self
134            .core
135            .column_descs()
136            .iter()
137            .map(PbColumnDesc::from)
138            .collect();
139        NodeBody::SysRowSeqScan(SysRowSeqScanNode {
140            table_id: self.core.table_desc.table_id.table_id,
141            column_descs,
142        })
143    }
144}
145
146impl ToLocalBatch for BatchSysSeqScan {
147    fn to_local(&self) -> Result<PlanRef> {
148        Ok(Self::new_inner(
149            self.core.clone(),
150            Distribution::Single,
151            self.scan_ranges.clone(),
152        )
153        .into())
154    }
155}
156
157impl ExprRewritable for BatchSysSeqScan {
158    fn has_rewritable_expr(&self) -> bool {
159        true
160    }
161
162    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
163        let mut core = self.core.clone();
164        core.rewrite_exprs(r);
165        Self::new(core, self.scan_ranges.clone()).into()
166    }
167}
168
169impl ExprVisitable for BatchSysSeqScan {
170    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
171        self.core.visit_exprs(v);
172    }
173}