risingwave_stream/executor/backfill/
arrangement_backfill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use either::Either;
18use futures::stream::{select_all, select_with_strategy};
19use futures::{TryStreamExt, stream};
20use itertools::Itertools;
21use risingwave_common::array::{DataChunk, Op};
22use risingwave_common::bail;
23use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
26use risingwave_storage::row_serde::value_serde::ValueRowSerde;
27use risingwave_storage::store::PrefetchOptions;
28
29use crate::common::table::state_table::ReplicatedStateTable;
30#[cfg(debug_assertions)]
31use crate::executor::backfill::utils::METADATA_STATE_LEN;
32use crate::executor::backfill::utils::{
33    BackfillProgressPerVnode, BackfillState, compute_bounds, create_builder,
34    get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
35    persist_state_per_vnode, update_pos_by_vnode,
36};
37use crate::executor::prelude::*;
38use crate::task::CreateMviewProgressReporter;
39
/// One [`DataChunkBuilder`] per [`VirtualNode`], used to accumulate snapshot rows into chunks
/// separately for each vnode.
type Builders = HashMap<VirtualNode, DataChunkBuilder>;
41
42/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
43/// Main differences:
44/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
45///   independently.
46/// - To synchronize upstream shared buffer, it is initialized with a [`ReplicatedStateTable`].
47pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
48    /// Upstream table
49    upstream_table: ReplicatedStateTable<S, SD>,
50
51    /// Upstream with the same schema with the upstream table.
52    upstream: Executor,
53
54    /// Internal state table for persisting state of backfill state.
55    state_table: StateTable<S>,
56
57    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
58    output_indices: Vec<usize>,
59
60    progress: CreateMviewProgressReporter,
61
62    actor_id: ActorId,
63
64    metrics: Arc<StreamingMetrics>,
65
66    chunk_size: usize,
67
68    rate_limiter: MonitoredRateLimiter,
69}
70
71impl<S, SD> ArrangementBackfillExecutor<S, SD>
72where
73    S: StateStore,
74    SD: ValueRowSerde,
75{
76    #[allow(clippy::too_many_arguments)]
77    #[allow(dead_code)]
78    pub fn new(
79        upstream_table: ReplicatedStateTable<S, SD>,
80        upstream: Executor,
81        state_table: StateTable<S>,
82        output_indices: Vec<usize>,
83        progress: CreateMviewProgressReporter,
84        metrics: Arc<StreamingMetrics>,
85        chunk_size: usize,
86        rate_limit: RateLimit,
87    ) -> Self {
88        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
89        Self {
90            upstream_table,
91            upstream,
92            state_table,
93            output_indices,
94            actor_id: progress.actor_id(),
95            progress,
96            metrics,
97            chunk_size,
98            rate_limiter,
99        }
100    }
101
102    #[try_stream(ok = Message, error = StreamExecutorError)]
103    async fn execute_inner(mut self) {
104        tracing::debug!("backfill executor started");
105        // The primary key columns, in the output columns of the upstream_table scan.
106        // Table scan scans a subset of the columns of the upstream table.
107        let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
108        #[cfg(debug_assertions)]
109        let state_len = self.upstream_table.pk_indices().len() + METADATA_STATE_LEN;
110        let pk_order = self.upstream_table.pk_serde().get_order_types().to_vec();
111        let upstream_table_id = self.upstream_table.table_id();
112        let mut upstream_table = self.upstream_table;
113        let vnodes = upstream_table.vnodes().clone();
114
115        // These builders will build data chunks.
116        // We must supply them with the full datatypes which correspond to
117        // pk + output_indices.
118        let snapshot_data_types = self
119            .upstream
120            .schema()
121            .fields()
122            .iter()
123            .map(|field| field.data_type.clone())
124            .collect_vec();
125        let mut builders: Builders = upstream_table
126            .vnodes()
127            .iter_vnodes()
128            .map(|vnode| {
129                let builder = create_builder(
130                    self.rate_limiter.rate_limit(),
131                    self.chunk_size,
132                    snapshot_data_types.clone(),
133                );
134                (vnode, builder)
135            })
136            .collect();
137
138        let mut upstream = self.upstream.execute();
139
140        // Poll the upstream to get the first barrier.
141        let first_barrier = expect_first_barrier(&mut upstream).await?;
142        let mut paused = first_barrier.is_pause_on_startup();
143        let first_epoch = first_barrier.epoch;
144        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
145        // The first barrier message should be propagated.
146        yield Message::Barrier(first_barrier);
147
148        self.state_table.init_epoch(first_epoch).await?;
149
150        let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;
151
152        let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
153            matches!(
154                p.current_state(),
155                &BackfillProgressPerVnode::Completed { .. }
156            )
157        });
158        if is_completely_finished {
159            assert!(!is_newly_added);
160        }
161
162        upstream_table.init_epoch(first_epoch).await?;
163
164        let mut backfill_state: BackfillState = progress_per_vnode.into();
165
166        let to_backfill = !is_completely_finished;
167
168        // If no need backfill, but state was still "unfinished" we need to finish it.
169        // So we just update the state + progress to meta at the next barrier to finish progress,
170        // and forward other messages.
171        //
172        // Reason for persisting on second barrier rather than first:
173        // We can't update meta with progress as finished until state_table
174        // has been updated.
175        // We also can't update state_table in first epoch, since state_table
176        // expects to have been initialized in previous epoch.
177
178        // The epoch used to snapshot read upstream mv.
179        let mut snapshot_read_epoch;
180
181        // Keep track of rows from the snapshot.
182        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();
183
184        // Arrangement Backfill Algorithm:
185        //
186        //   backfill_stream
187        //  /               \
188        // upstream       snapshot
189        //
190        // We construct a backfill stream with upstream as its left input and mv snapshot read
191        // stream as its right input. When a chunk comes from upstream, we will buffer it.
192        //
193        // When a barrier comes from upstream:
194        //  Immediately break out of backfill loop.
195        //  - For each row of the upstream chunk buffer, compute vnode.
196        //  - Get the `current_pos` corresponding to the vnode. Forward it to downstream if its pk
197        //    <= `current_pos`, otherwise ignore it.
198        //  - Flush all buffered upstream_chunks to replicated state table.
199        //  - Update the `snapshot_read_epoch`.
200        //  - Reconstruct the whole backfill stream with upstream and new mv snapshot read stream
201        //    with the `snapshot_read_epoch`.
202        //
203        // When a chunk comes from snapshot, we forward it to the downstream and raise
204        // `current_pos`.
205        //
206        // When we reach the end of the snapshot read stream, it means backfill has been
207        // finished.
208        //
209        // Once the backfill loop ends, we forward the upstream directly to the downstream.
210        if to_backfill {
211            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
212            let mut pending_barrier: Option<Barrier> = None;
213
214            let metrics = self
215                .metrics
216                .new_backfill_metrics(upstream_table_id, self.actor_id);
217
218            'backfill_loop: loop {
219                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
220                let mut cur_barrier_upstream_processed_rows: u64 = 0;
221                let mut snapshot_read_complete = false;
222                let mut has_snapshot_read = false;
223
224                // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
225                // dropped. Then we can write to `upstream_table` on barrier in the
226                // next block.
227                {
228                    let left_upstream = upstream.by_ref().map(Either::Left);
229
230                    // Check if stream paused
231                    let paused =
232                        paused || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
233                    // Create the snapshot stream
234                    let right_snapshot = pin!(
235                        Self::make_snapshot_stream(
236                            &upstream_table,
237                            backfill_state.clone(), // FIXME: Use mutable reference instead.
238                            paused,
239                            &self.rate_limiter,
240                        )
241                        .map(Either::Right)
242                    );
243
244                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
245                    // barrier comes.
246                    let mut backfill_stream =
247                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
248                            stream::PollNext::Left
249                        });
250
251                    #[for_await]
252                    for either in &mut backfill_stream {
253                        match either {
254                            // Upstream
255                            Either::Left(msg) => {
256                                match msg? {
257                                    Message::Barrier(barrier) => {
258                                        // We have to process the barrier outside of the loop.
259                                        // This is because our state_table reference is still live
260                                        // here, we have to break the loop to drop it,
261                                        // so we can do replication of upstream state_table.
262                                        pending_barrier = Some(barrier);
263
264                                        // Break the for loop and start a new snapshot read stream.
265                                        break;
266                                    }
267                                    Message::Chunk(chunk) => {
268                                        // Buffer the upstream chunk.
269                                        upstream_chunk_buffer.push(chunk.compact());
270                                    }
271                                    Message::Watermark(_) => {
272                                        // Ignore watermark during backfill.
273                                    }
274                                }
275                            }
276                            // Snapshot read
277                            Either::Right(msg) => {
278                                has_snapshot_read = true;
279                                match msg? {
280                                    None => {
281                                        // Consume remaining rows in the builder.
282                                        for (vnode, builder) in &mut builders {
283                                            if let Some(data_chunk) = builder.consume_all() {
284                                                yield Message::Chunk(Self::handle_snapshot_chunk(
285                                                    data_chunk,
286                                                    *vnode,
287                                                    &pk_in_output_indices,
288                                                    &mut backfill_state,
289                                                    &mut cur_barrier_snapshot_processed_rows,
290                                                    &mut total_snapshot_processed_rows,
291                                                    &self.output_indices,
292                                                )?);
293                                            }
294                                        }
295
296                                        // End of the snapshot read stream.
297                                        // We should not mark the chunk anymore,
298                                        // otherwise, we will ignore some rows
299                                        // in the buffer. Here we choose to never mark the chunk.
300                                        // Consume with the renaming stream buffer chunk without
301                                        // mark.
302                                        for chunk in upstream_chunk_buffer.drain(..) {
303                                            let chunk_cardinality = chunk.cardinality() as u64;
304                                            cur_barrier_upstream_processed_rows +=
305                                                chunk_cardinality;
306                                            yield Message::Chunk(mapping_chunk(
307                                                chunk,
308                                                &self.output_indices,
309                                            ));
310                                        }
311                                        metrics
312                                            .backfill_snapshot_read_row_count
313                                            .inc_by(cur_barrier_snapshot_processed_rows);
314                                        metrics
315                                            .backfill_upstream_output_row_count
316                                            .inc_by(cur_barrier_upstream_processed_rows);
317                                        break 'backfill_loop;
318                                    }
319                                    Some((vnode, row)) => {
320                                        let builder = builders.get_mut(&vnode).unwrap();
321                                        if let Some(chunk) = builder.append_one_row(row) {
322                                            yield Message::Chunk(Self::handle_snapshot_chunk(
323                                                chunk,
324                                                vnode,
325                                                &pk_in_output_indices,
326                                                &mut backfill_state,
327                                                &mut cur_barrier_snapshot_processed_rows,
328                                                &mut total_snapshot_processed_rows,
329                                                &self.output_indices,
330                                            )?);
331                                        }
332                                    }
333                                }
334                            }
335                        }
336                    }
337
338                    // Before processing barrier, if did not snapshot read,
339                    // do a snapshot read first.
340                    // This is so we don't lose the tombstone iteration progress.
341                    // Or if s3 read latency is high, we don't fail to read from s3.
342                    //
343                    // If paused, we can't read any snapshot records, skip this.
344                    //
345                    // If rate limit is set, respect the rate limit, check if we can read,
346                    // If we can't, skip it. If no rate limit set, we can read.
347                    let rate_limit_ready = self.rate_limiter.check(1).is_ok();
348                    if !has_snapshot_read && !paused && rate_limit_ready {
349                        debug_assert!(builders.values().all(|b| b.is_empty()));
350                        let (_, snapshot) = backfill_stream.into_inner();
351                        #[for_await]
352                        for msg in snapshot {
353                            let Either::Right(msg) = msg else {
354                                bail!("BUG: snapshot_read contains upstream messages");
355                            };
356                            match msg? {
357                                None => {
358                                    // End of the snapshot read stream.
359                                    // We let the barrier handling logic take care of upstream updates.
360                                    // But we still want to exit backfill loop, so we mark snapshot read complete.
361                                    snapshot_read_complete = true;
362                                    break;
363                                }
364                                Some((vnode, row)) => {
365                                    let builder = builders.get_mut(&vnode).unwrap();
366                                    if let Some(chunk) = builder.append_one_row(row) {
367                                        yield Message::Chunk(Self::handle_snapshot_chunk(
368                                            chunk,
369                                            vnode,
370                                            &pk_in_output_indices,
371                                            &mut backfill_state,
372                                            &mut cur_barrier_snapshot_processed_rows,
373                                            &mut total_snapshot_processed_rows,
374                                            &self.output_indices,
375                                        )?);
376                                    }
377
378                                    break;
379                                }
380                            }
381                        }
382                    }
383                }
384
385                // Process barrier
386                // When we break out of inner backfill_stream loop, it means we have a barrier.
387                // If there are no updates and there are no snapshots left,
388                // we already finished backfill and should have exited the outer backfill loop.
389                let barrier = match pending_barrier.take() {
390                    Some(barrier) => barrier,
391                    None => bail!("BUG: current_backfill loop exited without a barrier"),
392                };
393
394                // Process barrier:
395                // - consume snapshot rows left in builder.
396                // - consume upstream buffer chunk
397                // - handle mutations
398                // - switch snapshot
399
400                // consume snapshot rows left in builder.
401                // NOTE(kwannoel): `zip_eq_debug` does not work here,
402                // we encounter "higher-ranked lifetime error".
403                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
404                    let chunk = b.consume_all().map(|chunk| {
405                        let ops = vec![Op::Insert; chunk.capacity()];
406                        StreamChunk::from_parts(ops, chunk)
407                    });
408                    (vnode, chunk)
409                }) {
410                    if let Some(chunk) = chunk {
411                        let chunk_cardinality = chunk.cardinality() as u64;
412                        // Raise the current position.
413                        // As snapshot read streams are ordered by pk, so we can
414                        // just use the last row to update `current_pos`.
415                        update_pos_by_vnode(
416                            *vnode,
417                            &chunk,
418                            &pk_in_output_indices,
419                            &mut backfill_state,
420                            chunk_cardinality,
421                        )?;
422
423                        cur_barrier_snapshot_processed_rows += chunk_cardinality;
424                        total_snapshot_processed_rows += chunk_cardinality;
425                        yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
426                    }
427                }
428
429                // consume upstream buffer chunk
430                for chunk in upstream_chunk_buffer.drain(..) {
431                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
432                    // FIXME: Replace with `snapshot_is_processed`
433                    // Flush downstream.
434                    // If no current_pos, means no snapshot processed yet.
435                    // Also means we don't need propagate any updates <= current_pos.
436                    if backfill_state.has_progress() {
437                        yield Message::Chunk(mapping_chunk(
438                            mark_chunk_ref_by_vnode(
439                                &chunk,
440                                &backfill_state,
441                                &pk_in_output_indices,
442                                &upstream_table,
443                                &pk_order,
444                            )?,
445                            &self.output_indices,
446                        ));
447                    }
448
449                    // Replicate
450                    upstream_table.write_chunk(chunk);
451                }
452
453                upstream_table
454                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
455                    .await?;
456
457                metrics
458                    .backfill_snapshot_read_row_count
459                    .inc_by(cur_barrier_snapshot_processed_rows);
460                metrics
461                    .backfill_upstream_output_row_count
462                    .inc_by(cur_barrier_upstream_processed_rows);
463
464                // Update snapshot read epoch.
465                snapshot_read_epoch = barrier.epoch.prev;
466
467                // TODO(kwannoel): Not sure if this holds for arrangement backfill.
468                // May need to revisit it.
469                // Need to check it after scale-in / scale-out.
470                self.progress.update(
471                    barrier.epoch,
472                    snapshot_read_epoch,
473                    total_snapshot_processed_rows,
474                );
475
476                // Persist state on barrier
477                persist_state_per_vnode(
478                    barrier.epoch,
479                    &mut self.state_table,
480                    &mut backfill_state,
481                    #[cfg(debug_assertions)]
482                    state_len,
483                    vnodes.iter_vnodes(),
484                )
485                .await?;
486
487                tracing::trace!(
488                    barrier = ?barrier,
489                    "barrier persisted"
490                );
491
492                // handle mutations
493                if let Some(mutation) = barrier.mutation.as_deref() {
494                    use crate::executor::Mutation;
495                    match mutation {
496                        Mutation::Pause => {
497                            paused = true;
498                        }
499                        Mutation::Resume => {
500                            paused = false;
501                        }
502                        Mutation::Throttle(actor_to_apply) => {
503                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
504                            if let Some(new_rate_limit) = new_rate_limit_entry {
505                                let new_rate_limit = (*new_rate_limit).into();
506                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
507                                if old_rate_limit != new_rate_limit {
508                                    tracing::info!(
509                                        old_rate_limit = ?old_rate_limit,
510                                        new_rate_limit = ?new_rate_limit,
511                                        upstream_table_id = upstream_table_id,
512                                        actor_id = self.actor_id,
513                                        "backfill rate limit changed",
514                                    );
515                                    builders = upstream_table
516                                        .vnodes()
517                                        .iter_vnodes()
518                                        .map(|vnode| {
519                                            let builder = create_builder(
520                                                new_rate_limit,
521                                                self.chunk_size,
522                                                snapshot_data_types.clone(),
523                                            );
524                                            (vnode, builder)
525                                        })
526                                        .collect();
527                                }
528                            }
529                        }
530                        _ => {}
531                    }
532                }
533
534                yield Message::Barrier(barrier);
535
536                // We will switch snapshot at the start of the next iteration of the backfill loop.
537                // Unless snapshot read is already completed.
538                if snapshot_read_complete {
539                    break 'backfill_loop;
540                }
541            }
542        }
543
544        tracing::debug!("snapshot read finished, wait to commit state on next barrier");
545
546        // Update our progress as finished in state table.
547
548        // Wait for first barrier to come after backfill is finished.
549        // So we can update our progress + persist the status.
550        while let Some(Ok(msg)) = upstream.next().await {
551            if let Some(msg) = mapping_message(msg, &self.output_indices) {
552                // If not finished then we need to update state, otherwise no need.
553                if let Message::Barrier(barrier) = &msg {
554                    if is_completely_finished {
555                        // If already finished, no need to persist any state. But we need to advance the epoch anyway
556                        self.state_table
557                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
558                            .await?;
559                    } else {
560                        // If snapshot was empty, we do not need to backfill,
561                        // but we still need to persist the finished state.
562                        // We currently persist it on the second barrier here rather than first.
563                        // This is because we can't update state table in first epoch,
564                        // since it expects to have been initialized in previous epoch
565                        // (there's no epoch before the first epoch).
566                        for vnode in upstream_table.vnodes().iter_vnodes() {
567                            backfill_state
568                                .finish_progress(vnode, upstream_table.pk_indices().len());
569                        }
570
571                        persist_state_per_vnode(
572                            barrier.epoch,
573                            &mut self.state_table,
574                            &mut backfill_state,
575                            #[cfg(debug_assertions)]
576                            state_len,
577                            vnodes.iter_vnodes(),
578                        )
579                        .await?;
580                    }
581
582                    self.progress
583                        .finish(barrier.epoch, total_snapshot_processed_rows);
584                    yield msg;
585                    break;
586                }
587                // Allow other messages to pass through.
588                // We won't yield twice here, since if there's a barrier,
589                // we will always break out of the loop.
590                yield msg;
591            }
592        }
593
594        tracing::debug!("backfill finished");
595
596        // After progress finished + state persisted,
597        // we can forward messages directly to the downstream,
598        // as backfill is finished.
599        #[for_await]
600        for msg in upstream {
601            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
602                if let Message::Barrier(barrier) = &msg {
603                    // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
604                    self.state_table
605                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
606                        .await?;
607                }
608                yield msg;
609            }
610        }
611    }
612
613    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
614    async fn make_snapshot_stream<'a>(
615        upstream_table: &'a ReplicatedStateTable<S, SD>,
616        backfill_state: BackfillState,
617        paused: bool,
618        rate_limiter: &'a MonitoredRateLimiter,
619    ) {
620        if paused {
621            #[for_await]
622            for _ in tokio_stream::pending() {
623                bail!("BUG: paused stream should not yield");
624            }
625        } else {
626            // Checked the rate limit is not zero.
627            #[for_await]
628            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
629                let r = r?;
630                rate_limiter.wait(1).await;
631                yield r;
632            }
633        }
634    }
635
636    fn handle_snapshot_chunk(
637        chunk: DataChunk,
638        vnode: VirtualNode,
639        pk_in_output_indices: &[usize],
640        backfill_state: &mut BackfillState,
641        cur_barrier_snapshot_processed_rows: &mut u64,
642        total_snapshot_processed_rows: &mut u64,
643        output_indices: &[usize],
644    ) -> StreamExecutorResult<StreamChunk> {
645        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
646        // Raise the current position.
647        // As snapshot read streams are ordered by pk, so we can
648        // just use the last row to update `current_pos`.
649        let snapshot_row_count_delta = chunk.cardinality() as u64;
650        update_pos_by_vnode(
651            vnode,
652            &chunk,
653            pk_in_output_indices,
654            backfill_state,
655            snapshot_row_count_delta,
656        )?;
657
658        let chunk_cardinality = chunk.cardinality() as u64;
659        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
660        *total_snapshot_processed_rows += chunk_cardinality;
661        Ok(mapping_chunk(chunk, output_indices))
662    }
663
664    /// Read snapshot per vnode.
665    /// These streams should be sorted in storage layer.
666    /// 1. Get row iterator / vnode.
667    /// 2. Merge it with `select_all`.
668    /// 3. Change it into a chunk iterator with `iter_chunks`.
669    /// This means it should fetch a row from each iterator to form a chunk.
670    ///
671    /// We interleave at chunk per vnode level rather than rows.
672    /// This is so that we can compute `current_pos` once per chunk, since they correspond to 1
673    /// vnode.
674    ///
675    /// The stream contains pairs of `(VirtualNode, StreamChunk)`.
676    /// The `VirtualNode` is the vnode that the chunk belongs to.
677    /// The `StreamChunk` is the chunk that contains the rows from the vnode.
678    /// If it's `None`, it means the vnode has no more rows for this snapshot read.
679    ///
680    /// The `snapshot_read_epoch` is supplied as a parameter for `state_table`.
681    /// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
682    ///
683    /// The rows from upstream snapshot read will be buffered inside the `builder`.
684    /// If snapshot is dropped before its rows are consumed,
685    /// remaining data in `builder` must be flushed manually.
686    /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
687    /// present, Then when we flush we contain duplicate rows.
688    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
689    async fn snapshot_read_per_vnode(
690        upstream_table: &ReplicatedStateTable<S, SD>,
691        backfill_state: BackfillState,
692    ) {
693        let mut iterators = vec![];
694        for vnode in upstream_table.vnodes().iter_vnodes() {
695            let backfill_progress = backfill_state.get_progress(&vnode)?;
696            let current_pos = match backfill_progress {
697                BackfillProgressPerVnode::NotStarted => None,
698                BackfillProgressPerVnode::Completed { .. } => {
699                    continue;
700                }
701                BackfillProgressPerVnode::InProgress { current_pos, .. } => {
702                    Some(current_pos.clone())
703                }
704            };
705
706            let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
707            if range_bounds.is_none() {
708                continue;
709            }
710            let range_bounds = range_bounds.unwrap();
711
712            tracing::trace!(
713                vnode = ?vnode,
714                current_pos = ?current_pos,
715                range_bounds = ?range_bounds,
716                "iter_with_vnode_and_output_indices"
717            );
718            let vnode_row_iter = upstream_table
719                .iter_with_vnode_and_output_indices(
720                    vnode,
721                    &range_bounds,
722                    PrefetchOptions::prefetch_for_small_range_scan(),
723                )
724                .await?;
725
726            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
727
728            let vnode_row_iter = Box::pin(vnode_row_iter);
729
730            iterators.push(vnode_row_iter);
731        }
732
733        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
734        let vnode_row_iter = select_all(iterators);
735
736        #[for_await]
737        for vnode_and_row in vnode_row_iter {
738            yield Some(vnode_and_row?);
739        }
740        yield None;
741        return Ok(());
742    }
743}
744
745impl<S, SD> Execute for ArrangementBackfillExecutor<S, SD>
746where
747    S: StateStore,
748    SD: ValueRowSerde,
749{
750    fn execute(self: Box<Self>) -> BoxedMessageStream {
751        self.execute_inner().boxed()
752    }
753}