// risingwave_batch_executors/executor/log_row_seq_scan.rs

use std::ops::Deref;
use std::sync::Arc;

use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::{StateStore, dispatch_state_store};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use crate::error::{BatchError, Result};
use crate::monitor::BatchMetrics;
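
/// Executor that performs a batch scan over a table's change log between two
/// committed epochs. Every change-log row is emitted with one extra trailing
/// `op` column: the row's operation rendered as a varchar.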
pub struct LogRowSeqScanExecutor<S: StateStore> {
    chunk_size: usize,
    identity: String,
    schema: Schema,

    metrics: Option<BatchMetrics>,

    table: BatchTable<S>,
    old_epoch: u64,
    new_epoch: u64,
    version_id: HummockVersionId,
    ordered: bool,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
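    /// Builds the executor. The output schema is the storage table's schema
    /// plus the trailing varchar `op` field appended below.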
    pub fn new(
        table: BatchTable<S>,
        old_epoch: u64,
        new_epoch: u64,
        version_id: HummockVersionId,
        chunk_size: usize,
        identity: String,
        metrics: Option<BatchMetrics>,
        ordered: bool,
    ) -> Self {
        let mut schema = table.schema().clone();
        schema.fields.push(Field::with_name(
            risingwave_common::types::DataType::Varchar,
            "op",
        ));
        Self {
            chunk_size,
            identity,
            schema,
            metrics,
            table,
            old_epoch,
            new_epoch,
            version_id,
            ordered,
        }
    }
}
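
/// Builder that constructs a [`LogRowSeqScanExecutor`] from a `LogRowSeqScan`
/// plan node.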
pub struct LogStoreRowSeqScanExecutorBuilder {}

impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(
            inputs.is_empty(),
            "LogStore row sequential scan should not have input executor!"
        );
        let log_store_seq_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::LogRowSeqScan
        )?;

        let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
        let column_ids = log_store_seq_scan_node
            .column_ids
            .iter()
            .copied()
            .map(ColumnId::from)
            .collect();
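
        // Restrict the scan to the vnodes given by the plan's bitmap; with no
        // bitmap, scan all vnodes of the table.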
        let vnodes = match &log_store_seq_scan_node.vnode_bitmap {
            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
        };

        let chunk_size = source.context().get_config().developer.chunk_size as u32;
        let metrics = source.context().batch_metrics();
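
        // Both epoch bounds must be committed epochs; anything else is a
        // malformed plan node, hence the `unreachable!`.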
        let Some(BatchQueryEpoch {
            epoch: Some(batch_query_epoch::Epoch::Committed(old_epoch)),
        }) = &log_store_seq_scan_node.old_epoch
        else {
            unreachable!("invalid old epoch: {:?}", log_store_seq_scan_node.old_epoch)
        };

        let Some(BatchQueryEpoch {
            epoch: Some(batch_query_epoch::Epoch::Committed(new_epoch)),
        }) = &log_store_seq_scan_node.new_epoch
        else {
            unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
        };
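
        // Both epochs are pinned on the same hummock version, so a single
        // version id serves both reads.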
        assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
        let version_id = old_epoch.hummock_version_id;
        let old_epoch = old_epoch.epoch;
        let new_epoch = new_epoch.epoch;
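
        // Dispatch over the concrete state-store backend and build the
        // executor on a partial batch table exposing only the requested
        // columns.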
        dispatch_state_store!(source.context().state_store(), state_store, {
            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
            Ok(Box::new(LogRowSeqScanExecutor::new(
                table,
                old_epoch,
                new_epoch,
                HummockVersionId::new(version_id),
                chunk_size as usize,
                source.plan_node().get_identity().clone(),
                metrics,
                log_store_seq_scan_node.ordered,
            )))
        })
    }
}

impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
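    /// Drives `execute_range` over the whole table, recording per-chunk scan
    /// latency when batch metrics are enabled.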
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let Self {
            chunk_size,
            metrics,
            table,
            old_epoch,
            new_epoch,
            version_id,
            schema,
            ordered,
            ..
        } = *self;
        let table = Arc::new(table);

        let histogram = metrics
            .as_ref()
            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);

        let stream = Self::execute_range(
            table.clone(),
            old_epoch,
            new_epoch,
            version_id,
            chunk_size,
            histogram,
            Arc::new(schema.clone()),
            ordered,
        );
        #[for_await]
        for chunk in stream {
            let chunk = chunk?;
            yield chunk;
        }
    }
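
    /// Streams the change log between `old_epoch` and `new_epoch`, converting
    /// each change-log row into an output row with the trailing `op` column
    /// and batching the results into `DataChunk`s of at most `chunk_size`
    /// rows.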
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn execute_range(
        table: Arc<BatchTable<S>>,
        old_epoch: u64,
        new_epoch: u64,
        version_id: HummockVersionId,
        chunk_size: usize,
        histogram: Option<impl Deref<Target = Histogram>>,
        schema: Arc<Schema>,
        ordered: bool,
    ) {
        let iter = table
            .batch_iter_log_with_pk_bounds(
                old_epoch,
                HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
                ordered,
            )
            .await?
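            // Flatten each change-log entry into (op, row) pairs; an update is
            // expected to expand into an `UpdateDelete`/`UpdateInsert` pair.
            // `with_op` appends the op as the trailing varchar column.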
            .flat_map(|r| {
                futures::stream::iter(std::iter::from_coroutine(
                    #[coroutine]
                    move || {
                        match r {
                            Ok(change_log_row) => {
                                fn with_op(op: Op, row: impl Row) -> impl Row {
                                    row.chain([Some(ScalarImpl::Utf8(op.to_varchar().into()))])
                                }
                                for (op, row) in change_log_row.into_op_value_iter() {
                                    yield Ok(with_op(op, row));
                                }
                            }
                            Err(e) => {
                                yield Err(e);
                            }
                        };
                    },
                ))
            });

        pin_mut!(iter);
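
        // Collect rows into fixed-size chunks, timing each collection when the
        // histogram is present; `collect_data_chunk` returns `None` once the
        // stream is exhausted.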
        loop {
            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

            let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
                .await
                .map_err(BatchError::from)?;
            if let Some(timer) = timer {
                timer.observe_duration()
            }

            if let Some(chunk) = chunk {
                yield chunk
            } else {
                break;
            }
        }
    }
}