risingwave_stream/executor/backfill/arrangement_backfill.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use either::Either;
use futures::stream::{select_all, select_with_strategy};
use futures::{TryStreamExt, stream};
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;

use crate::common::table::state_table::ReplicatedStateTable;
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
    BackfillProgressPerVnode, BackfillState, compute_bounds, create_builder,
    get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
    persist_state_per_vnode, update_pos_by_vnode,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgressReporter;
type Builders = HashMap<VirtualNode, DataChunkBuilder>;

/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
/// Main differences:
/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
///   independently.
/// - To synchronize with the upstream shared buffer, it is initialized with a
///   [`ReplicatedStateTable`].
pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream table
    upstream_table: ReplicatedStateTable<S, SD>,

    /// Upstream executor with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill state.
    state_table: StateTable<S>,

    /// The column indices that need to be forwarded to the downstream from the upstream and
    /// the table scan.
    output_indices: Vec<usize>,

    progress: CreateMviewProgressReporter,

    actor_id: ActorId,

    metrics: Arc<StreamingMetrics>,

    chunk_size: usize,

    rate_limiter: MonitoredRateLimiter,
}

impl<S, SD> ArrangementBackfillExecutor<S, SD>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[allow(clippy::too_many_arguments)]
    #[allow(dead_code)]
    pub fn new(
        upstream_table: ReplicatedStateTable<S, SD>,
        upstream: Executor,
        state_table: StateTable<S>,
        output_indices: Vec<usize>,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        rate_limit: RateLimit,
    ) -> Self {
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Self {
            upstream_table,
            upstream,
            state_table,
            output_indices,
            actor_id: progress.actor_id(),
            progress,
            metrics,
            chunk_size,
            rate_limiter,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        tracing::debug!("backfill executor started");
        // The primary key columns, in the output columns of the upstream_table scan.
        // The table scan only scans a subset of the columns of the upstream table.
        let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
        #[cfg(debug_assertions)]
        let state_len = self.upstream_table.pk_indices().len() + METADATA_STATE_LEN;
        let pk_order = self.upstream_table.pk_serde().get_order_types().to_vec();
        let upstream_table_id = self.upstream_table.table_id();
        let mut upstream_table = self.upstream_table;
        let vnodes = upstream_table.vnodes().clone();

        // These builders will build data chunks.
        // We must supply them with the full data types, which correspond to
        // pk + output_indices.
        let snapshot_data_types = self
            .upstream
            .schema()
            .fields()
            .iter()
            .map(|field| field.data_type.clone())
            .collect_vec();
        let mut builders: Builders = upstream_table
            .vnodes()
            .iter_vnodes()
            .map(|vnode| {
                let builder = create_builder(
                    self.rate_limiter.rate_limit(),
                    self.chunk_size,
                    snapshot_data_types.clone(),
                );
                (vnode, builder)
            })
            .collect();

        let mut upstream = self.upstream.execute();

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let mut paused = first_barrier.is_pause_on_startup();
        let first_epoch = first_barrier.epoch;
        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);

        self.state_table.init_epoch(first_epoch).await?;

        let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;

        let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
            matches!(
                p.current_state(),
                &BackfillProgressPerVnode::Completed { .. }
            )
        });
        if is_completely_finished {
            assert!(!is_newly_added);
        }

        upstream_table.init_epoch(first_epoch).await?;

        let mut backfill_state: BackfillState = progress_per_vnode.into();

        let to_backfill = !is_completely_finished;

        // If there's no need to backfill, but the state is still "unfinished", we need to
        // finish it. So we just update the state + progress to meta at the next barrier to
        // finish the progress, and forward other messages.
        //
        // Reason for persisting on the second barrier rather than the first:
        // We can't report the progress to meta as finished until the state_table
        // has been updated.
        // We also can't update the state_table in the first epoch, since the state_table
        // expects to have been initialized in the previous epoch.

        // The epoch used to snapshot read the upstream mv.
        let mut snapshot_read_epoch;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

        // Arrangement Backfill Algorithm:
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot
        //
        // We construct a backfill stream with upstream as its left input and mv snapshot read
        // stream as its right input. When a chunk comes from upstream, we will buffer it.
        //
        // When a barrier comes from upstream:
        //  Immediately break out of the backfill loop.
        //  - For each row of the upstream chunk buffer, compute its vnode.
        //  - Get the `current_pos` corresponding to the vnode. Forward the row to downstream
        //    if its pk <= `current_pos`, otherwise ignore it.
        //  - Flush all buffered upstream chunks to the replicated state table.
        //  - Update the `snapshot_read_epoch`.
        //  - Reconstruct the whole backfill stream with upstream and a new mv snapshot read
        //    stream with the `snapshot_read_epoch`.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
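        //
        // For example (illustrative): if `current_pos` for some vnode is the pk 5, a
        // buffered upstream row in that vnode with pk 3 is forwarded downstream (its
        // position has already been backfilled), while a row with pk 7 is dropped, since
        // the snapshot read will emit it later.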
        if to_backfill {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(upstream_table_id, self.actor_id);

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut snapshot_read_complete = false;
                let mut has_snapshot_read = false;

                // NOTE(kwannoel): Scope it so that the immutable reference to
                // `upstream_table` can be dropped. Then we can write to `upstream_table`
                // on barrier in the next block.
                {
                    let left_upstream = upstream.by_ref().map(Either::Left);

                    // Check if the stream is paused.
                    let paused =
                        paused || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
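                    // Note that this binding shadows the outer `paused` flag within this
                    // scope: an explicit pause and a `RateLimit::Pause` throttle are treated
                    // the same way when constructing the snapshot stream below.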
                    // Create the snapshot stream.
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(
                            &upstream_table,
                            backfill_state.clone(), // FIXME: Use mutable reference instead.
                            paused,
                            &self.rate_limiter,
                        )
                        .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop the snapshot stream as soon
                    // as the barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process the barrier outside of the loop.
                                        // This is because our state_table reference is still
                                        // live here; we have to break the loop to drop it,
                                        // so that we can replicate the upstream state_table.
                                        pending_barrier = Some(barrier);

                                        // Break the for loop and start a new snapshot read stream.
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                has_snapshot_read = true;
                                match msg? {
                                    None => {
                                        // Consume remaining rows in the builder.
                                        for (vnode, builder) in &mut builders {
                                            if let Some(data_chunk) = builder.consume_all() {
                                                yield Message::Chunk(Self::handle_snapshot_chunk(
                                                    data_chunk,
                                                    *vnode,
                                                    &pk_in_output_indices,
                                                    &mut backfill_state,
                                                    &mut cur_barrier_snapshot_processed_rows,
                                                    &mut total_snapshot_processed_rows,
                                                    &self.output_indices,
                                                )?);
                                            }
                                        }

                                        // End of the snapshot read stream.
                                        // We should not mark the chunks anymore;
                                        // otherwise, we would ignore some rows in the buffer.
                                        // Here we choose to never mark the chunks:
                                        // consume the remaining buffered upstream chunks
                                        // without marking them.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            yield Message::Chunk(mapping_chunk(
                                                chunk,
                                                &self.output_indices,
                                            ));
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some((vnode, row)) => {
                                        let builder = builders.get_mut(&vnode).unwrap();
                                        if let Some(chunk) = builder.append_one_row(row) {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                chunk,
                                                vnode,
                                                &pk_in_output_indices,
                                                &mut backfill_state,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &self.output_indices,
                                            )?);
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Before processing the barrier, if we did not read from the snapshot,
                    // do a snapshot read first.
                    // This is so that we don't lose the tombstone iteration progress,
                    // and so that we still make progress reading from s3 even when its
                    // read latency is high.
                    //
                    // If paused, we can't read any snapshot records, so skip this.
                    //
                    // If a rate limit is set, respect it: check whether we can read,
                    // and if we can't, skip the read. If no rate limit is set, we can
                    // always read.
                    let rate_limit_ready = self.rate_limiter.check(1).is_ok();
                    if !has_snapshot_read && !paused && rate_limit_ready {
                        debug_assert!(builders.values().all(|b| b.is_empty()));
                        let (_, snapshot) = backfill_stream.into_inner();
                        #[for_await]
                        for msg in snapshot {
                            let Either::Right(msg) = msg else {
                                bail!("BUG: snapshot_read contains upstream messages");
                            };
                            match msg? {
                                None => {
                                    // End of the snapshot read stream.
                                    // We let the barrier handling logic take care of upstream
                                    // updates, but we still want to exit the backfill loop,
                                    // so we mark the snapshot read as complete.
                                    snapshot_read_complete = true;
                                    break;
                                }
                                Some((vnode, row)) => {
                                    let builder = builders.get_mut(&vnode).unwrap();
                                    if let Some(chunk) = builder.append_one_row(row) {
                                        yield Message::Chunk(Self::handle_snapshot_chunk(
                                            chunk,
                                            vnode,
                                            &pk_in_output_indices,
                                            &mut backfill_state,
                                            &mut cur_barrier_snapshot_processed_rows,
                                            &mut total_snapshot_processed_rows,
                                            &self.output_indices,
                                        )?);
                                    }

                                    break;
                                }
                            }
                        }
                    }
                }

                // Process the barrier.
                // When we break out of the inner backfill_stream loop, it means we have a
                // barrier. If there were no updates and no snapshot rows left, we have
                // already finished backfill and should have exited the outer backfill loop.
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => bail!("BUG: current_backfill loop exited without a barrier"),
                };

                // Process barrier:
                // - consume snapshot rows left in the builders.
                // - consume the upstream buffer chunks.
                // - handle mutations.
                // - switch snapshot.

                // Consume snapshot rows left in the builders.
                // NOTE(kwannoel): `zip_eq_debug` does not work here,
                // we encounter a "higher-ranked lifetime error".
                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
                    let chunk = b.consume_all().map(|chunk| {
                        let ops = vec![Op::Insert; chunk.capacity()];
                        StreamChunk::from_parts(ops, chunk)
                    });
                    (vnode, chunk)
                }) {
                    if let Some(chunk) = chunk {
                        let chunk_cardinality = chunk.cardinality() as u64;
                        // Raise the current position.
                        // Since snapshot read streams are ordered by pk, we can
                        // just use the last row to update `current_pos`.
                        update_pos_by_vnode(
                            *vnode,
                            &chunk,
                            &pk_in_output_indices,
                            &mut backfill_state,
                            chunk_cardinality,
                        )?;

                        cur_barrier_snapshot_processed_rows += chunk_cardinality;
                        total_snapshot_processed_rows += chunk_cardinality;
                        yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                    }
                }

                // Consume the upstream buffer chunks.
                for chunk in upstream_chunk_buffer.drain(..) {
                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                    // FIXME: Replace with `snapshot_is_processed`
                    // Flush downstream.
                    // If there's no current_pos, no snapshot has been processed yet.
                    // That also means we don't need to propagate any updates <= current_pos.
                    if backfill_state.has_progress() {
                        yield Message::Chunk(mapping_chunk(
                            mark_chunk_ref_by_vnode(
                                &chunk,
                                &backfill_state,
                                &pk_in_output_indices,
                                &upstream_table,
                                &pk_order,
                            )?,
                            &self.output_indices,
                        ));
                    }

                    // Replicate
                    upstream_table.write_chunk(chunk);
                }

                upstream_table
                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                    .await?;

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                // Update the snapshot read epoch.
                snapshot_read_epoch = barrier.epoch.prev;

                // TODO(kwannoel): Not sure if this holds for arrangement backfill.
                // May need to revisit it.
                // Need to check it after scale-in / scale-out.
                self.progress.update(
                    barrier.epoch,
                    snapshot_read_epoch,
                    total_snapshot_processed_rows,
                );

                // Persist state on barrier.
                persist_state_per_vnode(
                    barrier.epoch,
                    &mut self.state_table,
                    &mut backfill_state,
                    #[cfg(debug_assertions)]
                    state_len,
                    vnodes.iter_vnodes(),
                )
                .await?;

                tracing::trace!(
                    barrier = ?barrier,
                    "barrier persisted"
                );

                // Handle mutations.
                if let Some(mutation) = barrier.mutation.as_deref() {
                    use crate::executor::Mutation;
                    match mutation {
                        Mutation::Pause => {
                            paused = true;
                        }
                        Mutation::Resume => {
                            paused = false;
                        }
                        Mutation::Throttle(actor_to_apply) => {
                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
                            if let Some(new_rate_limit) = new_rate_limit_entry {
                                let new_rate_limit = (*new_rate_limit).into();
                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                                if old_rate_limit != new_rate_limit {
                                    tracing::info!(
                                        old_rate_limit = ?old_rate_limit,
                                        new_rate_limit = ?new_rate_limit,
                                        upstream_table_id = upstream_table_id,
                                        actor_id = self.actor_id,
                                        "backfill rate limit changed",
                                    );
                                    builders = upstream_table
                                        .vnodes()
                                        .iter_vnodes()
                                        .map(|vnode| {
                                            let builder = create_builder(
                                                new_rate_limit,
                                                self.chunk_size,
                                                snapshot_data_types.clone(),
                                            );
                                            (vnode, builder)
                                        })
                                        .collect();
                                }
                            }
                        }
                        _ => {}
                    }
                }

                yield Message::Barrier(barrier);

                // We will switch the snapshot at the start of the next iteration of the
                // backfill loop, unless the snapshot read is already complete.
                if snapshot_read_complete {
                    break 'backfill_loop;
                }
            }
        }

        tracing::debug!("snapshot read finished, waiting to commit state on the next barrier");

        // Update our progress as finished in the state table.

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress + persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished, we need to update the state; otherwise there's no need.
                if let Message::Barrier(barrier) = &msg {
                    if is_completely_finished {
                        // If already finished, no need to persist any state. But we need to
                        // advance the epoch anyway.
                        self.state_table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;
                    } else {
                        // If the snapshot was empty, we don't need to backfill,
                        // but we still need to persist the finished state.
                        // We currently persist it on the second barrier here rather than the
                        // first. This is because we can't update the state table in the first
                        // epoch, since it expects to have been initialized in the previous
                        // epoch (there's no epoch before the first epoch).
                        for vnode in upstream_table.vnodes().iter_vnodes() {
                            backfill_state
                                .finish_progress(vnode, upstream_table.pk_indices().len());
                        }

                        persist_state_per_vnode(
                            barrier.epoch,
                            &mut self.state_table,
                            &mut backfill_state,
                            #[cfg(debug_assertions)]
                            state_len,
                            vnodes.iter_vnodes(),
                        )
                        .await?;
                    }

                    self.progress
                        .finish(barrier.epoch, total_snapshot_processed_rows);
                    yield msg;
                    break;
                }
                // Allow other messages to pass through.
                // We won't yield twice here, since if there's a barrier,
                // we will always break out of the loop.
                yield msg;
            }
        }

        tracing::debug!("backfill finished");

        // After the progress is finished + state persisted,
        // we can forward messages directly to the downstream,
        // as backfill is finished.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // If already finished, there's no need to persist any state, but we
                    // still need to advance the epoch of the state table.
                    self.state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                }
                yield msg;
            }
        }
    }

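    /// Build the snapshot side of the backfill stream.
    ///
    /// Yields `Some((vnode, row))` for every snapshot row, then a final `None` once the
    /// snapshot read is exhausted. If `paused` is set, the stream pends forever and yields
    /// nothing; otherwise each row first waits on the rate limiter.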
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // The caller has already checked that the rate limit is not zero
            // (`RateLimit::Pause` is folded into `paused`).
            #[for_await]
            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
                let r = r?;
                rate_limiter.wait(1).await;
                yield r;
            }
        }
    }

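    /// Convert a snapshot `DataChunk` into a `StreamChunk` of `Insert` ops, advance the
    /// backfill position of `vnode` to the chunk's last row, bump the per-barrier and
    /// total row counters, and project the chunk to `output_indices`.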
    fn handle_snapshot_chunk(
        chunk: DataChunk,
        vnode: VirtualNode,
        pk_in_output_indices: &[usize],
        backfill_state: &mut BackfillState,
        cur_barrier_snapshot_processed_rows: &mut u64,
        total_snapshot_processed_rows: &mut u64,
        output_indices: &[usize],
    ) -> StreamExecutorResult<StreamChunk> {
        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
        // Raise the current position.
        // Since snapshot read streams are ordered by pk, we can
        // just use the last row to update `current_pos`.
        let snapshot_row_count_delta = chunk.cardinality() as u64;
        update_pos_by_vnode(
            vnode,
            &chunk,
            pk_in_output_indices,
            backfill_state,
            snapshot_row_count_delta,
        )?;

        let chunk_cardinality = chunk.cardinality() as u64;
        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        *total_snapshot_processed_rows += chunk_cardinality;
        Ok(mapping_chunk(chunk, output_indices))
    }

    /// Read the snapshot, one stream per vnode.
    /// These streams are sorted by pk within each vnode by the storage layer.
    /// 1. Get a row iterator for each vnode.
    /// 2. Merge them with `select_all`.
    /// 3. The caller assembles the merged rows into chunks via the per-vnode builders,
    ///    fetching rows from each iterator to form a chunk.
    ///
    /// We interleave at the chunk-per-vnode level rather than the row level,
    /// so that we can compute `current_pos` once per chunk, since each chunk
    /// corresponds to exactly one vnode.
    ///
    /// The stream yields `Some((VirtualNode, OwnedRow))`, where the `VirtualNode` is the
    /// vnode that the row belongs to, followed by a final `None` once there are no more
    /// rows for this snapshot read.
    ///
    /// The `snapshot_read_epoch` is supplied via the `state_table`.
    /// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
    ///
    /// Rows from the upstream snapshot read are buffered inside the builders.
    /// If the snapshot is dropped before its rows are consumed,
    /// the remaining data in the builders must be flushed manually;
    /// otherwise, when we scan a new snapshot, the leftover rows in the builders would be
    /// flushed together with the new rows, producing duplicates.
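    ///
    /// An illustrative yielded sequence for two in-progress vnodes `v0` and `v1`
    /// (the exact interleaving depends on `select_all`):
    /// `Some((v0, row)), Some((v1, row)), Some((v0, row)), ..., None`.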
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn snapshot_read_per_vnode(
        upstream_table: &ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
    ) {
        let mut iterators = vec![];
        for vnode in upstream_table.vnodes().iter_vnodes() {
            let backfill_progress = backfill_state.get_progress(&vnode)?;
            let current_pos = match backfill_progress {
                BackfillProgressPerVnode::NotStarted => None,
                BackfillProgressPerVnode::Completed { .. } => {
                    continue;
                }
                BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                    Some(current_pos.clone())
                }
            };

            let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
            if range_bounds.is_none() {
                continue;
            }
            let range_bounds = range_bounds.unwrap();

            tracing::trace!(
                vnode = ?vnode,
                current_pos = ?current_pos,
                range_bounds = ?range_bounds,
                "iter_with_vnode_and_output_indices"
            );
            let vnode_row_iter = upstream_table
                .iter_with_vnode_and_output_indices(
                    vnode,
                    &range_bounds,
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;

            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

            let vnode_row_iter = Box::pin(vnode_row_iter);

            iterators.push(vnode_row_iter);
        }

        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
        let vnode_row_iter = select_all(iterators);

        #[for_await]
        for vnode_and_row in vnode_row_iter {
            yield Some(vnode_and_row?);
        }
        yield None;
        return Ok(());
    }
}

impl<S, SD> Execute for ArrangementBackfillExecutor<S, SD>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}