risingwave_stream/executor/backfill/
arrangement_backfill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use either::Either;
18use futures::stream::{select_all, select_with_strategy};
19use futures::{TryStreamExt, stream};
20use itertools::Itertools;
21use risingwave_common::array::{DataChunk, Op};
22use risingwave_common::bail;
23use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
26use risingwave_storage::row_serde::value_serde::ValueRowSerde;
27use risingwave_storage::store::PrefetchOptions;
28
29use crate::common::table::state_table::ReplicatedStateTable;
30#[cfg(debug_assertions)]
31use crate::executor::backfill::utils::METADATA_STATE_LEN;
32use crate::executor::backfill::utils::{
33    BackfillProgressPerVnode, BackfillState, compute_bounds, create_builder,
34    get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
35    persist_state_per_vnode, update_pos_by_vnode,
36};
37use crate::executor::prelude::*;
38use crate::task::{CreateMviewProgressReporter, FragmentId};
39
40type Builders = HashMap<VirtualNode, DataChunkBuilder>;
41
42/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
43/// Main differences:
44/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
45///   independently.
46/// - To synchronize upstream shared buffer, it is initialized with a [`ReplicatedStateTable`].
47pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
48    /// Upstream table
49    upstream_table: ReplicatedStateTable<S, SD>,
50
51    /// Upstream with the same schema with the upstream table.
52    upstream: Executor,
53
54    /// Internal state table for persisting state of backfill state.
55    state_table: StateTable<S>,
56
57    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
58    output_indices: Vec<usize>,
59
60    progress: CreateMviewProgressReporter,
61
62    actor_id: ActorId,
63
64    metrics: Arc<StreamingMetrics>,
65
66    chunk_size: usize,
67
68    rate_limiter: MonitoredRateLimiter,
69
70    /// Fragment id of the fragment this backfill node belongs to.
71    fragment_id: FragmentId,
72}
73
74impl<S, SD> ArrangementBackfillExecutor<S, SD>
75where
76    S: StateStore,
77    SD: ValueRowSerde,
78{
79    #[allow(clippy::too_many_arguments)]
80    #[allow(dead_code)]
81    pub fn new(
82        upstream_table: ReplicatedStateTable<S, SD>,
83        upstream: Executor,
84        state_table: StateTable<S>,
85        output_indices: Vec<usize>,
86        progress: CreateMviewProgressReporter,
87        metrics: Arc<StreamingMetrics>,
88        chunk_size: usize,
89        rate_limit: RateLimit,
90        fragment_id: FragmentId,
91    ) -> Self {
92        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
93        Self {
94            upstream_table,
95            upstream,
96            state_table,
97            output_indices,
98            actor_id: progress.actor_id(),
99            progress,
100            metrics,
101            chunk_size,
102            rate_limiter,
103            fragment_id,
104        }
105    }
106
107    #[try_stream(ok = Message, error = StreamExecutorError)]
108    async fn execute_inner(mut self) {
109        tracing::debug!("backfill executor started");
110        // The primary key columns, in the output columns of the upstream_table scan.
111        // Table scan scans a subset of the columns of the upstream table.
112        let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
113        #[cfg(debug_assertions)]
114        let state_len = self.upstream_table.pk_indices().len() + METADATA_STATE_LEN;
115        let pk_order = self.upstream_table.pk_serde().get_order_types().to_vec();
116        let upstream_table_id = self.upstream_table.table_id();
117        let mut upstream_table = self.upstream_table;
118        let vnodes = upstream_table.vnodes().clone();
119
120        // These builders will build data chunks.
121        // We must supply them with the full datatypes which correspond to
122        // pk + output_indices.
123        let snapshot_data_types = self
124            .upstream
125            .schema()
126            .fields()
127            .iter()
128            .map(|field| field.data_type.clone())
129            .collect_vec();
130        let mut builders: Builders = upstream_table
131            .vnodes()
132            .iter_vnodes()
133            .map(|vnode| {
134                let builder = create_builder(
135                    self.rate_limiter.rate_limit(),
136                    self.chunk_size,
137                    snapshot_data_types.clone(),
138                );
139                (vnode, builder)
140            })
141            .collect();
142
143        let mut upstream = self.upstream.execute();
144
145        // Poll the upstream to get the first barrier.
146        let first_barrier = expect_first_barrier(&mut upstream).await?;
147        let mut global_pause = first_barrier.is_pause_on_startup();
148        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
149        let first_epoch = first_barrier.epoch;
150        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
151        // The first barrier message should be propagated.
152        yield Message::Barrier(first_barrier);
153
154        self.state_table.init_epoch(first_epoch).await?;
155
156        let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;
157
158        let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
159            matches!(
160                p.current_state(),
161                &BackfillProgressPerVnode::Completed { .. }
162            )
163        });
164        if is_completely_finished {
165            assert!(!is_newly_added);
166        }
167
168        upstream_table.init_epoch(first_epoch).await?;
169
170        let mut backfill_state: BackfillState = progress_per_vnode.into();
171
172        let to_backfill = !is_completely_finished;
173
174        // If there is no need to backfill but the state is still "unfinished", we need to finish it.
175        // So we just update the state + progress to meta at the next barrier to finish progress,
176        // and forward other messages.
177        //
178        // Reason for persisting on second barrier rather than first:
179        // We can't update meta with progress as finished until state_table
180        // has been updated.
181        // We also can't update state_table in first epoch, since state_table
182        // expects to have been initialized in previous epoch.
183
184        // The epoch used to snapshot read upstream mv.
185        let mut snapshot_read_epoch;
186
187        // Keep track of rows from the snapshot.
188        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();
189
190        // Arrangement Backfill Algorithm:
191        //
192        //   backfill_stream
193        //  /               \
194        // upstream       snapshot
195        //
196        // We construct a backfill stream with upstream as its left input and mv snapshot read
197        // stream as its right input. When a chunk comes from upstream, we will buffer it.
198        //
199        // When a barrier comes from upstream:
200        //  Immediately break out of backfill loop.
201        //  - For each row of the upstream chunk buffer, compute vnode.
202        //  - Get the `current_pos` corresponding to the vnode. Forward it to downstream if its pk
203        //    <= `current_pos`, otherwise ignore it.
204        //  - Flush all buffered upstream_chunks to replicated state table.
205        //  - Update the `snapshot_read_epoch`.
206        //  - Reconstruct the whole backfill stream with upstream and new mv snapshot read stream
207        //    with the `snapshot_read_epoch`.
208        //
209        // When a chunk comes from snapshot, we forward it to the downstream and raise
210        // `current_pos`.
211        //
212        // When we reach the end of the snapshot read stream, it means backfill has been
213        // finished.
214        //
215        // Once the backfill loop ends, we forward the upstream directly to the downstream.
216        if to_backfill {
217            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
218            let mut pending_barrier: Option<Barrier> = None;
219
220            let metrics = self
221                .metrics
222                .new_backfill_metrics(upstream_table_id, self.actor_id);
223
224            'backfill_loop: loop {
225                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
226                let mut cur_barrier_upstream_processed_rows: u64 = 0;
227                let mut snapshot_read_complete = false;
228                let mut has_snapshot_read = false;
229
230                // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
231                // dropped. Then we can write to `upstream_table` on barrier in the
232                // next block.
233                {
234                    let left_upstream = upstream.by_ref().map(Either::Left);
235
236                    // Check if stream paused
237                    let paused = global_pause
238                        || backfill_paused
239                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
240                    // Create the snapshot stream
241                    let right_snapshot = pin!(
242                        Self::make_snapshot_stream(
243                            &upstream_table,
244                            backfill_state.clone(), // FIXME: Use mutable reference instead.
245                            paused,
246                            &self.rate_limiter,
247                        )
248                        .map(Either::Right)
249                    );
250
251                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
252                    // barrier comes.
253                    let mut backfill_stream =
254                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
255                            stream::PollNext::Left
256                        });
257
258                    #[for_await]
259                    for either in &mut backfill_stream {
260                        match either {
261                            // Upstream
262                            Either::Left(msg) => {
263                                match msg? {
264                                    Message::Barrier(barrier) => {
265                                        // We have to process the barrier outside of the loop.
266                                        // This is because our state_table reference is still live
267                                        // here, we have to break the loop to drop it,
268                                        // so we can do replication of upstream state_table.
269                                        pending_barrier = Some(barrier);
270
271                                        // Break the for loop and start a new snapshot read stream.
272                                        break;
273                                    }
274                                    Message::Chunk(chunk) => {
275                                        // Buffer the upstream chunk.
276                                        upstream_chunk_buffer.push(chunk.compact());
277                                    }
278                                    Message::Watermark(_) => {
279                                        // Ignore watermark during backfill.
280                                    }
281                                }
282                            }
283                            // Snapshot read
284                            Either::Right(msg) => {
285                                has_snapshot_read = true;
286                                match msg? {
287                                    None => {
288                                        // Consume remaining rows in the builder.
289                                        for (vnode, builder) in &mut builders {
290                                            if let Some(data_chunk) = builder.consume_all() {
291                                                let chunk = Self::handle_snapshot_chunk(
292                                                    data_chunk,
293                                                    *vnode,
294                                                    &pk_in_output_indices,
295                                                    &mut backfill_state,
296                                                    &mut cur_barrier_snapshot_processed_rows,
297                                                    &mut total_snapshot_processed_rows,
298                                                    &self.output_indices,
299                                                )?;
300                                                tracing::trace!(
301                                                    source = "snapshot",
302                                                    state = "finish_backfill_stream",
303                                                    action = "drain_snapshot_buffers",
304                                                    ?vnode,
305                                                    "{:#?}",
306                                                    chunk,
307                                                );
308                                                yield Message::Chunk(chunk);
309                                            }
310                                        }
311
312                                        // End of the snapshot read stream.
313                                        // We should not mark the chunk anymore,
314                                        // otherwise, we will ignore some rows
315                                        // in the buffer. Here we choose to never mark the chunk.
316                                        // Consume the remaining upstream buffer chunks without
317                                        // marking them.
318                                        for chunk in upstream_chunk_buffer.drain(..) {
319                                            let chunk_cardinality = chunk.cardinality() as u64;
320                                            cur_barrier_upstream_processed_rows +=
321                                                chunk_cardinality;
322                                            let chunk = mapping_chunk(chunk, &self.output_indices);
323                                            tracing::trace!(
324                                                source = "upstream",
325                                                state = "finish_backfill_stream",
326                                                action = "drain_upstream_buffer",
327                                                "{:#?}",
328                                                chunk,
329                                            );
330                                            yield Message::Chunk(chunk);
331                                        }
332                                        metrics
333                                            .backfill_snapshot_read_row_count
334                                            .inc_by(cur_barrier_snapshot_processed_rows);
335                                        metrics
336                                            .backfill_upstream_output_row_count
337                                            .inc_by(cur_barrier_upstream_processed_rows);
338                                        break 'backfill_loop;
339                                    }
340                                    Some((vnode, row)) => {
341                                        let builder = builders.get_mut(&vnode).unwrap();
342                                        if let Some(chunk) = builder.append_one_row(row) {
343                                            let chunk = Self::handle_snapshot_chunk(
344                                                chunk,
345                                                vnode,
346                                                &pk_in_output_indices,
347                                                &mut backfill_state,
348                                                &mut cur_barrier_snapshot_processed_rows,
349                                                &mut total_snapshot_processed_rows,
350                                                &self.output_indices,
351                                            )?;
352                                            tracing::trace!(
353                                                source = "snapshot",
354                                                state = "process_backfill_stream",
355                                                action = "drain_full_snapshot_buffer",
356                                                "{:#?}",
357                                                chunk,
358                                            );
359                                            yield Message::Chunk(chunk);
360                                        }
361                                    }
362                                }
363                            }
364                        }
365                    }
366
367                    // Before processing barrier, if did not snapshot read,
368                    // do a snapshot read first.
369                    // This is so we don't lose the tombstone iteration progress.
370                    // Or if s3 read latency is high, we don't fail to read from s3.
371                    //
372                    // If paused, we can't read any snapshot records, skip this.
373                    //
374                    // If rate limit is set, respect the rate limit, check if we can read,
375                    // If we can't, skip it. If no rate limit set, we can read.
376                    let rate_limit_ready = self.rate_limiter.check(1).is_ok();
377                    if !has_snapshot_read && !paused && rate_limit_ready {
378                        debug_assert!(builders.values().all(|b| b.is_empty()));
379                        let (_, snapshot) = backfill_stream.into_inner();
380                        #[for_await]
381                        for msg in snapshot {
382                            let Either::Right(msg) = msg else {
383                                bail!("BUG: snapshot_read contains upstream messages");
384                            };
385                            match msg? {
386                                None => {
387                                    // End of the snapshot read stream.
388                                    // We let the barrier handling logic take care of upstream updates.
389                                    // But we still want to exit backfill loop, so we mark snapshot read complete.
390                                    snapshot_read_complete = true;
391                                    break;
392                                }
393                                Some((vnode, row)) => {
394                                    let builder = builders.get_mut(&vnode).unwrap();
395                                    if let Some(chunk) = builder.append_one_row(row) {
396                                        let chunk = Self::handle_snapshot_chunk(
397                                            chunk,
398                                            vnode,
399                                            &pk_in_output_indices,
400                                            &mut backfill_state,
401                                            &mut cur_barrier_snapshot_processed_rows,
402                                            &mut total_snapshot_processed_rows,
403                                            &self.output_indices,
404                                        )?;
405                                        tracing::trace!(
406                                            source = "snapshot",
407                                            state = "process_backfill_stream",
408                                            action = "snapshot_read_at_least_one",
409                                            "{:#?}",
410                                            chunk,
411                                        );
412                                        yield Message::Chunk(chunk);
413                                    }
414
415                                    break;
416                                }
417                            }
418                        }
419                    }
420                }
421
422                // Process barrier
423                // When we break out of inner backfill_stream loop, it means we have a barrier.
424                // If there are no updates and there are no snapshots left,
425                // we already finished backfill and should have exited the outer backfill loop.
426                let barrier = match pending_barrier.take() {
427                    Some(barrier) => barrier,
428                    None => bail!("BUG: current_backfill loop exited without a barrier"),
429                };
430
431                // Process barrier:
432                // - consume snapshot rows left in builder.
433                // - consume upstream buffer chunk
434                // - handle mutations
435                // - switch snapshot
436
437                // consume snapshot rows left in builder.
438                // NOTE(kwannoel): `zip_eq_debug` does not work here,
439                // we encounter "higher-ranked lifetime error".
440                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
441                    let chunk = b.consume_all().map(|chunk| {
442                        let ops = vec![Op::Insert; chunk.capacity()];
443                        StreamChunk::from_parts(ops, chunk)
444                    });
445                    (vnode, chunk)
446                }) {
447                    if let Some(chunk) = chunk {
448                        let chunk_cardinality = chunk.cardinality() as u64;
449                        // Raise the current position.
450                        // As snapshot read streams are ordered by pk, so we can
451                        // just use the last row to update `current_pos`.
452                        update_pos_by_vnode(
453                            *vnode,
454                            &chunk,
455                            &pk_in_output_indices,
456                            &mut backfill_state,
457                            chunk_cardinality,
458                        )?;
459
460                        cur_barrier_snapshot_processed_rows += chunk_cardinality;
461                        total_snapshot_processed_rows += chunk_cardinality;
462                        let chunk = mapping_chunk(chunk, &self.output_indices);
463                        tracing::trace!(
464                            source = "snapshot",
465                            state = "process_barrier",
466                            action = "consume_remaining_snapshot",
467                            "{:#?}",
468                            chunk,
469                        );
470                        yield Message::Chunk(chunk);
471                    }
472                }
473
474                // consume upstream buffer chunk
475                for chunk in upstream_chunk_buffer.drain(..) {
476                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
477                    // FIXME: Replace with `snapshot_is_processed`
478                    // Flush downstream.
479                    // If no current_pos, means no snapshot processed yet.
480                    // This also means we don't need to propagate any updates <= current_pos.
481                    if backfill_state.has_progress() {
482                        let chunk = mapping_chunk(
483                            mark_chunk_ref_by_vnode(
484                                &chunk,
485                                &backfill_state,
486                                &pk_in_output_indices,
487                                &upstream_table,
488                                &pk_order,
489                            )?,
490                            &self.output_indices,
491                        );
492                        tracing::trace!(
493                            source = "upstream",
494                            state = "process_barrier",
495                            action = "consume_remaining_upstream",
496                            "{:#?}",
497                            chunk,
498                        );
499                        yield Message::Chunk(chunk);
500                    }
501
502                    // Replicate
503                    upstream_table.write_chunk(chunk);
504                }
505
506                upstream_table
507                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
508                    .await?;
509
510                metrics
511                    .backfill_snapshot_read_row_count
512                    .inc_by(cur_barrier_snapshot_processed_rows);
513                metrics
514                    .backfill_upstream_output_row_count
515                    .inc_by(cur_barrier_upstream_processed_rows);
516
517                // Update snapshot read epoch.
518                snapshot_read_epoch = barrier.epoch.prev;
519
520                // TODO(kwannoel): Not sure if this holds for arrangement backfill.
521                // May need to revisit it.
522                // Need to check it after scale-in / scale-out.
523                self.progress.update(
524                    barrier.epoch,
525                    snapshot_read_epoch,
526                    total_snapshot_processed_rows,
527                );
528
529                // Persist state on barrier
530                persist_state_per_vnode(
531                    barrier.epoch,
532                    &mut self.state_table,
533                    &mut backfill_state,
534                    #[cfg(debug_assertions)]
535                    state_len,
536                    vnodes.iter_vnodes(),
537                )
538                .await?;
539
540                tracing::trace!(
541                    barrier = ?barrier,
542                    "barrier persisted"
543                );
544
545                // handle mutations
546                if let Some(mutation) = barrier.mutation.as_deref() {
547                    use crate::executor::Mutation;
548                    match mutation {
549                        Mutation::Pause => {
550                            global_pause = true;
551                        }
552                        Mutation::Resume => {
553                            global_pause = false;
554                        }
555                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
556                            if fragment_ids.contains(&self.fragment_id) {
557                                backfill_paused = false;
558                            }
559                        }
560                        Mutation::Throttle(actor_to_apply) => {
561                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
562                            if let Some(new_rate_limit) = new_rate_limit_entry {
563                                let new_rate_limit = (*new_rate_limit).into();
564                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
565                                if old_rate_limit != new_rate_limit {
566                                    tracing::info!(
567                                        old_rate_limit = ?old_rate_limit,
568                                        new_rate_limit = ?new_rate_limit,
569                                        upstream_table_id = upstream_table_id,
570                                        actor_id = self.actor_id,
571                                        "backfill rate limit changed",
572                                    );
573                                    builders = upstream_table
574                                        .vnodes()
575                                        .iter_vnodes()
576                                        .map(|vnode| {
577                                            let builder = create_builder(
578                                                new_rate_limit,
579                                                self.chunk_size,
580                                                snapshot_data_types.clone(),
581                                            );
582                                            (vnode, builder)
583                                        })
584                                        .collect();
585                                }
586                            }
587                        }
588                        _ => {}
589                    }
590                }
591
592                yield Message::Barrier(barrier);
593
594                // We will switch snapshot at the start of the next iteration of the backfill loop.
595                // Unless snapshot read is already completed.
596                if snapshot_read_complete {
597                    break 'backfill_loop;
598                }
599            }
600        }
601
602        tracing::debug!("snapshot read finished, wait to commit state on next barrier");
603
604        // Update our progress as finished in state table.
605
606        // Wait for first barrier to come after backfill is finished.
607        // So we can update our progress + persist the status.
608        while let Some(Ok(msg)) = upstream.next().await {
609            if let Some(msg) = mapping_message(msg, &self.output_indices) {
610                // If not finished then we need to update state, otherwise no need.
611                if let Message::Barrier(barrier) = &msg {
612                    if is_completely_finished {
613                        // If already finished, no need to persist any state. But we need to advance the epoch anyway
614                        self.state_table
615                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
616                            .await?;
617                    } else {
618                        // If snapshot was empty, we do not need to backfill,
619                        // but we still need to persist the finished state.
620                        // We currently persist it on the second barrier here rather than first.
621                        // This is because we can't update state table in first epoch,
622                        // since it expects to have been initialized in previous epoch
623                        // (there's no epoch before the first epoch).
624                        for vnode in upstream_table.vnodes().iter_vnodes() {
625                            backfill_state
626                                .finish_progress(vnode, upstream_table.pk_indices().len());
627                        }
628
629                        persist_state_per_vnode(
630                            barrier.epoch,
631                            &mut self.state_table,
632                            &mut backfill_state,
633                            #[cfg(debug_assertions)]
634                            state_len,
635                            vnodes.iter_vnodes(),
636                        )
637                        .await?;
638                    }
639
640                    self.progress
641                        .finish(barrier.epoch, total_snapshot_processed_rows);
642                    yield msg;
643                    break;
644                }
645                // Allow other messages to pass through.
646                // We won't yield twice here, since if there's a barrier,
647                // we will always break out of the loop.
648                yield msg;
649            }
650        }
651
652        tracing::debug!("backfill finished");
653
654        // After progress finished + state persisted,
655        // we can forward messages directly to the downstream,
656        // as backfill is finished.
657        #[for_await]
658        for msg in upstream {
659            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
660                if let Message::Barrier(barrier) = &msg {
661                    // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
662                    self.state_table
663                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
664                        .await?;
665                }
666                yield msg;
667            }
668        }
669    }
670
671    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
672    async fn make_snapshot_stream<'a>(
673        upstream_table: &'a ReplicatedStateTable<S, SD>,
674        backfill_state: BackfillState,
675        paused: bool,
676        rate_limiter: &'a MonitoredRateLimiter,
677    ) {
678        if paused {
679            #[for_await]
680            for _ in tokio_stream::pending() {
681                bail!("BUG: paused stream should not yield");
682            }
683        } else {
684            // Checked the rate limit is not zero.
685            #[for_await]
686            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
687                let r = r?;
688                rate_limiter.wait(1).await;
689                yield r;
690            }
691        }
692    }
693
694    fn handle_snapshot_chunk(
695        chunk: DataChunk,
696        vnode: VirtualNode,
697        pk_in_output_indices: &[usize],
698        backfill_state: &mut BackfillState,
699        cur_barrier_snapshot_processed_rows: &mut u64,
700        total_snapshot_processed_rows: &mut u64,
701        output_indices: &[usize],
702    ) -> StreamExecutorResult<StreamChunk> {
703        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
704        // Raise the current position.
705        // As snapshot read streams are ordered by pk, so we can
706        // just use the last row to update `current_pos`.
707        let snapshot_row_count_delta = chunk.cardinality() as u64;
708        update_pos_by_vnode(
709            vnode,
710            &chunk,
711            pk_in_output_indices,
712            backfill_state,
713            snapshot_row_count_delta,
714        )?;
715
716        let chunk_cardinality = chunk.cardinality() as u64;
717        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
718        *total_snapshot_processed_rows += chunk_cardinality;
719        Ok(mapping_chunk(chunk, output_indices))
720    }
721
722    /// Read snapshot per vnode.
723    /// These streams should be sorted in storage layer.
724    /// 1. Get row iterator / vnode.
725    /// 2. Merge it with `select_all`.
726    /// 3. Change it into a chunk iterator with `iter_chunks`.
727    /// This means it should fetch a row from each iterator to form a chunk.
728    ///
729    /// We interleave at chunk per vnode level rather than rows.
730    /// This is so that we can compute `current_pos` once per chunk, since they correspond to 1
731    /// vnode.
732    ///
733    /// The stream contains pairs of `(VirtualNode, StreamChunk)`.
734    /// The `VirtualNode` is the vnode that the chunk belongs to.
735    /// The `StreamChunk` is the chunk that contains the rows from the vnode.
736    /// If it's `None`, it means the vnode has no more rows for this snapshot read.
737    ///
738    /// The `snapshot_read_epoch` is supplied as a parameter for `state_table`.
739    /// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
740    ///
741    /// The rows from upstream snapshot read will be buffered inside the `builder`.
742    /// If snapshot is dropped before its rows are consumed,
743    /// remaining data in `builder` must be flushed manually.
744    /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
745    /// present, Then when we flush we contain duplicate rows.
746    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
747    async fn snapshot_read_per_vnode(
748        upstream_table: &ReplicatedStateTable<S, SD>,
749        backfill_state: BackfillState,
750    ) {
751        let mut iterators = vec![];
752        for vnode in upstream_table.vnodes().iter_vnodes() {
753            let backfill_progress = backfill_state.get_progress(&vnode)?;
754            let current_pos = match backfill_progress {
755                BackfillProgressPerVnode::NotStarted => None,
756                BackfillProgressPerVnode::Completed { .. } => {
757                    continue;
758                }
759                BackfillProgressPerVnode::InProgress { current_pos, .. } => {
760                    Some(current_pos.clone())
761                }
762            };
763
764            let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
765            if range_bounds.is_none() {
766                continue;
767            }
768            let range_bounds = range_bounds.unwrap();
769
770            tracing::trace!(
771                vnode = ?vnode,
772                current_pos = ?current_pos,
773                range_bounds = ?range_bounds,
774                "iter_with_vnode_and_output_indices"
775            );
776            let vnode_row_iter = upstream_table
777                .iter_with_vnode_and_output_indices(
778                    vnode,
779                    &range_bounds,
780                    PrefetchOptions::prefetch_for_small_range_scan(),
781                )
782                .await?;
783
784            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
785
786            let vnode_row_iter = Box::pin(vnode_row_iter);
787
788            iterators.push(vnode_row_iter);
789        }
790
791        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
792        let vnode_row_iter = select_all(iterators);
793
794        #[for_await]
795        for vnode_and_row in vnode_row_iter {
796            yield Some(vnode_and_row?);
797        }
798        yield None;
799        return Ok(());
800    }
801}
802
803impl<S, SD> Execute for ArrangementBackfillExecutor<S, SD>
804where
805    S: StateStore,
806    SD: ValueRowSerde,
807{
808    fn execute(self: Box<Self>) -> BoxedMessageStream {
809        self.execute_inner().boxed()
810    }
811}