risingwave_frontend/optimizer/plan_node/
batch_log_seq_scan.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::util::scan_range::ScanRange;
17use risingwave_pb::batch_plan::LogRowSeqScanNode;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};
20
21use super::batch::prelude::*;
22use super::utils::{Distill, childless_record, scan_ranges_as_strs};
23use super::{ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb, generic};
24use crate::catalog::ColumnId;
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, DistributionDisplay};
29use crate::scheduler::SchedulerResult;
30
/// Batch plan node wrapping a [`generic::LogScan`] together with an optional
/// [`ScanRange`].
///
/// When lowered to protobuf (see the `TryToBatchPb` impl) it becomes a
/// `LogRowSeqScanNode` whose old/new epochs come from `core.epoch_range`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLogSeqScan {
    /// Batch plan base, built from the core's context and schema plus the
    /// chosen distribution and output column order.
    pub base: PlanBase<Batch>,
    /// Logical description of the scan (table descriptor, output columns,
    /// epoch range, version id).
    core: generic::LogScan,
    /// Optional range restriction; `None` means no range is sent to the executor.
    scan_range: Option<ScanRange>,
}
37
38impl BatchLogSeqScan {
39 fn new_inner(
40 core: generic::LogScan,
41 dist: Distribution,
42 scan_range: Option<ScanRange>,
43 ) -> Self {
44 let order = core.get_out_column_index_order();
45 let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);
46
47 Self {
48 base,
49 core,
50 scan_range,
51 }
52 }
53
54 pub fn new(core: generic::LogScan, scan_range: Option<ScanRange>) -> Self {
55 Self::new_inner(core, Distribution::Single, scan_range)
57 }
58
59 fn clone_with_dist(&self) -> Self {
60 Self::new_inner(
61 self.core.clone(),
62 match self.core.distribution_key() {
63 None => Distribution::SomeShard,
64 Some(distribution_key) => {
65 if distribution_key.is_empty() {
66 Distribution::Single
67 } else {
68 Distribution::UpstreamHashShard(
69 distribution_key,
70 self.core.table_desc.table_id,
71 )
72 }
73 }
74 },
75 self.scan_range.clone(),
76 )
77 }
78
79 #[must_use]
81 pub fn core(&self) -> &generic::LogScan {
82 &self.core
83 }
84}
85
// Macro-generated plan-tree-node boilerplate for `BatchLogSeqScan`.
// NOTE(review): the macro name suggests this registers the node as a leaf
// (no inputs) — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { BatchLogSeqScan }
87
88impl Distill for BatchLogSeqScan {
89 fn distill<'a>(&self) -> XmlNode<'a> {
90 let verbose = self.base.ctx().is_explain_verbose();
91 let mut vec = Vec::with_capacity(3);
92 vec.push(("table", Pretty::from(self.core.table_name.clone())));
93 vec.push(("columns", self.core.columns_pretty(verbose)));
94
95 if verbose {
96 let dist = Pretty::display(&DistributionDisplay {
97 distribution: self.distribution(),
98 input_schema: self.base.schema(),
99 });
100 vec.push(("distribution", dist));
101 }
102 vec.push((
103 "epoch_range",
104 Pretty::from(format!("{:?}", self.core.epoch_range)),
105 ));
106 vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));
107 if let Some(scan_range) = &self.scan_range {
108 let order_names = match verbose {
109 true => self.core.order_names_with_table_prefix(),
110 false => self.core.order_names(),
111 };
112 let range_strs = scan_ranges_as_strs(order_names, &vec![scan_range.clone()]);
113 vec.push((
114 "scan_range",
115 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
116 ));
117 }
118
119 childless_record("BatchLogSeqScan", vec)
120 }
121}
122
123impl ToDistributedBatch for BatchLogSeqScan {
124 fn to_distributed(&self) -> Result<PlanRef> {
125 Ok(self.clone_with_dist().into())
126 }
127}
128
129impl TryToBatchPb for BatchLogSeqScan {
130 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
131 Ok(NodeBody::LogRowSeqScan(LogRowSeqScanNode {
132 table_desc: Some(self.core.table_desc.try_to_protobuf()?),
133 column_ids: self
134 .core
135 .output_column_ids()
136 .iter()
137 .map(ColumnId::get_id)
138 .collect(),
139 vnode_bitmap: None,
140 old_epoch: Some(BatchQueryEpoch {
141 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
142 BatchQueryCommittedEpoch {
143 epoch: self.core.epoch_range.0,
144 hummock_version_id: 0,
145 },
146 )),
147 }),
148 new_epoch: Some(BatchQueryEpoch {
149 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
150 BatchQueryCommittedEpoch {
151 epoch: self.core.epoch_range.1,
152 hummock_version_id: 0,
153 },
154 )),
155 }),
156 ordered: !self.order().is_any(),
158 scan_range: self
159 .scan_range
160 .as_ref()
161 .map(|scan_range| scan_range.to_protobuf()),
162 }))
163 }
164}
165
166impl ToLocalBatch for BatchLogSeqScan {
167 fn to_local(&self) -> Result<PlanRef> {
168 let dist = if let Some(distribution_key) = self.core.distribution_key()
169 && !distribution_key.is_empty()
170 {
171 Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id)
172 } else {
173 Distribution::SomeShard
174 };
175 Ok(Self::new_inner(self.core.clone(), dist, self.scan_range.clone()).into())
176 }
177}
178
// Empty impl: nothing is overridden, so only the trait's provided behavior applies.
impl ExprRewritable for BatchLogSeqScan {}
180
// Empty impl: nothing is overridden, so only the trait's provided behavior applies.
impl ExprVisitable for BatchLogSeqScan {}