risingwave_frontend/optimizer/plan_node/
batch_log_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::ScanRange;
17use risingwave_pb::batch_plan::LogRowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs};
23use super::{ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb, generic};
24use crate::catalog::ColumnId;
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, DistributionDisplay};
29use crate::scheduler::SchedulerResult;
30
/// Batch plan node built around a [`generic::LogScan`] core.
///
/// It is registered as a leaf of the plan tree and is serialized to a `LogRowSeqScanNode` when
/// the batch plan is converted to protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLogSeqScan {
    /// Plan base for the batch convention, built from the core's context and schema together
    /// with the chosen distribution and the core's output column order.
    pub base: PlanBase<Batch>,
    /// The generic log scan this node wraps (table, output columns, epoch range, version id).
    core: generic::LogScan,
    /// Optional scan range; when present it is shown in the explain output and forwarded in the
    /// protobuf node.
    scan_range: Option<ScanRange>,
}
37
38impl BatchLogSeqScan {
39    fn new_inner(
40        core: generic::LogScan,
41        dist: Distribution,
42        scan_range: Option<ScanRange>,
43    ) -> Self {
44        let order = core.get_out_column_index_order();
45        let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);
46
47        Self {
48            base,
49            core,
50            scan_range,
51        }
52    }
53
54    pub fn new(core: generic::LogScan, scan_range: Option<ScanRange>) -> Self {
55        // Use `Single` by default, will be updated later with `clone_with_dist`.
56        Self::new_inner(core, Distribution::Single, scan_range)
57    }
58
59    fn clone_with_dist(&self) -> Self {
60        Self::new_inner(
61            self.core.clone(),
62            match self.core.distribution_key() {
63                None => Distribution::SomeShard,
64                Some(distribution_key) => {
65                    if distribution_key.is_empty() {
66                        Distribution::Single
67                    } else {
68                        Distribution::UpstreamHashShard(
69                            distribution_key,
70                            self.core.table_desc.table_id,
71                        )
72                    }
73                }
74            },
75            self.scan_range.clone(),
76        )
77    }
78
79    /// Get a reference to the batch seq scan's logical.
80    #[must_use]
81    pub fn core(&self) -> &generic::LogScan {
82        &self.core
83    }
84}
85
// Registers `BatchLogSeqScan` as a leaf plan node via the project macro.
// NOTE(review): presumably this generates the input-less plan-tree-node impls; confirm against
// the macro definition.
impl_plan_tree_node_for_leaf! { BatchLogSeqScan }
87
88impl Distill for BatchLogSeqScan {
89    fn distill<'a>(&self) -> XmlNode<'a> {
90        let verbose = self.base.ctx().is_explain_verbose();
91        let mut vec = Vec::with_capacity(3);
92        vec.push(("table", Pretty::from(self.core.table_name.clone())));
93        vec.push(("columns", self.core.columns_pretty(verbose)));
94
95        if verbose {
96            let dist = Pretty::display(&DistributionDisplay {
97                distribution: self.distribution(),
98                input_schema: self.base.schema(),
99            });
100            vec.push(("distribution", dist));
101        }
102        vec.push((
103            "epoch_range",
104            Pretty::from(format!("{:?}", self.core.epoch_range)),
105        ));
106        vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));
107        if let Some(scan_range) = &self.scan_range {
108            let order_names = match verbose {
109                true => self.core.order_names_with_table_prefix(),
110                false => self.core.order_names(),
111            };
112            let range_strs = scan_ranges_as_strs(order_names, &vec![scan_range.clone()]);
113            vec.push((
114                "scan_range",
115                Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
116            ));
117        }
118
119        childless_record("BatchLogSeqScan", vec)
120    }
121}
122
123impl ToDistributedBatch for BatchLogSeqScan {
124    fn to_distributed(&self) -> Result<PlanRef> {
125        Ok(self.clone_with_dist().into())
126    }
127}
128
129impl TryToBatchPb for BatchLogSeqScan {
130    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
131        Ok(NodeBody::LogRowSeqScan(LogRowSeqScanNode {
132            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
133            column_ids: self
134                .core
135                .output_column_ids()
136                .iter()
137                .map(ColumnId::get_id)
138                .collect(),
139            vnode_bitmap: None,
140            old_epoch: Some(BatchQueryEpoch {
141                epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
142                    BatchQueryCommittedEpoch {
143                        epoch: self.core.epoch_range.0,
144                        hummock_version_id: 0,
145                    },
146                )),
147            }),
148            new_epoch: Some(BatchQueryEpoch {
149                epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
150                    BatchQueryCommittedEpoch {
151                        epoch: self.core.epoch_range.1,
152                        hummock_version_id: 0,
153                    },
154                )),
155            }),
156            // It's currently true.
157            ordered: !self.order().is_any(),
158            scan_range: self
159                .scan_range
160                .as_ref()
161                .map(|scan_range| scan_range.to_protobuf()),
162        }))
163    }
164}
165
166impl ToLocalBatch for BatchLogSeqScan {
167    fn to_local(&self) -> Result<PlanRef> {
168        let dist = if let Some(distribution_key) = self.core.distribution_key()
169            && !distribution_key.is_empty()
170        {
171            Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id)
172        } else {
173            Distribution::SomeShard
174        };
175        Ok(Self::new_inner(self.core.clone(), dist, self.scan_range.clone()).into())
176    }
177}
178
// No overrides: this node relies entirely on whatever the `ExprRewritable` trait provides by
// default.
// NOTE(review): presumably because the node holds no expressions to rewrite — confirm.
impl ExprRewritable for BatchLogSeqScan {}
180
// No overrides: this node relies entirely on whatever the `ExprVisitable` trait provides by
// default.
// NOTE(review): presumably because the node holds no expressions to visit — confirm.
impl ExprVisitable for BatchLogSeqScan {}