risingwave_frontend/optimizer/plan_node/
batch_seq_scan.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::{ScanRange, is_full_range};
17use risingwave_pb::batch_plan::RowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_sqlparser::ast::AsOf;
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs, to_batch_query_epoch};
23use super::{BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToDistributedBatch, generic};
24use crate::catalog::ColumnId;
25use crate::error::Result;
26use crate::expr::{ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{ToLocalBatch, TryToBatchPb};
29use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
30use crate::scheduler::SchedulerResult;
31
/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSeqScan {
    pub base: PlanBase<Batch>,
    /// Logical scan core shared with the other scan plan nodes.
    core: generic::TableScan,
    /// Ranges to scan. Each range is asserted not to be a full table scan in
    /// `new_inner`; an empty vector presumably denotes a full scan — confirm
    /// against callers.
    scan_ranges: Vec<ScanRange>,
    /// Optional row-count limit pushed down into the scan executor.
    limit: Option<u64>,
    /// Time-travel (`AS OF`) clause, copied from `core` at construction time.
    as_of: Option<AsOf>,
}
41
impl BatchSeqScan {
    /// Builds the node: derives the orders this scan provides and validates
    /// every scan range against the table's primary key.
    fn new_inner(
        core: generic::TableScan,
        dist: Distribution,
        scan_ranges: Vec<ScanRange>,
        limit: Option<u64>,
    ) -> Self {
        // Orders provided by this scan:
        // - more than one range: rows from different ranges are emitted
        //   together, so no order can be promised;
        // - exactly one range with `k` equality conditions: those conditions
        //   pin the leading sort columns to constants (assumes the eq-bound
        //   pk prefix corresponds to the leading columns of the base order —
        //   verify in `get_out_column_index_order`), so besides the base
        //   order, every order with up to `k` leading columns trimmed holds;
        // - otherwise: just the base output order.
        let orders = if scan_ranges.len() > 1 {
            vec![Order::any()]
        } else {
            let base_order = core.get_out_column_index_order();
            if scan_ranges.len() == 1 && !base_order.is_any() {
                let eq_prefix_len = scan_ranges[0].eq_conds.len();
                if eq_prefix_len > 0 {
                    let mut orders = vec![base_order.clone()];
                    // Never trim more columns than the order actually has.
                    let max_trim = eq_prefix_len.min(base_order.column_orders.len());
                    for trim in 1..=max_trim {
                        orders.push(Order {
                            column_orders: base_order.column_orders[trim..].to_vec(),
                        });
                    }
                    orders
                } else {
                    vec![base_order]
                }
            } else {
                vec![base_order]
            }
        };
        let base = PlanBase::new_batch_with_core_and_orders(&core, dist, orders);

        {
            // Validate each scan_range: it must constrain something (not be a
            // full table scan), and its equality conditions may bind at most
            // the whole primary key — when every pk column is bound, no
            // residual (non-full) range may remain.
            scan_ranges.iter().for_each(|scan_range| {
                assert!(!scan_range.is_full_table_scan());
                let scan_pk_prefix_len = scan_range.eq_conds.len();
                let order_len = core.table_catalog.pk.len();
                assert!(
                    scan_pk_prefix_len < order_len
                        || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
                    "invalid scan_range",
                );
            })
        }
        // Keep the `AS OF` clause for later protobuf serialization.
        let as_of = core.as_of.clone();

        Self {
            base,
            core,
            scan_ranges,
            limit,
            as_of,
        }
    }

    /// Creates a scan with `Single` distribution.
    pub fn new(core: generic::TableScan, scan_ranges: Vec<ScanRange>, limit: Option<u64>) -> Self {
        // Use `Single` by default, will be updated later with `clone_with_dist`.
        Self::new_inner(core, Distribution::Single, scan_ranges, limit)
    }

    /// Creates a scan with an explicitly chosen distribution.
    pub fn new_with_dist(
        core: generic::TableScan,
        dist: Distribution,
        scan_ranges: Vec<ScanRange>,
        limit: Option<u64>,
    ) -> Self {
        Self::new_inner(core, dist, scan_ranges, limit)
    }

    /// Rebuilds the node with the distribution derived from the table's
    /// distribution key; used when converting to a distributed plan.
    fn clone_with_dist(&self) -> Self {
        Self::new_inner(
            self.core.clone(),
            match self.core.distribution_key() {
                None => Distribution::SomeShard,
                Some(distribution_key) => {
                    if distribution_key.is_empty() {
                        Distribution::Single
                    } else {
                        // For other batch operators, `HashShard` is a simple hashing, i.e.,
                        // `target_shard = hash(dist_key) % shard_num`
                        //
                        // But MV is actually sharded by consistent hashing, i.e.,
                        // `target_shard = vnode_mapping.map(hash(dist_key) % vnode_num)`
                        //
                        // They are incompatible, so we mark the distribution as
                        // `UpstreamHashShard` so that an exchange is inserted
                        // when a plain `HashShard` distribution is required.
                        // NOTE(review): the previous comment said `SomeShard`,
                        // which did not match the code below.
                        Distribution::UpstreamHashShard(
                            distribution_key,
                            self.core.table_catalog.id,
                        )
                    }
                }
            },
            self.scan_ranges.clone(),
            self.limit,
        )
    }

    /// Get a reference to the batch seq scan's logical.
    #[must_use]
    pub fn core(&self) -> &generic::TableScan {
        &self.core
    }

    /// The (validated) ranges this scan reads.
    pub fn scan_ranges(&self) -> &[ScanRange] {
        &self.scan_ranges
    }

    /// The row-count limit pushed down into the scan, if any.
    pub fn limit(&self) -> &Option<u64> {
        &self.limit
    }
}
155
156impl_plan_tree_node_for_leaf! { Batch, BatchSeqScan }
157
158impl Distill for BatchSeqScan {
159    fn distill<'a>(&self) -> XmlNode<'a> {
160        let verbose = self.base.ctx().is_explain_verbose();
161        let mut vec = Vec::with_capacity(4);
162        vec.push(("table", Pretty::from(self.core.table_name().to_owned())));
163        vec.push(("columns", self.core.columns_pretty(verbose)));
164
165        if !self.scan_ranges.is_empty() {
166            let order_names = match verbose {
167                true => self.core.order_names_with_table_prefix(),
168                false => self.core.order_names(),
169            };
170            let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
171            vec.push((
172                "scan_ranges",
173                Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
174            ));
175        }
176
177        if let Some(limit) = &self.limit {
178            vec.push(("limit", Pretty::display(limit)));
179        }
180
181        if verbose {
182            let dist = Pretty::display(&DistributionDisplay {
183                distribution: self.distribution(),
184                input_schema: self.base.schema(),
185            });
186            vec.push(("distribution", dist));
187        }
188
189        childless_record("BatchScan", vec)
190    }
191}
192
impl ToDistributedBatch for BatchSeqScan {
    /// Converts to a distributed plan by recomputing the distribution from
    /// the table's distribution key; see `clone_with_dist` for details.
    fn to_distributed(&self) -> Result<PlanRef> {
        Ok(self.clone_with_dist().into())
    }
}
198
impl TryToBatchPb for BatchSeqScan {
    /// Serializes this node into the batch `RowSeqScanNode` protobuf body.
    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
        Ok(NodeBody::RowSeqScan(RowSeqScanNode {
            table_desc: Some(self.core.table_catalog.table_desc().try_to_protobuf()?),
            // Only the output columns are serialized, by their column ids.
            column_ids: self
                .core
                .output_column_ids()
                .iter()
                .map(ColumnId::get_id)
                .collect(),
            scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(),
            // To be filled by the scheduler.
            vnode_bitmap: None,
            // Request ordered output only when this plan node provides a
            // specific order.
            ordered: !self.order().is_any(),
            limit: *self.limit(),
            // Translate the `AS OF` clause into a query epoch, if any.
            query_epoch: to_batch_query_epoch(&self.as_of)?,
        }))
    }
}
218
219impl ToLocalBatch for BatchSeqScan {
220    fn to_local(&self) -> Result<PlanRef> {
221        let dist = if let Some(distribution_key) = self.core.distribution_key()
222            && !distribution_key.is_empty()
223        {
224            Distribution::UpstreamHashShard(distribution_key, self.core.table_catalog.id)
225        } else {
226            // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before
227            // scan.
228            Distribution::SomeShard
229        };
230        Ok(Self::new_inner(
231            self.core.clone(),
232            dist,
233            self.scan_ranges.clone(),
234            self.limit,
235        )
236        .into())
237    }
238}
239
240impl ExprRewritable<Batch> for BatchSeqScan {
241    fn has_rewritable_expr(&self) -> bool {
242        true
243    }
244
245    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
246        let mut core = self.core.clone();
247        core.rewrite_exprs(r);
248        Self::new(core, self.scan_ranges.clone(), self.limit).into()
249    }
250}
251
impl ExprVisitable for BatchSeqScan {
    /// Delegates expression visiting to the shared scan core.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}
256}