risingwave_frontend/optimizer/plan_node/
batch_log_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::LogRowSeqScanNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb, generic};
23use crate::catalog::ColumnId;
24use crate::error::Result;
25use crate::optimizer::plan_node::ToLocalBatch;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
28use crate::scheduler::SchedulerResult;
29
/// Batch plan node wrapping a [`generic::LogScan`] core.
///
/// It is serialized to a `LogRowSeqScanNode` by its `TryToBatchPb` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLogSeqScan {
    /// Batch-convention plan base (context, schema, distribution and order).
    pub base: PlanBase<Batch>,
    /// The generic log-scan description this node is built from.
    core: generic::LogScan,
}
35
36impl BatchLogSeqScan {
37    fn new_inner(core: generic::LogScan, dist: Distribution) -> Self {
38        let order = Order::new(core.table_desc.pk.clone());
39        let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);
40
41        Self { base, core }
42    }
43
44    pub fn new(core: generic::LogScan) -> Self {
45        // Use `Single` by default, will be updated later with `clone_with_dist`.
46        Self::new_inner(core, Distribution::Single)
47    }
48
49    fn clone_with_dist(&self) -> Self {
50        Self::new_inner(
51            self.core.clone(),
52            match self.core.distribution_key() {
53                None => Distribution::SomeShard,
54                Some(distribution_key) => {
55                    if distribution_key.is_empty() {
56                        Distribution::Single
57                    } else {
58                        Distribution::UpstreamHashShard(
59                            distribution_key,
60                            self.core.table_desc.table_id,
61                        )
62                    }
63                }
64            },
65        )
66    }
67
68    /// Get a reference to the batch seq scan's logical.
69    #[must_use]
70    pub fn core(&self) -> &generic::LogScan {
71        &self.core
72    }
73}
74
// NOTE(review): the macro is defined elsewhere; presumably it generates the plan-tree-node
// implementation for a node without inputs (a leaf) — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { BatchLogSeqScan }
76
77impl Distill for BatchLogSeqScan {
78    fn distill<'a>(&self) -> XmlNode<'a> {
79        let verbose = self.base.ctx().is_explain_verbose();
80        let mut vec = Vec::with_capacity(3);
81        vec.push(("table", Pretty::from(self.core.table_name.clone())));
82        vec.push(("columns", self.core.columns_pretty(verbose)));
83
84        if verbose {
85            let dist = Pretty::display(&DistributionDisplay {
86                distribution: self.distribution(),
87                input_schema: self.base.schema(),
88            });
89            vec.push(("distribution", dist));
90        }
91        vec.push(("old_epoch", Pretty::from(self.core.old_epoch.to_string())));
92        vec.push(("new_epoch", Pretty::from(self.core.new_epoch.to_string())));
93        vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));
94
95        childless_record("BatchLogSeqScan", vec)
96    }
97}
98
99impl ToDistributedBatch for BatchLogSeqScan {
100    fn to_distributed(&self) -> Result<PlanRef> {
101        Ok(self.clone_with_dist().into())
102    }
103}
104
105impl TryToBatchPb for BatchLogSeqScan {
106    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
107        Ok(NodeBody::LogRowSeqScan(LogRowSeqScanNode {
108            table_desc: Some(self.core.table_desc.try_to_protobuf()?),
109            column_ids: self
110                .core
111                .output_column_ids()
112                .iter()
113                .map(ColumnId::get_id)
114                .collect(),
115            vnode_bitmap: None,
116            old_epoch: Some(BatchQueryEpoch {
117                epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
118                    BatchQueryCommittedEpoch {
119                        epoch: self.core.old_epoch,
120                        hummock_version_id: 0,
121                    },
122                )),
123            }),
124            new_epoch: Some(BatchQueryEpoch {
125                epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
126                    BatchQueryCommittedEpoch {
127                        epoch: self.core.new_epoch,
128                        hummock_version_id: 0,
129                    },
130                )),
131            }),
132            // It's currently true.
133            ordered: !self.order().is_any(),
134        }))
135    }
136}
137
138impl ToLocalBatch for BatchLogSeqScan {
139    fn to_local(&self) -> Result<PlanRef> {
140        let dist = if let Some(distribution_key) = self.core.distribution_key()
141            && !distribution_key.is_empty()
142        {
143            Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id)
144        } else {
145            Distribution::SomeShard
146        };
147        Ok(Self::new_inner(self.core.clone(), dist).into())
148    }
149}
150
// Empty impl: nothing is overridden, so only the trait's default methods apply.
impl ExprRewritable for BatchLogSeqScan {}
152
// Empty impl: nothing is overridden, so only the trait's default methods apply.
impl ExprVisitable for BatchLogSeqScan {}