risingwave_batch_executors/executor/log_row_seq_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::Arc;

use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::{StateStore, dispatch_state_store};

use super::{
    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
};
use crate::build_scan_range_from_pb;
use crate::error::{BatchError, Result};
use crate::monitor::BatchMetrics;

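/// Executor that scans the change log of a table between two committed epochs,
/// emitting each change as a row with the change operation appended as a
/// trailing `op` varchar column.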
pub struct LogRowSeqScanExecutor<S: StateStore> {
    chunk_size: usize,
    identity: String,
    /// The table schema plus the trailing `op` column.
    schema: Schema,

    /// Batch metrics.
    /// `None` in local mode, which does not record metrics.
    metrics: Option<BatchMetrics>,

    table: BatchTable<S>,
    old_epoch: u64,
    new_epoch: u64,
    version_id: HummockVersionId,
    ordered: bool,
    scan_range: ScanRange,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
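    /// Creates the executor, extending the table schema with the `op` column
    /// that carries the change operation of each emitted row.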
    pub fn new(
        table: BatchTable<S>,
        old_epoch: u64,
        new_epoch: u64,
        version_id: HummockVersionId,
        chunk_size: usize,
        identity: String,
        metrics: Option<BatchMetrics>,
        ordered: bool,
        scan_range: ScanRange,
    ) -> Self {
        let mut schema = table.schema().clone();
        schema.fields.push(Field::with_name(
            risingwave_common::types::DataType::Varchar,
            "op",
        ));
        Self {
            chunk_size,
            identity,
            schema,
            metrics,
            table,
            old_epoch,
            new_epoch,
            version_id,
            ordered,
            scan_range,
        }
    }
}

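/// Builder that constructs a [`LogRowSeqScanExecutor`] from a
/// [`NodeBody::LogRowSeqScan`] plan node.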
pub struct LogStoreRowSeqScanExecutorBuilder {}

impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(
            inputs.is_empty(),
            "LogStore row sequential scan should not have input executor!"
        );
        let log_store_seq_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::LogRowSeqScan
        )?;

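        // Resolve the storage table descriptor and the subset of columns to scan.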
        let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
        let column_ids = log_store_seq_scan_node
            .column_ids
            .iter()
            .copied()
            .map(ColumnId::from)
            .collect();

        let vnodes = match &log_store_seq_scan_node.vnode_bitmap {
            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
            // This is possible for DML, where the scheduler does not fill `vnode_bitmap`,
            // or for single distribution (e.g. distinct agg), where we scan in a single
            // executor. In both cases, scan all vnodes.
            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
        };

        let chunk_size = source.context().get_config().developer.chunk_size;
        let metrics = source.context().batch_metrics();

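        // Both epochs must be committed epochs carrying a Hummock version id,
        // and both must refer to the same version.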
        let Some(BatchQueryEpoch {
            epoch: Some(batch_query_epoch::Epoch::Committed(old_epoch)),
        }) = &log_store_seq_scan_node.old_epoch
        else {
            unreachable!("invalid old epoch: {:?}", log_store_seq_scan_node.old_epoch)
        };

        let Some(BatchQueryEpoch {
            epoch: Some(batch_query_epoch::Epoch::Committed(new_epoch)),
        }) = &log_store_seq_scan_node.new_epoch
        else {
            unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
        };

        assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
        let version_id = old_epoch.hummock_version_id;
        let old_epoch = old_epoch.epoch;
        let new_epoch = new_epoch.epoch;

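        // Fall back to a full-range scan when the plan node carries no scan range.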
        let scan_range = log_store_seq_scan_node
            .scan_range
            .as_ref()
            .map(|scan_range| build_scan_range_from_pb(scan_range, table_desc))
            .unwrap_or_else(|| Ok(ScanRange::full()))?;

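        // Dispatch on the concrete state store implementation and build the executor.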
        dispatch_state_store!(source.context().state_store(), state_store, {
            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
            Ok(Box::new(LogRowSeqScanExecutor::new(
                table,
                old_epoch,
                new_epoch,
                HummockVersionId::new(version_id),
                chunk_size,
                source.plan_node().get_identity().clone(),
                metrics,
                log_store_seq_scan_node.ordered,
                scan_range,
            )))
        })
    }
}

impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let Self {
            chunk_size,
            metrics,
            table,
            old_epoch,
            new_epoch,
            version_id,
            schema,
            ordered,
            scan_range,
            ..
        } = *self;
        let table = Arc::new(table);

        // Create the metrics collector, if metrics are enabled.
        let histogram = metrics
            .as_ref()
            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
        // Range scan.
        // WARN: DO NOT use `select` to execute range scans concurrently;
        //       it can consume too much memory if there are too many ranges.
        let stream = Self::execute_range(
            table.clone(),
            old_epoch,
            new_epoch,
            version_id,
            chunk_size,
            histogram,
            Arc::new(schema.clone()),
            ordered,
            scan_range,
        );
        #[for_await]
        for chunk in stream {
            let chunk = chunk?;
            yield chunk;
        }
    }

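    /// Scans the change log for a single scan range between `old_epoch` and
    /// `new_epoch`, collecting the emitted rows into chunks of `chunk_size`.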
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn execute_range(
        table: Arc<BatchTable<S>>,
        old_epoch: u64,
        new_epoch: u64,
        version_id: HummockVersionId,
        chunk_size: usize,
        histogram: Option<impl Deref<Target = Histogram>>,
        schema: Arc<Schema>,
        ordered: bool,
        scan_range: ScanRange,
    ) {
        let pk_prefix = scan_range.pk_prefix.clone();
        let range_bounds = scan_range.convert_to_range_bounds(&table);
        // Range scan over the change log. Each change-log entry is flattened
        // into `(op, row)` pairs, and the op is chained onto the row as a
        // varchar column.
        let iter = table
            .batch_iter_log_with_pk_bounds(
                old_epoch,
                HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
                ordered,
                range_bounds,
                pk_prefix,
            )
            .await?
            .flat_map(|r| {
                futures::stream::iter(std::iter::from_coroutine(
                    #[coroutine]
                    move || {
                        match r {
                            Ok(change_log_row) => {
                                fn with_op(op: Op, row: impl Row) -> impl Row {
                                    row.chain([Some(ScalarImpl::Utf8(op.to_varchar().into()))])
                                }
                                for (op, row) in change_log_row.into_op_value_iter() {
                                    yield Ok(with_op(op, row));
                                }
                            }
                            Err(e) => {
                                yield Err(e);
                            }
                        };
                    },
                ))
            });

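        // Collect rows into fixed-size chunks, timing each collection cycle.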
        pin_mut!(iter);
        loop {
            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

            let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
                .await
                .map_err(BatchError::from)?;
            if let Some(timer) = timer {
                timer.observe_duration()
            }

            if let Some(chunk) = chunk {
                yield chunk
            } else {
                break;
            }
        }
    }
}