risingwave_frontend/optimizer/plan_node/batch_seq_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::util::scan_range::{ScanRange, is_full_range};
use risingwave_pb::batch_plan::RowSeqScanNode;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_sqlparser::ast::AsOf;

use super::batch::prelude::*;
use super::utils::{Distill, childless_record, scan_ranges_as_strs, to_pb_time_travel_as_of};
use super::{ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, generic};
use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{ToLocalBatch, TryToBatchPb};
use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
use crate::scheduler::SchedulerResult;

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
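///
/// It is converted to a `RowSeqScan` node in the batch plan protobuf (see
/// [`TryToBatchPb`] below). An illustrative (hypothetical) `EXPLAIN` rendering
/// of this node, based on the fields emitted by its [`Distill`] impl:
///
/// ```text
/// BatchScan { table: t, columns: [v1, v2], scan_ranges: [v1 = 1] }
/// ```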
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSeqScan {
    pub base: PlanBase<Batch>,
    core: generic::TableScan,
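    /// Ranges over the table's order key to scan, derived from pushed-down
    /// predicates; see the validation in `new_inner` for the invariant.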
    scan_ranges: Vec<ScanRange>,
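    /// Maximum number of rows to scan, or `None` for no limit.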
    limit: Option<u64>,
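    /// Time-travel `AS OF` specification, cloned from `core.as_of`.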
    as_of: Option<AsOf>,
}

impl BatchSeqScan {
    fn new_inner(
        core: generic::TableScan,
        dist: Distribution,
        scan_ranges: Vec<ScanRange>,
        limit: Option<u64>,
    ) -> Self {
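        // A single scan range preserves the order of the underlying table;
        // with multiple ranges, no output order can be guaranteed.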
        let order = if scan_ranges.len() > 1 {
            Order::any()
        } else {
            core.get_out_column_index_order()
        };
        let base = PlanBase::new_batch_with_core(&core, dist, order);

        {
            // validate scan_range
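            //
            // Each scan range must pin a prefix of the order key through
            // `eq_conds`, optionally followed by a `range` on the next order
            // column. E.g., for an order key `(a, b, c)`, the predicate
            // `a = 1 AND b = 2 AND c >= 3` is valid, while a bound on `c`
            // without pinning `b` is not.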
            scan_ranges.iter().for_each(|scan_range| {
                assert!(!scan_range.is_full_table_scan());
                let scan_pk_prefix_len = scan_range.eq_conds.len();
                let order_len = core.table_desc.order_column_indices().len();
                assert!(
                    scan_pk_prefix_len < order_len
                        || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
                    "invalid scan_range",
                );
            })
        }
        let as_of = core.as_of.clone();

        Self {
            base,
            core,
            scan_ranges,
            limit,
            as_of,
        }
    }

    pub fn new(core: generic::TableScan, scan_ranges: Vec<ScanRange>, limit: Option<u64>) -> Self {
        // Use `Single` by default; it will be updated later by `clone_with_dist`.
        Self::new_inner(core, Distribution::Single, scan_ranges, limit)
    }

    pub fn new_with_dist(
        core: generic::TableScan,
        dist: Distribution,
        scan_ranges: Vec<ScanRange>,
        limit: Option<u64>,
    ) -> Self {
        Self::new_inner(core, dist, scan_ranges, limit)
    }

    fn clone_with_dist(&self) -> Self {
        Self::new_inner(
            self.core.clone(),
            match self.core.distribution_key() {
                None => Distribution::SomeShard,
                Some(distribution_key) => {
                    if distribution_key.is_empty() {
                        Distribution::Single
                    } else {
                        // For other batch operators, `HashShard` is simple modulo
                        // hashing, i.e.,
                        // `target_shard = hash(dist_key) % shard_num`
                        //
                        // But an MV is actually sharded by consistent hashing, i.e.,
                        // `target_shard = vnode_mapping.map(hash(dist_key) % vnode_num)`
                        //
                        // The two are incompatible, so we specify the distribution as
                        // `UpstreamHashShard` to force an exchange to be inserted
                        // whenever a plain `HashShard` distribution is required.
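                        //
                        // For example (illustrative numbers only): with 4 shards,
                        // 8 vnodes, and hash(dist_key) = 10, simple hashing picks
                        // shard 10 % 4 = 2, while consistent hashing first picks
                        // vnode 10 % 8 = 2 and then maps that vnode to whichever
                        // shard currently owns it.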
                        Distribution::UpstreamHashShard(
                            distribution_key,
                            self.core.table_desc.table_id,
                        )
                    }
                }
            },
            self.scan_ranges.clone(),
            self.limit,
        )
    }

    /// Get a reference to the batch seq scan's core [`generic::TableScan`].
    #[must_use]
    pub fn core(&self) -> &generic::TableScan {
        &self.core
    }

    pub fn scan_ranges(&self) -> &[ScanRange] {
        &self.scan_ranges
    }

    pub fn limit(&self) -> &Option<u64> {
        &self.limit
    }
}

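// A sequential scan is a leaf of the plan tree: it has no input plan nodes.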
impl_plan_tree_node_for_leaf! { BatchSeqScan }

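// Renders the node for `EXPLAIN` output; verbose mode additionally shows
// table-prefixed column names and the distribution.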
impl Distill for BatchSeqScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let verbose = self.base.ctx().is_explain_verbose();
        let mut vec = Vec::with_capacity(4);
        vec.push(("table", Pretty::from(self.core.table_name.clone())));
        vec.push(("columns", self.core.columns_pretty(verbose)));

        if !self.scan_ranges.is_empty() {
            let order_names = match verbose {
                true => self.core.order_names_with_table_prefix(),
                false => self.core.order_names(),
            };
            let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
            vec.push((
                "scan_ranges",
                Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
            ));
        }

        if let Some(limit) = &self.limit {
            vec.push(("limit", Pretty::display(limit)));
        }

        if verbose {
            let dist = Pretty::display(&DistributionDisplay {
                distribution: self.distribution(),
                input_schema: self.base.schema(),
            });
            vec.push(("distribution", dist));
        }

        childless_record("BatchScan", vec)
    }
}

impl ToDistributedBatch for BatchSeqScan {
    fn to_distributed(&self) -> Result<PlanRef> {
        Ok(self.clone_with_dist().into())
    }
}

impl TryToBatchPb for BatchSeqScan {
    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
        Ok(NodeBody::RowSeqScan(RowSeqScanNode {
            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
            column_ids: self
                .core
                .output_column_ids()
                .iter()
                .map(ColumnId::get_id)
                .collect(),
            scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(),
            // To be filled by the scheduler.
            vnode_bitmap: None,
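            // Ask the executor to return rows in order only when this scan
            // actually provides one (see the `order` computed in `new_inner`).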
            ordered: !self.order().is_any(),
            limit: *self.limit(),
            as_of: to_pb_time_travel_as_of(&self.as_of)?,
        }))
    }
}

impl ToLocalBatch for BatchSeqScan {
    fn to_local(&self) -> Result<PlanRef> {
        let dist = if let Some(distribution_key) = self.core.distribution_key()
            && !distribution_key.is_empty()
        {
            Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id)
        } else {
            // NOTE(kwannoel): This is a hack to force an exchange to always be
            // inserted before the scan.
            Distribution::SomeShard
        };
        Ok(Self::new_inner(
            self.core.clone(),
            dist,
            self.scan_ranges.clone(),
            self.limit,
        )
        .into())
    }
}

impl ExprRewritable for BatchSeqScan {
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
        core.rewrite_exprs(r);
        Self::new(core, self.scan_ranges.clone(), self.limit).into()
    }
}

impl ExprVisitable for BatchSeqScan {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}