risingwave_stream/executor/backfill/arrangement_backfill.rs
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use either::Either;
use futures::stream::{select_all, select_with_strategy};
use futures::{TryStreamExt, stream};
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;

use crate::common::table::state_table::ReplicatedStateTable;
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
    BackfillProgressPerVnode, BackfillState, compute_bounds, create_builder,
    get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
    persist_state_per_vnode, update_pos_by_vnode,
};
use crate::executor::prelude::*;
use crate::task::{CreateMviewProgressReporter, FragmentId};

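/// Per-vnode chunk builders. Snapshot rows are appended to their vnode's builder,
/// and a chunk is emitted downstream whenever a builder fills up, or when the
/// builders are drained on a barrier / at the end of the snapshot read.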
type Builders = HashMap<VirtualNode, DataChunkBuilder>;

/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
/// Main differences:
/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
///   independently.
/// - To synchronize the upstream shared buffer, it is initialized with a
///   [`ReplicatedStateTable`].
pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream table
    upstream_table: ReplicatedStateTable<S, SD>,

    /// Upstream executor with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill state.
    state_table: StateTable<S>,

    /// The column indices that need to be forwarded to the downstream from the upstream and
    /// table scan.
    output_indices: Vec<usize>,

    progress: CreateMviewProgressReporter,

    actor_id: ActorId,

    metrics: Arc<StreamingMetrics>,

    chunk_size: usize,

    rate_limiter: MonitoredRateLimiter,

    /// Fragment id of the fragment this backfill node belongs to.
    fragment_id: FragmentId,
}

impl<S, SD> ArrangementBackfillExecutor<S, SD>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[allow(clippy::too_many_arguments)]
    #[allow(dead_code)]
    pub fn new(
        upstream_table: ReplicatedStateTable<S, SD>,
        upstream: Executor,
        state_table: StateTable<S>,
        output_indices: Vec<usize>,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        rate_limit: RateLimit,
        fragment_id: FragmentId,
    ) -> Self {
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Self {
            upstream_table,
            upstream,
            state_table,
            output_indices,
            actor_id: progress.actor_id(),
            progress,
            metrics,
            chunk_size,
            rate_limiter,
            fragment_id,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        tracing::debug!("backfill executor started");
        // The primary key columns, in the output columns of the upstream_table scan.
        // Table scan scans a subset of the columns of the upstream table.
        let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
        #[cfg(debug_assertions)]
        let state_len = self.upstream_table.pk_indices().len() + METADATA_STATE_LEN;
        let pk_order = self.upstream_table.pk_serde().get_order_types().to_vec();
        let upstream_table_id = self.upstream_table.table_id();
        let mut upstream_table = self.upstream_table;
        let vnodes = upstream_table.vnodes().clone();

        // These builders will build data chunks.
        // We must supply them with the full data types that correspond to
        // pk + output_indices.
        let snapshot_data_types = self
            .upstream
            .schema()
            .fields()
            .iter()
            .map(|field| field.data_type.clone())
            .collect_vec();
        let mut builders: Builders = upstream_table
            .vnodes()
            .iter_vnodes()
            .map(|vnode| {
                let builder = create_builder(
                    self.rate_limiter.rate_limit(),
                    self.chunk_size,
                    snapshot_data_types.clone(),
                );
                (vnode, builder)
            })
            .collect();

        let mut upstream = self.upstream.execute();

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let mut global_pause = first_barrier.is_pause_on_startup();
        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
        let first_epoch = first_barrier.epoch;
        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);

        self.state_table.init_epoch(first_epoch).await?;

        let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;

        let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
            matches!(
                p.current_state(),
                &BackfillProgressPerVnode::Completed { .. }
            )
        });
        if is_completely_finished {
            assert!(!is_newly_added);
        }

        upstream_table.init_epoch(first_epoch).await?;

        let mut backfill_state: BackfillState = progress_per_vnode.into();

        let to_backfill = !is_completely_finished;

        // If backfill is not needed, but the state is still "unfinished", we need to finish it.
        // So we just update the state + progress to meta at the next barrier to finish progress,
        // and forward other messages.
        //
        // Reason for persisting on the second barrier rather than the first:
        // We can't update meta with progress as finished until state_table
        // has been updated.
        // We also can't update state_table in the first epoch, since state_table
        // expects to have been initialized in a previous epoch.
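        //
        // Illustrative timeline: the state table is initialized at the first
        // barrier's epoch, so the earliest we can commit the "finished" state is
        // the second barrier; the progress report to meta follows that commit.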

        // The epoch used to snapshot read upstream mv.
        let mut snapshot_read_epoch;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

        // Arrangement Backfill Algorithm:
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot
        //
        // We construct a backfill stream with upstream as its left input and mv snapshot read
        // stream as its right input. When a chunk comes from upstream, we will buffer it.
        //
        // When a barrier comes from upstream:
        //  Immediately break out of backfill loop.
        //  - For each row of the upstream chunk buffer, compute vnode.
        //  - Get the `current_pos` corresponding to the vnode. Forward the row to downstream if
        //    its pk <= `current_pos`, otherwise ignore it.
        //  - Flush all buffered upstream_chunks to the replicated state table.
        //  - Update the `snapshot_read_epoch`.
        //  - Reconstruct the whole backfill stream with upstream and a new mv snapshot read
        //    stream with the `snapshot_read_epoch`.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
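        //
        // Illustrative example of the forwarding rule above: suppose vnode v is at
        // `current_pos` = pk 5. A buffered upstream update with pk 3 is forwarded,
        // since that key has already been backfilled, while an update with pk 7 is
        // ignored here, because the snapshot read will emit that key later anyway.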
        if to_backfill {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(upstream_table_id, self.actor_id);

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut snapshot_read_complete = false;
                let mut has_snapshot_read = false;

                // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
                // dropped. Then we can write to `upstream_table` on barrier in the
                // next block.
                {
                    let left_upstream = upstream.by_ref().map(Either::Left);

                    // Check if stream paused
                    let paused = global_pause
                        || backfill_paused
                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
                    // Create the snapshot stream
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(
                            &upstream_table,
                            backfill_state.clone(), // FIXME: Use mutable reference instead.
                            paused,
                            &self.rate_limiter,
                        )
                        .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
                    // barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process the barrier outside of the loop.
                                        // This is because our state_table reference is still live
                                        // here, we have to break the loop to drop it,
                                        // so we can do replication of upstream state_table.
                                        pending_barrier = Some(barrier);

                                        // Break the for loop and start a new snapshot read stream.
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact_vis());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                has_snapshot_read = true;
                                match msg? {
                                    None => {
                                        // Consume remaining rows in the builder.
                                        for (vnode, builder) in &mut builders {
                                            if let Some(data_chunk) = builder.consume_all() {
                                                let chunk = Self::handle_snapshot_chunk(
                                                    data_chunk,
                                                    *vnode,
                                                    &pk_in_output_indices,
                                                    &mut backfill_state,
                                                    &mut cur_barrier_snapshot_processed_rows,
                                                    &mut total_snapshot_processed_rows,
                                                    &self.output_indices,
                                                )?;
                                                tracing::trace!(
                                                    source = "snapshot",
                                                    state = "finish_backfill_stream",
                                                    action = "drain_snapshot_buffers",
                                                    ?vnode,
                                                    "{:#?}",
                                                    chunk,
                                                );
                                                yield Message::Chunk(chunk);
                                            }
                                        }

                                        // End of the snapshot read stream.
                                        // We should not mark the chunks anymore;
                                        // otherwise, we would ignore some rows
                                        // in the buffer. Here we choose to never mark the chunk.
                                        // Consume the remaining upstream buffer chunks without
                                        // marking them.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            let chunk = mapping_chunk(chunk, &self.output_indices);
                                            tracing::trace!(
                                                source = "upstream",
                                                state = "finish_backfill_stream",
                                                action = "drain_upstream_buffer",
                                                "{:#?}",
                                                chunk,
                                            );
                                            yield Message::Chunk(chunk);
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some((vnode, row)) => {
                                        let builder = builders.get_mut(&vnode).unwrap();
                                        if let Some(chunk) = builder.append_one_row(row) {
                                            let chunk = Self::handle_snapshot_chunk(
                                                chunk,
                                                vnode,
                                                &pk_in_output_indices,
                                                &mut backfill_state,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &self.output_indices,
                                            )?;
                                            tracing::trace!(
                                                source = "snapshot",
                                                state = "process_backfill_stream",
                                                action = "drain_full_snapshot_buffer",
                                                "{:#?}",
                                                chunk,
                                            );
                                            yield Message::Chunk(chunk);
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Before processing the barrier, if we did not do a snapshot read,
                    // do one first.
                    // This is so we don't lose the tombstone iteration progress,
                    // and so that when S3 read latency is high, we still manage to
                    // read something from S3 between barriers.
                    //
                    // If paused, we can't read any snapshot records; skip this.
                    //
                    // If a rate limit is set, respect it: check if we can read,
                    // and if we can't, skip the read. If no rate limit is set, we can always read.
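                    // NOTE: `check(1)` below only tests whether one more row is currently
                    // admissible (it does not block); the snapshot stream itself paces
                    // rows via `rate_limiter.wait(1)` inside `make_snapshot_stream`.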
                    let rate_limit_ready = self.rate_limiter.check(1).is_ok();
                    if !has_snapshot_read && !paused && rate_limit_ready {
                        debug_assert!(builders.values().all(|b| b.is_empty()));
                        let (_, snapshot) = backfill_stream.into_inner();
                        #[for_await]
                        for msg in snapshot {
                            let Either::Right(msg) = msg else {
                                bail!("BUG: snapshot_read contains upstream messages");
                            };
                            match msg? {
                                None => {
                                    // End of the snapshot read stream.
                                    // We let the barrier handling logic take care of upstream
                                    // updates, but we still want to exit the backfill loop,
                                    // so we mark the snapshot read as complete.
                                    snapshot_read_complete = true;
                                    break;
                                }
                                Some((vnode, row)) => {
                                    let builder = builders.get_mut(&vnode).unwrap();
                                    if let Some(chunk) = builder.append_one_row(row) {
                                        let chunk = Self::handle_snapshot_chunk(
                                            chunk,
                                            vnode,
                                            &pk_in_output_indices,
                                            &mut backfill_state,
                                            &mut cur_barrier_snapshot_processed_rows,
                                            &mut total_snapshot_processed_rows,
                                            &self.output_indices,
                                        )?;
                                        tracing::trace!(
                                            source = "snapshot",
                                            state = "process_backfill_stream",
                                            action = "snapshot_read_at_least_one",
                                            "{:#?}",
                                            chunk,
                                        );
                                        yield Message::Chunk(chunk);
                                    }

                                    break;
                                }
                            }
                        }
                    }
                }

                // Process barrier
                // When we break out of the inner backfill_stream loop, it means we have a barrier.
                // If there are no updates and there are no snapshots left,
                // we have already finished backfill and should have exited the outer backfill
                // loop.
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => bail!("BUG: current_backfill loop exited without a barrier"),
                };

                // Process barrier:
                // - consume snapshot rows left in the builders.
                // - consume upstream buffer chunks.
                // - handle mutations.
                // - switch snapshot.

                // Consume snapshot rows left in the builders.
                // NOTE(kwannoel): `zip_eq_debug` does not work here,
                // we encounter a "higher-ranked lifetime error".
                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
                    let chunk = b.consume_all().map(|chunk| {
                        let ops = vec![Op::Insert; chunk.capacity()];
                        StreamChunk::from_parts(ops, chunk)
                    });
                    (vnode, chunk)
                }) {
                    if let Some(chunk) = chunk {
                        let chunk_cardinality = chunk.cardinality() as u64;
                        // Raise the current position.
                        // Since snapshot read streams are ordered by pk, we can
                        // just use the last row to update `current_pos`.
                        update_pos_by_vnode(
                            *vnode,
                            &chunk,
                            &pk_in_output_indices,
                            &mut backfill_state,
                            chunk_cardinality,
                        )?;

                        cur_barrier_snapshot_processed_rows += chunk_cardinality;
                        total_snapshot_processed_rows += chunk_cardinality;
                        let chunk = mapping_chunk(chunk, &self.output_indices);
                        tracing::trace!(
                            source = "snapshot",
                            state = "process_barrier",
                            action = "consume_remaining_snapshot",
                            "{:#?}",
                            chunk,
                        );
                        yield Message::Chunk(chunk);
                    }
                }

                // Consume upstream buffer chunks.
                for chunk in upstream_chunk_buffer.drain(..) {
                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                    // FIXME: Replace with `snapshot_is_processed`.
                    // Flush downstream.
                    // If there is no current_pos, it means no snapshot has been processed yet.
                    // It also means we don't need to propagate any updates <= current_pos.
                    if backfill_state.has_progress() {
                        let chunk = mapping_chunk(
                            mark_chunk_ref_by_vnode(
                                &chunk,
                                &backfill_state,
                                &pk_in_output_indices,
                                &upstream_table,
                                &pk_order,
                            )?,
                            &self.output_indices,
                        );
                        tracing::trace!(
                            source = "upstream",
                            state = "process_barrier",
                            action = "consume_remaining_upstream",
                            "{:#?}",
                            chunk,
                        );
                        yield Message::Chunk(chunk);
                    }

                    // Replicate the chunk to the upstream table.
                    upstream_table.write_chunk(chunk);
                }

                upstream_table
                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                    .await?;

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                // Update the snapshot read epoch to the epoch this barrier has just sealed.
                snapshot_read_epoch = barrier.epoch.prev;

                // TODO(kwannoel): Not sure if this holds for arrangement backfill.
                // May need to revisit it.
                // Need to check it after scale-in / scale-out.
                self.progress.update(
                    barrier.epoch,
                    snapshot_read_epoch,
                    total_snapshot_processed_rows,
                );

                // Persist state on barrier.
                persist_state_per_vnode(
                    barrier.epoch,
                    &mut self.state_table,
                    &mut backfill_state,
                    #[cfg(debug_assertions)]
                    state_len,
                    vnodes.iter_vnodes(),
                )
                .await?;

                tracing::trace!(
                    barrier = ?barrier,
                    "barrier persisted"
                );

                // handle mutations
                if let Some(mutation) = barrier.mutation.as_deref() {
                    use crate::executor::Mutation;
                    match mutation {
                        Mutation::Pause => {
                            global_pause = true;
                        }
                        Mutation::Resume => {
                            global_pause = false;
                        }
                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
                            if fragment_ids.contains(&self.fragment_id) {
                                backfill_paused = false;
                            }
                        }
                        Mutation::Throttle(fragment_to_apply) => {
                            if let Some(entry) = fragment_to_apply.get(&self.fragment_id)
                                && entry.throttle_type() == ThrottleType::Backfill
                            {
                                let new_rate_limit = entry.rate_limit.into();
                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                                if old_rate_limit != new_rate_limit {
                                    tracing::info!(
                                        old_rate_limit = ?old_rate_limit,
                                        new_rate_limit = ?new_rate_limit,
                                        %upstream_table_id,
                                        actor_id = %self.actor_id,
                                        "backfill rate limit changed",
                                    );
                                    builders = upstream_table
                                        .vnodes()
                                        .iter_vnodes()
                                        .map(|vnode| {
                                            let builder = create_builder(
                                                new_rate_limit,
                                                self.chunk_size,
                                                snapshot_data_types.clone(),
                                            );
                                            (vnode, builder)
                                        })
                                        .collect();
                                }
                            }
                        }
                        _ => {}
                    }
                }

                yield Message::Barrier(barrier);

                // We will switch snapshot at the start of the next iteration of the backfill loop.
                // Unless snapshot read is already completed.
                if snapshot_read_complete {
                    break 'backfill_loop;
                }
            }
        }

        tracing::debug!("snapshot read finished, wait to commit state on next barrier");

        // Update our progress as finished in the state table.

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress + persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished, then we need to update state; otherwise there is no need.
                if let Message::Barrier(barrier) = &msg {
                    if is_completely_finished {
                        // If already finished, no need to persist any state. But we need to
                        // advance the epoch anyway.
                        self.state_table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;
                    } else {
                        // If the snapshot was empty, we do not need to backfill,
                        // but we still need to persist the finished state.
                        // We currently persist it on the second barrier here rather than the
                        // first. This is because we can't update the state table in the first
                        // epoch, since it expects to have been initialized in a previous epoch
                        // (there's no epoch before the first epoch).
                        for vnode in upstream_table.vnodes().iter_vnodes() {
                            backfill_state
                                .finish_progress(vnode, upstream_table.pk_indices().len());
                        }

                        persist_state_per_vnode(
                            barrier.epoch,
                            &mut self.state_table,
                            &mut backfill_state,
                            #[cfg(debug_assertions)]
                            state_len,
                            vnodes.iter_vnodes(),
                        )
                        .await?;
                    }

                    self.progress
                        .finish(barrier.epoch, total_snapshot_processed_rows);
                    yield msg;
                    break;
                }
                // Allow other messages to pass through.
                // We won't yield twice here, since if there's a barrier,
                // we will always break out of the loop.
                yield msg;
            }
        }

        tracing::debug!("backfill finished");

        // After progress is finished + state persisted,
        // we can forward messages directly to the downstream,
        // as backfill is finished.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // If already finished, no need to persist any state, but we need to advance
                    // the epoch of the state table anyway.
                    self.state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                }
                yield msg;
            }
        }
    }

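    /// Construct the snapshot-read side of the backfill stream.
    /// When `paused`, this yields nothing (a pending stream); otherwise it reads
    /// rows per vnode, pacing each row through the `rate_limiter`.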
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // The caller has already checked that the rate limit is not zero
            // (a zero rate limit is covered by `paused`).
            #[for_await]
            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
                let r = r?;
                rate_limiter.wait(1).await;
                yield r;
            }
        }
    }

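    /// Convert a snapshot `DataChunk` into an `Insert` `StreamChunk`, advance the
    /// backfill position for `vnode`, bump the per-barrier and total row counters,
    /// and project the chunk to the output indices.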
    fn handle_snapshot_chunk(
        chunk: DataChunk,
        vnode: VirtualNode,
        pk_in_output_indices: &[usize],
        backfill_state: &mut BackfillState,
        cur_barrier_snapshot_processed_rows: &mut u64,
        total_snapshot_processed_rows: &mut u64,
        output_indices: &[usize],
    ) -> StreamExecutorResult<StreamChunk> {
        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
        // Raise the current position.
        // Since snapshot read streams are ordered by pk, we can
        // just use the last row to update `current_pos`.
        let chunk_cardinality = chunk.cardinality() as u64;
        update_pos_by_vnode(
            vnode,
            &chunk,
            pk_in_output_indices,
            backfill_state,
            chunk_cardinality,
        )?;

        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        *total_snapshot_processed_rows += chunk_cardinality;
        Ok(mapping_chunk(chunk, output_indices))
    }

    /// Read the snapshot per vnode.
    /// These streams should be sorted in the storage layer.
    /// 1. Get a row iterator per vnode.
    /// 2. Merge them with `select_all`.
    /// 3. The caller appends the rows to per-vnode `DataChunkBuilder`s,
    ///    so that each emitted chunk contains rows from a single vnode.
    ///
    /// We interleave at the chunk-per-vnode level rather than per row.
    /// This is so that we can compute `current_pos` once per chunk, since all rows of a
    /// chunk correspond to one vnode.
    ///
    /// The stream yields `Option<(VirtualNode, OwnedRow)>`.
    /// `Some((vnode, row))` is a snapshot row, tagged with the vnode it belongs to.
    /// `None` means there are no more rows for this snapshot read.
    ///
    /// The `snapshot_read_epoch` is supplied as a parameter for `state_table`.
    /// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
    ///
    /// The rows from the upstream snapshot read will be buffered inside the `builders`.
    /// If the snapshot is dropped before its rows are consumed,
    /// the remaining data in the `builders` must be flushed manually.
    /// Otherwise, when we scan a new snapshot, stale rows could still be present in the
    /// `builders`, and flushing them would emit duplicate rows.
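    ///
    /// For example (illustrative): with two vnodes where vnode 0 is `InProgress` at
    /// pk 2 and vnode 1 is `InProgress` at pk 5, this constructs one iterator per
    /// vnode resuming after its own `current_pos`, and `select_all` interleaves the
    /// two row streams.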
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn snapshot_read_per_vnode(
        upstream_table: &ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
    ) {
        let mut iterators = vec![];
        for vnode in upstream_table.vnodes().iter_vnodes() {
            let backfill_progress = backfill_state.get_progress(&vnode)?;
            let current_pos = match backfill_progress {
                BackfillProgressPerVnode::NotStarted => None,
                BackfillProgressPerVnode::Completed { .. } => {
                    continue;
                }
                BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                    Some(current_pos.clone())
                }
            };

            let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
            if range_bounds.is_none() {
                continue;
            }
            let range_bounds = range_bounds.unwrap();

            tracing::trace!(
                vnode = ?vnode,
                current_pos = ?current_pos,
                range_bounds = ?range_bounds,
                "iter_with_vnode_and_output_indices"
            );
            let vnode_row_iter = upstream_table
                .iter_with_vnode_and_output_indices(
                    vnode,
                    &range_bounds,
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;

            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

            let vnode_row_iter = Box::pin(vnode_row_iter);

            iterators.push(vnode_row_iter);
        }

        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
        let vnode_row_iter = select_all(iterators);

        #[for_await]
        for vnode_and_row in vnode_row_iter {
            yield Some(vnode_and_row?);
        }
        yield None;
        return Ok(());
    }
}

impl<S, SD> Execute for ArrangementBackfillExecutor<S, SD>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}