risingwave_stream/executor/backfill/arrangement_backfill.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use either::Either;
use futures::stream::{select_all, select_with_strategy};
use futures::{TryStreamExt, stream};
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;

use crate::common::table::state_table::ReplicatedStateTable;
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
    BackfillProgressPerVnode, BackfillState, compute_bounds, create_builder,
    get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
    persist_state_per_vnode, update_pos_by_vnode,
};
use crate::executor::prelude::*;
use crate::task::{CreateMviewProgressReporter, FragmentId};

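/// One `DataChunkBuilder` per vnode, so that every chunk we build contains rows from a
/// single vnode, and `current_pos` can be advanced once per built chunk.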
type Builders = HashMap<VirtualNode, DataChunkBuilder>;

/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
/// Main differences:
/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
///   independently.
/// - To synchronize upstream shared buffer, it is initialized with a [`ReplicatedStateTable`].
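///
/// A minimal construction sketch; all argument values below are placeholders and the
/// surrounding setup is elided (see [`Self::new`] for the actual signature):
///
/// ```ignore
/// let executor = ArrangementBackfillExecutor::new(
///     upstream_table,  // `ReplicatedStateTable` replicating the upstream table
///     upstream,        // upstream executor with the same schema
///     state_table,     // state table persisting per-vnode backfill progress
///     output_indices,  // projection onto the downstream schema
///     progress,        // `CreateMviewProgressReporter`
///     metrics,         // `Arc<StreamingMetrics>`
///     chunk_size,      // max rows per emitted chunk
///     rate_limit,      // `RateLimit` for throttling snapshot reads
///     fragment_id,     // fragment this backfill node belongs to
/// );
/// ```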
pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream table
    upstream_table: ReplicatedStateTable<S, SD>,

    /// Upstream executor with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill state.
    state_table: StateTable<S>,

    /// The column indices that need to be forwarded to the downstream, from the upstream and
    /// the table scan.
    output_indices: Vec<usize>,

    progress: CreateMviewProgressReporter,

    actor_id: ActorId,

    metrics: Arc<StreamingMetrics>,

    chunk_size: usize,

    rate_limiter: MonitoredRateLimiter,

    /// Fragment id of the fragment this backfill node belongs to.
    fragment_id: FragmentId,
}

impl<S, SD> ArrangementBackfillExecutor<S, SD>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[allow(clippy::too_many_arguments)]
    #[allow(dead_code)]
    pub fn new(
        upstream_table: ReplicatedStateTable<S, SD>,
        upstream: Executor,
        state_table: StateTable<S>,
        output_indices: Vec<usize>,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        rate_limit: RateLimit,
        fragment_id: FragmentId,
    ) -> Self {
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Self {
            upstream_table,
            upstream,
            state_table,
            output_indices,
            actor_id: progress.actor_id(),
            progress,
            metrics,
            chunk_size,
            rate_limiter,
            fragment_id,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        tracing::debug!("backfill executor started");
        // The primary key columns, in terms of the output columns of the upstream_table scan.
        // The table scan only reads a subset of the columns of the upstream table.
        let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
        #[cfg(debug_assertions)]
        let state_len = self.upstream_table.pk_indices().len() + METADATA_STATE_LEN;
        let pk_order = self.upstream_table.pk_serde().get_order_types().to_vec();
        let upstream_table_id = self.upstream_table.table_id();
        let mut upstream_table = self.upstream_table;
        let vnodes = upstream_table.vnodes().clone();

        // These builders will build data chunks.
        // We must supply them with the full data types which correspond to
        // pk + output_indices.
        let snapshot_data_types = self
            .upstream
            .schema()
            .fields()
            .iter()
            .map(|field| field.data_type.clone())
            .collect_vec();
        let mut builders: Builders = upstream_table
            .vnodes()
            .iter_vnodes()
            .map(|vnode| {
                let builder = create_builder(
                    self.rate_limiter.rate_limit(),
                    self.chunk_size,
                    snapshot_data_types.clone(),
                );
                (vnode, builder)
            })
            .collect();

        let mut upstream = self.upstream.execute();

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let mut global_pause = first_barrier.is_pause_on_startup();
        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
        let first_epoch = first_barrier.epoch;
        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);

        self.state_table.init_epoch(first_epoch).await?;

        let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;

        let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
            matches!(
                p.current_state(),
                &BackfillProgressPerVnode::Completed { .. }
            )
        });
        if is_completely_finished {
            assert!(!is_newly_added);
        }

        upstream_table.init_epoch(first_epoch).await?;

        let mut backfill_state: BackfillState = progress_per_vnode.into();

        let to_backfill = !is_completely_finished;

        // If backfill is not needed, but the state is still "unfinished", we need to finish it.
        // So we just update the state + progress to meta at the next barrier to finish progress,
        // and forward other messages.
        //
        // Reason for persisting on the second barrier rather than the first:
        // We can't update meta with progress as finished until state_table
        // has been updated.
        // We also can't update state_table in the first epoch, since state_table
        // expects to have been initialized in the previous epoch.
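        //
        // Illustrative timeline: on the first barrier (epoch 1) we may only initialize
        // the state_table; on the second barrier (epoch 2) we can both persist the
        // finished state and report the progress as finished to meta.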

        // The epoch used to snapshot read upstream mv.
        let mut snapshot_read_epoch;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

        // Arrangement Backfill Algorithm:
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot
        //
        // We construct a backfill stream with upstream as its left input and mv snapshot read
        // stream as its right input. When a chunk comes from upstream, we will buffer it.
        //
        // When a barrier comes from upstream:
        //  Immediately break out of backfill loop.
        //  - For each row of the upstream chunk buffer, compute vnode.
        //  - Get the `current_pos` corresponding to the vnode. Forward it to downstream if its pk
        //    <= `current_pos`, otherwise ignore it.
        //  - Flush all buffered upstream_chunks to replicated state table.
        //  - Update the `snapshot_read_epoch`.
        //  - Reconstruct the whole backfill stream with upstream and new mv snapshot read stream
        //    with the `snapshot_read_epoch`.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
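        //
        // For example (illustrative values): suppose vnode 0's `current_pos` is pk = 5.
        // A buffered upstream row in vnode 0 with pk = 3 (<= 5) has already been emitted
        // by the snapshot side, so its update must be forwarded downstream; a row with
        // pk = 7 (> 5) will be covered by a later snapshot read, so it is ignored.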
        if to_backfill {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(upstream_table_id, self.actor_id);

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut snapshot_read_complete = false;
                let mut has_snapshot_read = false;

                // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
                // dropped. Then we can write to `upstream_table` on barrier in the
                // next block.
                {
                    let left_upstream = upstream.by_ref().map(Either::Left);

                    // Check if the stream is paused.
                    let paused = global_pause
                        || backfill_paused
                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
                    // Create the snapshot stream.
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(
                            &upstream_table,
                            backfill_state.clone(), // FIXME: Use mutable reference instead.
                            paused,
                            &self.rate_limiter,
                        )
                        .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
                    // barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });
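                    // Since the strategy above always returns `PollNext::Left`, each poll
                    // checks the upstream side first; the snapshot side is only polled when
                    // the upstream is pending.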

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process the barrier outside of the loop.
                                        // This is because our state_table reference is still live
                                        // here, we have to break the loop to drop it,
                                        // so we can do replication of upstream state_table.
                                        pending_barrier = Some(barrier);

                                        // Break the for loop and start a new snapshot read stream.
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                has_snapshot_read = true;
                                match msg? {
                                    None => {
                                        // Consume remaining rows in the builder.
                                        for (vnode, builder) in &mut builders {
                                            if let Some(data_chunk) = builder.consume_all() {
                                                let chunk = Self::handle_snapshot_chunk(
                                                    data_chunk,
                                                    *vnode,
                                                    &pk_in_output_indices,
                                                    &mut backfill_state,
                                                    &mut cur_barrier_snapshot_processed_rows,
                                                    &mut total_snapshot_processed_rows,
                                                    &self.output_indices,
                                                )?;
                                                tracing::trace!(
                                                    source = "snapshot",
                                                    state = "finish_backfill_stream",
                                                    action = "drain_snapshot_buffers",
                                                    ?vnode,
                                                    "{:#?}",
                                                    chunk,
                                                );
                                                yield Message::Chunk(chunk);
                                            }
                                        }

                                        // End of the snapshot read stream.
                                        // We should not mark the chunk anymore,
                                        // otherwise, we will ignore some rows
                                        // in the buffer. Here we choose to never mark the chunk.
                                        // Consume the remaining buffered upstream chunks without
                                        // marking them.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            let chunk = mapping_chunk(chunk, &self.output_indices);
                                            tracing::trace!(
                                                source = "upstream",
                                                state = "finish_backfill_stream",
                                                action = "drain_upstream_buffer",
                                                "{:#?}",
                                                chunk,
                                            );
                                            yield Message::Chunk(chunk);
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some((vnode, row)) => {
                                        let builder = builders.get_mut(&vnode).unwrap();
                                        if let Some(chunk) = builder.append_one_row(row) {
                                            let chunk = Self::handle_snapshot_chunk(
                                                chunk,
                                                vnode,
                                                &pk_in_output_indices,
                                                &mut backfill_state,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &self.output_indices,
                                            )?;
                                            tracing::trace!(
                                                source = "snapshot",
                                                state = "process_backfill_stream",
                                                action = "drain_full_snapshot_buffer",
                                                "{:#?}",
                                                chunk,
                                            );
                                            yield Message::Chunk(chunk);
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Before processing the barrier, if we did not do a snapshot read,
                    // do a snapshot read first.
                    // This is so we don't lose the tombstone iteration progress,
                    // and so that when s3 read latency is high, we still make progress
                    // reading from s3.
                    //
                    // If paused, we can't read any snapshot records, so skip this.
                    //
                    // If a rate limit is set, respect it: check whether we can read,
                    // and if we can't, skip it. If no rate limit is set, we can read.
                    let rate_limit_ready = self.rate_limiter.check(1).is_ok();
                    if !has_snapshot_read && !paused && rate_limit_ready {
                        debug_assert!(builders.values().all(|b| b.is_empty()));
                        let (_, snapshot) = backfill_stream.into_inner();
                        #[for_await]
                        for msg in snapshot {
                            let Either::Right(msg) = msg else {
                                bail!("BUG: snapshot_read contains upstream messages");
                            };
                            match msg? {
                                None => {
                                    // End of the snapshot read stream.
                                    // We let the barrier handling logic take care of upstream updates.
                                    // But we still want to exit the backfill loop, so we mark
                                    // the snapshot read as complete.
                                    snapshot_read_complete = true;
                                    break;
                                }
                                Some((vnode, row)) => {
                                    let builder = builders.get_mut(&vnode).unwrap();
                                    if let Some(chunk) = builder.append_one_row(row) {
                                        let chunk = Self::handle_snapshot_chunk(
                                            chunk,
                                            vnode,
                                            &pk_in_output_indices,
                                            &mut backfill_state,
                                            &mut cur_barrier_snapshot_processed_rows,
                                            &mut total_snapshot_processed_rows,
                                            &self.output_indices,
                                        )?;
                                        tracing::trace!(
                                            source = "snapshot",
                                            state = "process_backfill_stream",
                                            action = "snapshot_read_at_least_one",
                                            "{:#?}",
                                            chunk,
                                        );
                                        yield Message::Chunk(chunk);
                                    }

                                    break;
                                }
                            }
                        }
                    }
                }

                // Process barrier.
                // When we break out of the inner backfill_stream loop, it means we have a barrier.
                // If there are no updates and there are no snapshots left,
                // we already finished backfill and should have exited the outer backfill loop.
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => bail!("BUG: current_backfill loop exited without a barrier"),
                };

                // Process barrier:
                // - consume snapshot rows left in builder.
                // - consume upstream buffer chunk.
                // - handle mutations.
                // - switch snapshot.

                // Consume snapshot rows left in builder.
                // NOTE(kwannoel): `zip_eq_debug` does not work here,
                // we encounter a "higher-ranked lifetime error".
                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
                    let chunk = b.consume_all().map(|chunk| {
                        let ops = vec![Op::Insert; chunk.capacity()];
                        StreamChunk::from_parts(ops, chunk)
                    });
                    (vnode, chunk)
                }) {
                    if let Some(chunk) = chunk {
                        let chunk_cardinality = chunk.cardinality() as u64;
                        // Raise the current position.
                        // Since snapshot read streams are ordered by pk, we can
                        // just use the last row to update `current_pos`.
                        update_pos_by_vnode(
                            *vnode,
                            &chunk,
                            &pk_in_output_indices,
                            &mut backfill_state,
                            chunk_cardinality,
                        )?;

                        cur_barrier_snapshot_processed_rows += chunk_cardinality;
                        total_snapshot_processed_rows += chunk_cardinality;
                        let chunk = mapping_chunk(chunk, &self.output_indices);
                        tracing::trace!(
                            source = "snapshot",
                            state = "process_barrier",
                            action = "consume_remaining_snapshot",
                            "{:#?}",
                            chunk,
                        );
                        yield Message::Chunk(chunk);
                    }
                }

                // Consume upstream buffer chunks.
                for chunk in upstream_chunk_buffer.drain(..) {
                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                    // FIXME: Replace with `snapshot_is_processed`.
                    // Flush downstream.
                    // If there is no current_pos, no snapshot has been processed yet.
                    // It also means we don't need to propagate any updates <= current_pos.
                    if backfill_state.has_progress() {
                        let chunk = mapping_chunk(
                            mark_chunk_ref_by_vnode(
                                &chunk,
                                &backfill_state,
                                &pk_in_output_indices,
                                &upstream_table,
                                &pk_order,
                            )?,
                            &self.output_indices,
                        );
                        tracing::trace!(
                            source = "upstream",
                            state = "process_barrier",
                            action = "consume_remaining_upstream",
                            "{:#?}",
                            chunk,
                        );
                        yield Message::Chunk(chunk);
                    }

                    // Replicate
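                    // (Writing the chunk into the replicated state table lets subsequent
                    // snapshot reads observe upstream updates that are not yet
                    // checkpointed to shared storage.)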
                    upstream_table.write_chunk(chunk);
                }

                upstream_table
                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                    .await?;

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                // Update snapshot read epoch.
                snapshot_read_epoch = barrier.epoch.prev;

                // TODO(kwannoel): Not sure if this holds for arrangement backfill.
                // May need to revisit it.
                // Need to check it after scale-in / scale-out.
                self.progress.update(
                    barrier.epoch,
                    snapshot_read_epoch,
                    total_snapshot_processed_rows,
                );

                // Persist state on barrier
                persist_state_per_vnode(
                    barrier.epoch,
                    &mut self.state_table,
                    &mut backfill_state,
                    #[cfg(debug_assertions)]
                    state_len,
                    vnodes.iter_vnodes(),
                )
                .await?;

                tracing::trace!(
                    barrier = ?barrier,
                    "barrier persisted"
                );

                // handle mutations
                if let Some(mutation) = barrier.mutation.as_deref() {
                    use crate::executor::Mutation;
                    match mutation {
                        Mutation::Pause => {
                            global_pause = true;
                        }
                        Mutation::Resume => {
                            global_pause = false;
                        }
                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
                            if fragment_ids.contains(&self.fragment_id) {
                                backfill_paused = false;
                            }
                        }
                        Mutation::Throttle(actor_to_apply) => {
                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
                            if let Some(new_rate_limit) = new_rate_limit_entry {
                                let new_rate_limit = (*new_rate_limit).into();
                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                                if old_rate_limit != new_rate_limit {
                                    tracing::info!(
                                        old_rate_limit = ?old_rate_limit,
                                        new_rate_limit = ?new_rate_limit,
                                        upstream_table_id = upstream_table_id,
                                        actor_id = self.actor_id,
                                        "backfill rate limit changed",
                                    );
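                                    // Rebuild the per-vnode builders so that their chunk
                                    // size respects the new rate limit.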
                                    builders = upstream_table
                                        .vnodes()
                                        .iter_vnodes()
                                        .map(|vnode| {
                                            let builder = create_builder(
                                                new_rate_limit,
                                                self.chunk_size,
                                                snapshot_data_types.clone(),
                                            );
                                            (vnode, builder)
                                        })
                                        .collect();
                                }
                            }
                        }
                        _ => {}
                    }
                }

                yield Message::Barrier(barrier);

                // We will switch the snapshot at the start of the next iteration of the
                // backfill loop, unless the snapshot read has already completed.
                if snapshot_read_complete {
                    break 'backfill_loop;
                }
            }
        }

        tracing::debug!("snapshot read finished, wait to commit state on next barrier");

        // Update our progress as finished in the state table.

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress + persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished, then we need to update state; otherwise, no need.
                if let Message::Barrier(barrier) = &msg {
                    if is_completely_finished {
                        // If already finished, no need to persist any state.
                        // But we need to advance the epoch anyway.
                        self.state_table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;
                    } else {
                        // If the snapshot was empty, we do not need to backfill,
                        // but we still need to persist the finished state.
                        // We currently persist it on the second barrier here rather than the first.
                        // This is because we can't update the state table in the first epoch,
                        // since it expects to have been initialized in the previous epoch
                        // (there's no epoch before the first epoch).
                        for vnode in upstream_table.vnodes().iter_vnodes() {
                            backfill_state
                                .finish_progress(vnode, upstream_table.pk_indices().len());
                        }

                        persist_state_per_vnode(
                            barrier.epoch,
                            &mut self.state_table,
                            &mut backfill_state,
                            #[cfg(debug_assertions)]
                            state_len,
                            vnodes.iter_vnodes(),
                        )
                        .await?;
                    }

                    self.progress
                        .finish(barrier.epoch, total_snapshot_processed_rows);
                    yield msg;
                    break;
                }
                // Allow other messages to pass through.
                // We won't yield twice here, since if there's a barrier,
                // we will always break out of the loop.
                yield msg;
            }
        }

        tracing::debug!("backfill finished");

        // After progress finished + state persisted,
        // we can forward messages directly to the downstream,
        // as backfill is finished.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // If already finished, no need to persist any state, but we need to
                    // advance the epoch of the state table anyway.
                    self.state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                }
                yield msg;
            }
        }
    }

    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
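            // A pending stream never yields, so while paused we produce no snapshot
            // rows; the backfill stream is rebuilt on the next barrier anyway.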
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // We have already checked that the rate limit is not zero.
            #[for_await]
            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
                let r = r?;
                rate_limiter.wait(1).await;
                yield r;
            }
        }
    }

    fn handle_snapshot_chunk(
        chunk: DataChunk,
        vnode: VirtualNode,
        pk_in_output_indices: &[usize],
        backfill_state: &mut BackfillState,
        cur_barrier_snapshot_processed_rows: &mut u64,
        total_snapshot_processed_rows: &mut u64,
        output_indices: &[usize],
    ) -> StreamExecutorResult<StreamChunk> {
        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
        // Raise the current position.
        // Since snapshot read streams are ordered by pk, we can
        // just use the last row to update `current_pos`.
        let snapshot_row_count_delta = chunk.cardinality() as u64;
        update_pos_by_vnode(
            vnode,
            &chunk,
            pk_in_output_indices,
            backfill_state,
            snapshot_row_count_delta,
        )?;

        let chunk_cardinality = chunk.cardinality() as u64;
        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        *total_snapshot_processed_rows += chunk_cardinality;
        Ok(mapping_chunk(chunk, output_indices))
    }

    /// Read the snapshot per vnode.
    /// These streams should be sorted in the storage layer.
    /// 1. Get a row iterator per vnode.
    /// 2. Merge them with `select_all`.
    /// 3. The caller then assembles the merged rows into chunks,
    ///    using one builder per vnode.
    ///
    /// We interleave at the chunk-per-vnode level rather than the row level.
    /// This is so that we can compute `current_pos` once per chunk, since each chunk
    /// corresponds to a single vnode.
    ///
    /// The stream yields pairs of `(VirtualNode, OwnedRow)`:
    /// the `VirtualNode` the row belongs to, and the row itself.
    /// A final `None` means the snapshot read has no more rows.
    ///
    /// The `snapshot_read_epoch` is supplied as a parameter for `state_table`.
    /// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
    ///
    /// The rows from the upstream snapshot read are buffered inside the `builder`.
    /// If a snapshot is dropped before its rows are consumed,
    /// the remaining data in the `builder` must be flushed manually.
    /// Otherwise, when we scan a new snapshot, stale rows could still be
    /// present in the `builder`, and flushing would then emit duplicate rows.
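    ///
    /// For example (illustrative values): if the remaining rows are `v0: [r1, r3]` and
    /// `v1: [r2]`, the merged stream may yield
    /// `Some((v0, r1)), Some((v1, r2)), Some((v0, r3))`, then `None`.
    /// The order within each vnode is preserved, while the interleaving across vnodes
    /// depends on how `select_all` polls the underlying iterators.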
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn snapshot_read_per_vnode(
        upstream_table: &ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
    ) {
        let mut iterators = vec![];
        for vnode in upstream_table.vnodes().iter_vnodes() {
            let backfill_progress = backfill_state.get_progress(&vnode)?;
            let current_pos = match backfill_progress {
                BackfillProgressPerVnode::NotStarted => None,
                BackfillProgressPerVnode::Completed { .. } => {
                    continue;
                }
                BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                    Some(current_pos.clone())
                }
            };

            let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
            if range_bounds.is_none() {
                continue;
            }
            let range_bounds = range_bounds.unwrap();

            tracing::trace!(
                vnode = ?vnode,
                current_pos = ?current_pos,
                range_bounds = ?range_bounds,
                "iter_with_vnode_and_output_indices"
            );
            let vnode_row_iter = upstream_table
                .iter_with_vnode_and_output_indices(
                    vnode,
                    &range_bounds,
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;

            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

            let vnode_row_iter = Box::pin(vnode_row_iter);

            iterators.push(vnode_row_iter);
        }

        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
        let vnode_row_iter = select_all(iterators);

        #[for_await]
        for vnode_and_row in vnode_row_iter {
            yield Some(vnode_and_row?);
        }
        yield None;
        return Ok(());
    }
}

impl<S, SD> Execute for ArrangementBackfillExecutor<S, SD>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}