risingwave_batch_executors/executor/
log_row_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::sync::Arc;
17
18use futures::prelude::stream::StreamExt;
19use futures_async_stream::try_stream;
20use futures_util::pin_mut;
21use prometheus::Histogram;
22use risingwave_common::array::{DataChunk, Op};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema};
25use risingwave_common::hash::VnodeCountCompat;
26use risingwave_common::row::{Row, RowExt};
27use risingwave_common::types::ScalarImpl;
28use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
31use risingwave_pb::plan_common::StorageTableDesc;
32use risingwave_storage::table::batch_table::BatchTable;
33use risingwave_storage::table::collect_data_chunk;
34use risingwave_storage::{StateStore, dispatch_state_store};
35
36use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
37use crate::error::{BatchError, Result};
38use crate::monitor::BatchMetrics;
39
/// Executor that batch-scans the change log of a storage table between two
/// committed epochs, emitting each change-log row together with a trailing
/// `op` varchar column describing the operation.
pub struct LogRowSeqScanExecutor<S: StateStore> {
    /// Maximum number of rows per emitted `DataChunk`.
    chunk_size: usize,
    /// Identity string reported via `Executor::identity`.
    identity: String,
    // It is table schema + op column (a trailing `Varchar` field named "op").
    schema: Schema,

    /// Batch metrics.
    /// `None`: local mode doesn't record metrics.
    metrics: Option<BatchMetrics>,

    /// Storage table whose change log is scanned.
    table: BatchTable<S>,
    /// Start epoch of the log range to scan.
    old_epoch: u64,
    /// End epoch of the log range to scan.
    new_epoch: u64,
    /// Hummock version id shared by both epochs.
    version_id: HummockVersionId,
    /// Whether the scan must return rows in order.
    ordered: bool,
}
56
57impl<S: StateStore> LogRowSeqScanExecutor<S> {
58    pub fn new(
59        table: BatchTable<S>,
60        old_epoch: u64,
61        new_epoch: u64,
62        version_id: HummockVersionId,
63        chunk_size: usize,
64        identity: String,
65        metrics: Option<BatchMetrics>,
66        ordered: bool,
67    ) -> Self {
68        let mut schema = table.schema().clone();
69        schema.fields.push(Field::with_name(
70            risingwave_common::types::DataType::Varchar,
71            "op",
72        ));
73        Self {
74            chunk_size,
75            identity,
76            schema,
77            metrics,
78            table,
79            old_epoch,
80            new_epoch,
81            version_id,
82            ordered,
83        }
84    }
85}
86
87pub struct LogStoreRowSeqScanExecutorBuilder {}
88
89impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
90    async fn new_boxed_executor(
91        source: &ExecutorBuilder<'_>,
92        inputs: Vec<BoxedExecutor>,
93    ) -> Result<BoxedExecutor> {
94        ensure!(
95            inputs.is_empty(),
96            "LogStore row sequential scan should not have input executor!"
97        );
98        let log_store_seq_scan_node = try_match_expand!(
99            source.plan_node().get_node_body().unwrap(),
100            NodeBody::LogRowSeqScan
101        )?;
102
103        let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
104        let column_ids = log_store_seq_scan_node
105            .column_ids
106            .iter()
107            .copied()
108            .map(ColumnId::from)
109            .collect();
110
111        let vnodes = match &log_store_seq_scan_node.vnode_bitmap {
112            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
113            // This is possible for dml. vnode_bitmap is not filled by scheduler.
114            // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
115            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
116        };
117
118        let chunk_size = source.context().get_config().developer.chunk_size as u32;
119        let metrics = source.context().batch_metrics();
120
121        let Some(BatchQueryEpoch {
122            epoch: Some(batch_query_epoch::Epoch::Committed(old_epoch)),
123        }) = &log_store_seq_scan_node.old_epoch
124        else {
125            unreachable!("invalid old epoch: {:?}", log_store_seq_scan_node.old_epoch)
126        };
127
128        let Some(BatchQueryEpoch {
129            epoch: Some(batch_query_epoch::Epoch::Committed(new_epoch)),
130        }) = &log_store_seq_scan_node.new_epoch
131        else {
132            unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
133        };
134
135        assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
136        let version_id = old_epoch.hummock_version_id;
137        let old_epoch = old_epoch.epoch;
138        let new_epoch = new_epoch.epoch;
139
140        dispatch_state_store!(source.context().state_store(), state_store, {
141            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
142            Ok(Box::new(LogRowSeqScanExecutor::new(
143                table,
144                old_epoch,
145                new_epoch,
146                HummockVersionId::new(version_id),
147                chunk_size as usize,
148                source.plan_node().get_identity().clone(),
149                metrics,
150                log_store_seq_scan_node.ordered,
151            )))
152        })
153    }
154}
impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
    /// Output schema: the table columns plus the trailing `op` column.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Identity string set at construction time.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the boxed chunk stream produced by
    /// [`Self::do_execute`].
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}
168
169impl<S: StateStore> LogRowSeqScanExecutor<S> {
170    #[try_stream(ok = DataChunk, error = BatchError)]
171    async fn do_execute(self: Box<Self>) {
172        let Self {
173            chunk_size,
174            metrics,
175            table,
176            old_epoch,
177            new_epoch,
178            version_id,
179            schema,
180            ordered,
181            ..
182        } = *self;
183        let table = std::sync::Arc::new(table);
184
185        // Create collector.
186        let histogram = metrics
187            .as_ref()
188            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
189        // Range Scan
190        // WARN: DO NOT use `select` to execute range scans concurrently
191        //       it can consume too much memory if there're too many ranges.
192        let stream = Self::execute_range(
193            table.clone(),
194            old_epoch,
195            new_epoch,
196            version_id,
197            chunk_size,
198            histogram,
199            Arc::new(schema.clone()),
200            ordered,
201        );
202        #[for_await]
203        for chunk in stream {
204            let chunk = chunk?;
205            yield chunk;
206        }
207    }
208
209    #[try_stream(ok = DataChunk, error = BatchError)]
210    async fn execute_range(
211        table: Arc<BatchTable<S>>,
212        old_epoch: u64,
213        new_epoch: u64,
214        version_id: HummockVersionId,
215        chunk_size: usize,
216        histogram: Option<impl Deref<Target = Histogram>>,
217        schema: Arc<Schema>,
218        ordered: bool,
219    ) {
220        // Range Scan.
221        let iter = table
222            .batch_iter_log_with_pk_bounds(
223                old_epoch,
224                HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
225                ordered,
226            )
227            .await?
228            .flat_map(|r| {
229                futures::stream::iter(std::iter::from_coroutine(
230                    #[coroutine]
231                    move || {
232                        match r {
233                            Ok(change_log_row) => {
234                                fn with_op(op: Op, row: impl Row) -> impl Row {
235                                    row.chain([Some(ScalarImpl::Utf8(op.to_varchar().into()))])
236                                }
237                                for (op, row) in change_log_row.into_op_value_iter() {
238                                    yield Ok(with_op(op, row));
239                                }
240                            }
241                            Err(e) => {
242                                yield Err(e);
243                            }
244                        };
245                    },
246                ))
247            });
248
249        pin_mut!(iter);
250        loop {
251            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
252
253            let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
254                .await
255                .map_err(BatchError::from)?;
256            if let Some(timer) = timer {
257                timer.observe_duration()
258            }
259
260            if let Some(chunk) = chunk {
261                yield chunk
262            } else {
263                break;
264            }
265        }
266    }
267}