risingwave_frontend/optimizer/plan_node/
batch_log_seq_scan.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::ScanRange;
17use risingwave_pb::batch_plan::LogRowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs};
23use super::{
24 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToDistributedBatch, TryToBatchPb, generic,
25};
26use crate::catalog::ColumnId;
27use crate::error::Result;
28use crate::optimizer::plan_node::ToLocalBatch;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::property::{Distribution, DistributionDisplay};
31use crate::scheduler::SchedulerResult;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchLogSeqScan {
35 pub base: PlanBase<Batch>,
36 core: generic::LogScan,
37 scan_range: Option<ScanRange>,
38}
39
40impl BatchLogSeqScan {
41 fn new_inner(
42 core: generic::LogScan,
43 dist: Distribution,
44 scan_range: Option<ScanRange>,
45 ) -> Self {
46 let order = core.get_out_column_index_order();
47 let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);
48
49 Self {
50 base,
51 core,
52 scan_range,
53 }
54 }
55
56 pub fn new(core: generic::LogScan, scan_range: Option<ScanRange>) -> Self {
57 Self::new_inner(core, Distribution::Single, scan_range)
59 }
60
61 fn clone_with_dist(&self) -> Self {
62 Self::new_inner(
63 self.core.clone(),
64 match self.core.distribution_key() {
65 None => Distribution::SomeShard,
66 Some(distribution_key) => {
67 if distribution_key.is_empty() {
68 Distribution::Single
69 } else {
70 Distribution::UpstreamHashShard(distribution_key, self.core.table.id)
71 }
72 }
73 },
74 self.scan_range.clone(),
75 )
76 }
77
78 #[must_use]
80 pub fn core(&self) -> &generic::LogScan {
81 &self.core
82 }
83}
84
85impl_plan_tree_node_for_leaf! { Batch, BatchLogSeqScan }
86
87impl Distill for BatchLogSeqScan {
88 fn distill<'a>(&self) -> XmlNode<'a> {
89 let verbose = self.base.ctx().is_explain_verbose();
90 let mut vec = Vec::with_capacity(3);
91 vec.push(("table", Pretty::from(self.core.table_name.clone())));
92 vec.push(("columns", self.core.columns_pretty(verbose)));
93
94 if verbose {
95 let dist = Pretty::display(&DistributionDisplay {
96 distribution: self.distribution(),
97 input_schema: self.base.schema(),
98 });
99 vec.push(("distribution", dist));
100 }
101 vec.push((
102 "epoch_range",
103 Pretty::from(format!("{:?}", self.core.epoch_range)),
104 ));
105 vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));
106 if let Some(scan_range) = &self.scan_range {
107 let order_names = match verbose {
108 true => self.core.order_names_with_table_prefix(),
109 false => self.core.order_names(),
110 };
111 let range_strs = scan_ranges_as_strs(order_names, &vec![scan_range.clone()]);
112 vec.push((
113 "scan_range",
114 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
115 ));
116 }
117
118 childless_record("BatchLogSeqScan", vec)
119 }
120}
121
122impl ToDistributedBatch for BatchLogSeqScan {
123 fn to_distributed(&self) -> Result<PlanRef> {
124 Ok(self.clone_with_dist().into())
125 }
126}
127
128impl TryToBatchPb for BatchLogSeqScan {
129 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
130 Ok(NodeBody::LogRowSeqScan(LogRowSeqScanNode {
131 table_desc: Some(self.core.table.table_desc().try_to_protobuf()?),
132 column_ids: self
133 .core
134 .output_column_ids()
135 .iter()
136 .map(ColumnId::get_id)
137 .collect(),
138 vnode_bitmap: None,
139 old_epoch: Some(BatchQueryEpoch {
140 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
141 BatchQueryCommittedEpoch {
142 epoch: self.core.epoch_range.0,
143 hummock_version_id: 0,
144 },
145 )),
146 }),
147 new_epoch: Some(BatchQueryEpoch {
148 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
149 BatchQueryCommittedEpoch {
150 epoch: self.core.epoch_range.1,
151 hummock_version_id: 0,
152 },
153 )),
154 }),
155 ordered: !self.order().is_any(),
157 scan_range: self
158 .scan_range
159 .as_ref()
160 .map(|scan_range| scan_range.to_protobuf()),
161 }))
162 }
163}
164
165impl ToLocalBatch for BatchLogSeqScan {
166 fn to_local(&self) -> Result<PlanRef> {
167 let dist = if let Some(distribution_key) = self.core.distribution_key()
168 && !distribution_key.is_empty()
169 {
170 Distribution::UpstreamHashShard(distribution_key, self.core.table.id)
171 } else {
172 Distribution::SomeShard
173 };
174 Ok(Self::new_inner(self.core.clone(), dist, self.scan_range.clone()).into())
175 }
176}
177
178impl ExprRewritable<Batch> for BatchLogSeqScan {}
179
180impl ExprVisitable for BatchLogSeqScan {}