risingwave_frontend/optimizer/plan_node/
batch_log_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::ScanRange;
17use risingwave_pb::batch_plan::LogRowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs};
23use super::{
24    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToDistributedBatch, TryToBatchPb, generic,
25};
26use crate::catalog::ColumnId;
27use crate::error::Result;
28use crate::optimizer::plan_node::ToLocalBatch;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::property::{Distribution, DistributionDisplay};
31use crate::scheduler::SchedulerResult;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchLogSeqScan {
35    pub base: PlanBase<Batch>,
36    core: generic::LogScan,
37    scan_range: Option<ScanRange>,
38}
39
40impl BatchLogSeqScan {
41    fn new_inner(
42        core: generic::LogScan,
43        dist: Distribution,
44        scan_range: Option<ScanRange>,
45    ) -> Self {
46        let order = core.get_out_column_index_order();
47        let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);
48
49        Self {
50            base,
51            core,
52            scan_range,
53        }
54    }
55
56    pub fn new(core: generic::LogScan, scan_range: Option<ScanRange>) -> Self {
57        // Use `Single` by default, will be updated later with `clone_with_dist`.
58        Self::new_inner(core, Distribution::Single, scan_range)
59    }
60
61    fn clone_with_dist(&self) -> Self {
62        Self::new_inner(
63            self.core.clone(),
64            match self.core.distribution_key() {
65                None => Distribution::SomeShard,
66                Some(distribution_key) => {
67                    if distribution_key.is_empty() {
68                        Distribution::Single
69                    } else {
70                        Distribution::UpstreamHashShard(distribution_key, self.core.table.id)
71                    }
72                }
73            },
74            self.scan_range.clone(),
75        )
76    }
77
78    /// Get a reference to the batch seq scan's logical.
79    #[must_use]
80    pub fn core(&self) -> &generic::LogScan {
81        &self.core
82    }
83}
84
85impl_plan_tree_node_for_leaf! { Batch, BatchLogSeqScan }
86
87impl Distill for BatchLogSeqScan {
88    fn distill<'a>(&self) -> XmlNode<'a> {
89        let verbose = self.base.ctx().is_explain_verbose();
90        let mut vec = Vec::with_capacity(3);
91        vec.push(("table", Pretty::from(self.core.table_name.clone())));
92        vec.push(("columns", self.core.columns_pretty(verbose)));
93
94        if verbose {
95            let dist = Pretty::display(&DistributionDisplay {
96                distribution: self.distribution(),
97                input_schema: self.base.schema(),
98            });
99            vec.push(("distribution", dist));
100        }
101        vec.push((
102            "epoch_range",
103            Pretty::from(format!("{:?}", self.core.epoch_range)),
104        ));
105        vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));
106        if let Some(scan_range) = &self.scan_range {
107            let order_names = match verbose {
108                true => self.core.order_names_with_table_prefix(),
109                false => self.core.order_names(),
110            };
111            let range_strs = scan_ranges_as_strs(order_names, &vec![scan_range.clone()]);
112            vec.push((
113                "scan_range",
114                Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
115            ));
116        }
117
118        childless_record("BatchLogSeqScan", vec)
119    }
120}
121
122impl ToDistributedBatch for BatchLogSeqScan {
123    fn to_distributed(&self) -> Result<PlanRef> {
124        Ok(self.clone_with_dist().into())
125    }
126}
127
128impl TryToBatchPb for BatchLogSeqScan {
129    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
130        Ok(NodeBody::LogRowSeqScan(LogRowSeqScanNode {
131            table_desc: Some(self.core.table.table_desc().try_to_protobuf()?),
132            column_ids: self
133                .core
134                .output_column_ids()
135                .iter()
136                .map(ColumnId::get_id)
137                .collect(),
138            vnode_bitmap: None,
139            old_epoch: Some(BatchQueryEpoch {
140                epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
141                    BatchQueryCommittedEpoch {
142                        epoch: self.core.epoch_range.0,
143                        hummock_version_id: 0,
144                    },
145                )),
146            }),
147            new_epoch: Some(BatchQueryEpoch {
148                epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
149                    BatchQueryCommittedEpoch {
150                        epoch: self.core.epoch_range.1,
151                        hummock_version_id: 0,
152                    },
153                )),
154            }),
155            // It's currently true.
156            ordered: !self.order().is_any(),
157            scan_range: self
158                .scan_range
159                .as_ref()
160                .map(|scan_range| scan_range.to_protobuf()),
161        }))
162    }
163}
164
165impl ToLocalBatch for BatchLogSeqScan {
166    fn to_local(&self) -> Result<PlanRef> {
167        let dist = if let Some(distribution_key) = self.core.distribution_key()
168            && !distribution_key.is_empty()
169        {
170            Distribution::UpstreamHashShard(distribution_key, self.core.table.id)
171        } else {
172            Distribution::SomeShard
173        };
174        Ok(Self::new_inner(self.core.clone(), dist, self.scan_range.clone()).into())
175    }
176}
177
178impl ExprRewritable<Batch> for BatchLogSeqScan {}
179
180impl ExprVisitable for BatchLogSeqScan {}