risingwave_batch_executors/executor/
log_row_seq_scan.rs1use std::ops::Deref;
16use std::sync::Arc;
17
18use futures::prelude::stream::StreamExt;
19use futures_async_stream::try_stream;
20use futures_util::pin_mut;
21use prometheus::Histogram;
22use risingwave_common::array::{DataChunk, Op};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema};
25use risingwave_common::hash::VnodeCountCompat;
26use risingwave_common::row::{Row, RowExt};
27use risingwave_common::types::ScalarImpl;
28use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
31use risingwave_pb::plan_common::StorageTableDesc;
32use risingwave_storage::table::batch_table::BatchTable;
33use risingwave_storage::table::collect_data_chunk;
34use risingwave_storage::{StateStore, dispatch_state_store};
35
36use super::{
37 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
38};
39use crate::build_scan_range_from_pb;
40use crate::error::{BatchError, Result};
41use crate::monitor::BatchMetrics;
42
43pub struct LogRowSeqScanExecutor<S: StateStore> {
44 chunk_size: usize,
45 identity: String,
46 schema: Schema,
48
49 metrics: Option<BatchMetrics>,
52
53 table: BatchTable<S>,
54 old_epoch: u64,
55 new_epoch: u64,
56 version_id: HummockVersionId,
57 ordered: bool,
58 scan_range: ScanRange,
59}
60
61impl<S: StateStore> LogRowSeqScanExecutor<S> {
62 pub fn new(
63 table: BatchTable<S>,
64 old_epoch: u64,
65 new_epoch: u64,
66 version_id: HummockVersionId,
67 chunk_size: usize,
68 identity: String,
69 metrics: Option<BatchMetrics>,
70 ordered: bool,
71 scan_range: ScanRange,
72 ) -> Self {
73 let mut schema = table.schema().clone();
74 schema.fields.push(Field::with_name(
75 risingwave_common::types::DataType::Varchar,
76 "op",
77 ));
78 Self {
79 chunk_size,
80 identity,
81 schema,
82 metrics,
83 table,
84 old_epoch,
85 new_epoch,
86 version_id,
87 ordered,
88 scan_range,
89 }
90 }
91}
92
93pub struct LogStoreRowSeqScanExecutorBuilder {}
94
95impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
96 async fn new_boxed_executor(
97 source: &ExecutorBuilder<'_>,
98 inputs: Vec<BoxedExecutor>,
99 ) -> Result<BoxedExecutor> {
100 ensure!(
101 inputs.is_empty(),
102 "LogStore row sequential scan should not have input executor!"
103 );
104 let log_store_seq_scan_node = try_match_expand!(
105 source.plan_node().get_node_body().unwrap(),
106 NodeBody::LogRowSeqScan
107 )?;
108
109 let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
110 let column_ids = log_store_seq_scan_node
111 .column_ids
112 .iter()
113 .copied()
114 .map(ColumnId::from)
115 .collect();
116
117 let vnodes = match &log_store_seq_scan_node.vnode_bitmap {
118 Some(vnodes) => Some(Bitmap::from(vnodes).into()),
119 None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
122 };
123
124 let chunk_size = source.context().get_config().developer.chunk_size as u32;
125 let metrics = source.context().batch_metrics();
126
127 let Some(BatchQueryEpoch {
128 epoch: Some(batch_query_epoch::Epoch::Committed(old_epoch)),
129 }) = &log_store_seq_scan_node.old_epoch
130 else {
131 unreachable!("invalid old epoch: {:?}", log_store_seq_scan_node.old_epoch)
132 };
133
134 let Some(BatchQueryEpoch {
135 epoch: Some(batch_query_epoch::Epoch::Committed(new_epoch)),
136 }) = &log_store_seq_scan_node.new_epoch
137 else {
138 unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
139 };
140
141 assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
142 let version_id = old_epoch.hummock_version_id;
143 let old_epoch = old_epoch.epoch;
144 let new_epoch = new_epoch.epoch;
145
146 let scan_range = log_store_seq_scan_node
147 .scan_range
148 .as_ref()
149 .map(|scan_range| build_scan_range_from_pb(scan_range, table_desc))
150 .unwrap_or_else(|| Ok(ScanRange::full()))?;
151
152 dispatch_state_store!(source.context().state_store(), state_store, {
153 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
154 Ok(Box::new(LogRowSeqScanExecutor::new(
155 table,
156 old_epoch,
157 new_epoch,
158 HummockVersionId::new(version_id),
159 chunk_size as usize,
160 source.plan_node().get_identity().clone(),
161 metrics,
162 log_store_seq_scan_node.ordered,
163 scan_range,
164 )))
165 })
166 }
167}
168impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
169 fn schema(&self) -> &Schema {
170 &self.schema
171 }
172
173 fn identity(&self) -> &str {
174 &self.identity
175 }
176
177 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
178 self.do_execute().boxed()
179 }
180}
181
182impl<S: StateStore> LogRowSeqScanExecutor<S> {
183 #[try_stream(ok = DataChunk, error = BatchError)]
184 async fn do_execute(self: Box<Self>) {
185 let Self {
186 chunk_size,
187 metrics,
188 table,
189 old_epoch,
190 new_epoch,
191 version_id,
192 schema,
193 ordered,
194 scan_range,
195 ..
196 } = *self;
197 let table = std::sync::Arc::new(table);
198
199 let histogram = metrics
201 .as_ref()
202 .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
203 let stream = Self::execute_range(
207 table.clone(),
208 old_epoch,
209 new_epoch,
210 version_id,
211 chunk_size,
212 histogram,
213 Arc::new(schema.clone()),
214 ordered,
215 scan_range,
216 );
217 #[for_await]
218 for chunk in stream {
219 let chunk = chunk?;
220 yield chunk;
221 }
222 }
223
224 #[try_stream(ok = DataChunk, error = BatchError)]
225 async fn execute_range(
226 table: Arc<BatchTable<S>>,
227 old_epoch: u64,
228 new_epoch: u64,
229 version_id: HummockVersionId,
230 chunk_size: usize,
231 histogram: Option<impl Deref<Target = Histogram>>,
232 schema: Arc<Schema>,
233 ordered: bool,
234 scan_range: ScanRange,
235 ) {
236 let pk_prefix = scan_range.pk_prefix.clone();
237 let range_bounds = scan_range.convert_to_range_bounds(&table);
238 let iter = table
240 .batch_iter_log_with_pk_bounds(
241 old_epoch,
242 HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
243 ordered,
244 range_bounds,
245 pk_prefix,
246 )
247 .await?
248 .flat_map(|r| {
249 futures::stream::iter(std::iter::from_coroutine(
250 #[coroutine]
251 move || {
252 match r {
253 Ok(change_log_row) => {
254 fn with_op(op: Op, row: impl Row) -> impl Row {
255 row.chain([Some(ScalarImpl::Utf8(op.to_varchar().into()))])
256 }
257 for (op, row) in change_log_row.into_op_value_iter() {
258 yield Ok(with_op(op, row));
259 }
260 }
261 Err(e) => {
262 yield Err(e);
263 }
264 };
265 },
266 ))
267 });
268
269 pin_mut!(iter);
270 loop {
271 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
272
273 let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
274 .await
275 .map_err(BatchError::from)?;
276 if let Some(timer) = timer {
277 timer.observe_duration()
278 }
279
280 if let Some(chunk) = chunk {
281 yield chunk
282 } else {
283 break;
284 }
285 }
286 }
287}