risingwave_stream/executor/backfill/arrangement_backfill.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use either::Either;
18use futures::stream::{select_all, select_with_strategy};
19use futures::{TryStreamExt, stream};
20use itertools::Itertools;
21use risingwave_common::array::{DataChunk, Op};
22use risingwave_common::bail;
23use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
26use risingwave_pb::common::ThrottleType;
27use risingwave_storage::row_serde::value_serde::ValueRowSerde;
28use risingwave_storage::store::PrefetchOptions;
29
30use crate::common::table::state_table::ReplicatedStateTable;
31#[cfg(debug_assertions)]
32use crate::executor::backfill::utils::METADATA_STATE_LEN;
33use crate::executor::backfill::utils::{
34    BackfillProgressPerVnode, BackfillState, compute_bounds, create_builder,
35    get_progress_per_vnode, mapping_chunk, mapping_message, mark_chunk_ref_by_vnode,
36    persist_state_per_vnode, update_pos_by_vnode,
37};
38use crate::executor::prelude::*;
39use crate::task::{CreateMviewProgressReporter, FragmentId};
40
/// Maps each [`VirtualNode`] to the [`DataChunkBuilder`] that accumulates snapshot
/// rows read for that vnode, until a full chunk can be emitted downstream.
type Builders = HashMap<VirtualNode, DataChunkBuilder>;
42
/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
/// Main differences:
/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
///   independently.
/// - To synchronize upstream shared buffer, it is initialized with a [`ReplicatedStateTable`].
pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Replicated copy of the upstream table; snapshot reads are served from it and
    /// buffered upstream chunks are written back into it on each barrier.
    upstream_table: ReplicatedStateTable<S, SD>,

    /// Upstream executor with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill progress (per vnode).
    state_table: StateTable<S>,

    /// The column indices that need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Progress reporter for the create-mview backfill: updated on each barrier,
    /// and marked finished once the snapshot is fully consumed.
    progress: CreateMviewProgressReporter,

    /// Id of the actor running this executor, taken from `progress` at construction.
    actor_id: ActorId,

    /// Streaming metrics handle (backfill snapshot-read / upstream-output row counters).
    metrics: Arc<StreamingMetrics>,

    /// Target size of the chunks assembled by the per-vnode [`DataChunkBuilder`]s.
    chunk_size: usize,

    /// Rate limiter applied to snapshot reads; may be re-configured at runtime
    /// via a throttle mutation.
    rate_limiter: MonitoredRateLimiter,

    /// Fragment id of the fragment this backfill node belongs to.
    fragment_id: FragmentId,
}
74
75impl<S, SD> ArrangementBackfillExecutor<S, SD>
76where
77    S: StateStore,
78    SD: ValueRowSerde,
79{
80    #[allow(clippy::too_many_arguments)]
81    #[allow(dead_code)]
82    pub fn new(
83        upstream_table: ReplicatedStateTable<S, SD>,
84        upstream: Executor,
85        state_table: StateTable<S>,
86        output_indices: Vec<usize>,
87        progress: CreateMviewProgressReporter,
88        metrics: Arc<StreamingMetrics>,
89        chunk_size: usize,
90        rate_limit: RateLimit,
91        fragment_id: FragmentId,
92    ) -> Self {
93        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
94        Self {
95            upstream_table,
96            upstream,
97            state_table,
98            output_indices,
99            actor_id: progress.actor_id(),
100            progress,
101            metrics,
102            chunk_size,
103            rate_limiter,
104            fragment_id,
105        }
106    }
107
108    #[try_stream(ok = Message, error = StreamExecutorError)]
109    async fn execute_inner(mut self) {
110        tracing::debug!("backfill executor started");
111        // The primary key columns, in the output columns of the upstream_table scan.
112        // Table scan scans a subset of the columns of the upstream table.
113        let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
114        #[cfg(debug_assertions)]
115        let state_len = self.upstream_table.pk_indices().len() + METADATA_STATE_LEN;
116        let pk_order = self.upstream_table.pk_serde().get_order_types().to_vec();
117        let upstream_table_id = self.upstream_table.table_id();
118        let mut upstream_table = self.upstream_table;
119        let vnodes = upstream_table.vnodes().clone();
120
121        // These builders will build data chunks.
122        // We must supply them with the full datatypes which correspond to
123        // pk + output_indices.
124        let snapshot_data_types = self
125            .upstream
126            .schema()
127            .fields()
128            .iter()
129            .map(|field| field.data_type.clone())
130            .collect_vec();
131        let mut builders: Builders = upstream_table
132            .vnodes()
133            .iter_vnodes()
134            .map(|vnode| {
135                let builder = create_builder(
136                    self.rate_limiter.rate_limit(),
137                    self.chunk_size,
138                    snapshot_data_types.clone(),
139                );
140                (vnode, builder)
141            })
142            .collect();
143
144        let mut upstream = self.upstream.execute();
145
146        // Poll the upstream to get the first barrier.
147        let first_barrier = expect_first_barrier(&mut upstream).await?;
148        let mut global_pause = first_barrier.is_pause_on_startup();
149        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
150        let first_epoch = first_barrier.epoch;
151        let is_newly_added = first_barrier.is_newly_added(self.actor_id);
152        // The first barrier message should be propagated.
153        yield Message::Barrier(first_barrier);
154
155        self.state_table.init_epoch(first_epoch).await?;
156
157        let progress_per_vnode = get_progress_per_vnode(&self.state_table).await?;
158
159        let is_completely_finished = progress_per_vnode.iter().all(|(_, p)| {
160            matches!(
161                p.current_state(),
162                &BackfillProgressPerVnode::Completed { .. }
163            )
164        });
165        if is_completely_finished {
166            assert!(!is_newly_added);
167        }
168
169        upstream_table.init_epoch(first_epoch).await?;
170
171        let mut backfill_state: BackfillState = progress_per_vnode.into();
172
173        let to_backfill = !is_completely_finished;
174
175        // If no need backfill, but state was still "unfinished" we need to finish it.
176        // So we just update the state + progress to meta at the next barrier to finish progress,
177        // and forward other messages.
178        //
179        // Reason for persisting on second barrier rather than first:
180        // We can't update meta with progress as finished until state_table
181        // has been updated.
182        // We also can't update state_table in first epoch, since state_table
183        // expects to have been initialized in previous epoch.
184
185        // The epoch used to snapshot read upstream mv.
186        let mut snapshot_read_epoch;
187
188        // Keep track of rows from the snapshot.
189        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();
190
191        // Arrangement Backfill Algorithm:
192        //
193        //   backfill_stream
194        //  /               \
195        // upstream       snapshot
196        //
197        // We construct a backfill stream with upstream as its left input and mv snapshot read
198        // stream as its right input. When a chunk comes from upstream, we will buffer it.
199        //
200        // When a barrier comes from upstream:
201        //  Immediately break out of backfill loop.
202        //  - For each row of the upstream chunk buffer, compute vnode.
203        //  - Get the `current_pos` corresponding to the vnode. Forward it to downstream if its pk
204        //    <= `current_pos`, otherwise ignore it.
205        //  - Flush all buffered upstream_chunks to replicated state table.
206        //  - Update the `snapshot_read_epoch`.
207        //  - Reconstruct the whole backfill stream with upstream and new mv snapshot read stream
208        //    with the `snapshot_read_epoch`.
209        //
210        // When a chunk comes from snapshot, we forward it to the downstream and raise
211        // `current_pos`.
212        //
213        // When we reach the end of the snapshot read stream, it means backfill has been
214        // finished.
215        //
216        // Once the backfill loop ends, we forward the upstream directly to the downstream.
217        if to_backfill {
218            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
219            let mut pending_barrier: Option<Barrier> = None;
220
221            let metrics = self
222                .metrics
223                .new_backfill_metrics(upstream_table_id, self.actor_id);
224
225            'backfill_loop: loop {
226                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
227                let mut cur_barrier_upstream_processed_rows: u64 = 0;
228                let mut snapshot_read_complete = false;
229                let mut has_snapshot_read = false;
230
231                // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
232                // dropped. Then we can write to `upstream_table` on barrier in the
233                // next block.
234                {
235                    let left_upstream = upstream.by_ref().map(Either::Left);
236
237                    // Check if stream paused
238                    let paused = global_pause
239                        || backfill_paused
240                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
241                    // Create the snapshot stream
242                    let right_snapshot = pin!(
243                        Self::make_snapshot_stream(
244                            &upstream_table,
245                            backfill_state.clone(), // FIXME: Use mutable reference instead.
246                            paused,
247                            &self.rate_limiter,
248                        )
249                        .map(Either::Right)
250                    );
251
252                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
253                    // barrier comes.
254                    let mut backfill_stream =
255                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
256                            stream::PollNext::Left
257                        });
258
259                    #[for_await]
260                    for either in &mut backfill_stream {
261                        match either {
262                            // Upstream
263                            Either::Left(msg) => {
264                                match msg? {
265                                    Message::Barrier(barrier) => {
266                                        // We have to process the barrier outside of the loop.
267                                        // This is because our state_table reference is still live
268                                        // here, we have to break the loop to drop it,
269                                        // so we can do replication of upstream state_table.
270                                        pending_barrier = Some(barrier);
271
272                                        // Break the for loop and start a new snapshot read stream.
273                                        break;
274                                    }
275                                    Message::Chunk(chunk) => {
276                                        // Buffer the upstream chunk.
277                                        upstream_chunk_buffer.push(chunk.compact_vis());
278                                    }
279                                    Message::Watermark(_) => {
280                                        // Ignore watermark during backfill.
281                                    }
282                                }
283                            }
284                            // Snapshot read
285                            Either::Right(msg) => {
286                                has_snapshot_read = true;
287                                match msg? {
288                                    None => {
289                                        // Consume remaining rows in the builder.
290                                        for (vnode, builder) in &mut builders {
291                                            if let Some(data_chunk) = builder.consume_all() {
292                                                let chunk = Self::handle_snapshot_chunk(
293                                                    data_chunk,
294                                                    *vnode,
295                                                    &pk_in_output_indices,
296                                                    &mut backfill_state,
297                                                    &mut cur_barrier_snapshot_processed_rows,
298                                                    &mut total_snapshot_processed_rows,
299                                                    &self.output_indices,
300                                                )?;
301                                                tracing::trace!(
302                                                    source = "snapshot",
303                                                    state = "finish_backfill_stream",
304                                                    action = "drain_snapshot_buffers",
305                                                    ?vnode,
306                                                    "{:#?}",
307                                                    chunk,
308                                                );
309                                                yield Message::Chunk(chunk);
310                                            }
311                                        }
312
313                                        // End of the snapshot read stream.
314                                        // We should not mark the chunk anymore,
315                                        // otherwise, we will ignore some rows
316                                        // in the buffer. Here we choose to never mark the chunk.
317                                        // Consume with the renaming stream buffer chunk without
318                                        // mark.
319                                        for chunk in upstream_chunk_buffer.drain(..) {
320                                            let chunk_cardinality = chunk.cardinality() as u64;
321                                            cur_barrier_upstream_processed_rows +=
322                                                chunk_cardinality;
323                                            let chunk = mapping_chunk(chunk, &self.output_indices);
324                                            tracing::trace!(
325                                                source = "upstream",
326                                                state = "finish_backfill_stream",
327                                                action = "drain_upstream_buffer",
328                                                "{:#?}",
329                                                chunk,
330                                            );
331                                            yield Message::Chunk(chunk);
332                                        }
333                                        metrics
334                                            .backfill_snapshot_read_row_count
335                                            .inc_by(cur_barrier_snapshot_processed_rows);
336                                        metrics
337                                            .backfill_upstream_output_row_count
338                                            .inc_by(cur_barrier_upstream_processed_rows);
339                                        break 'backfill_loop;
340                                    }
341                                    Some((vnode, row)) => {
342                                        let builder = builders.get_mut(&vnode).unwrap();
343                                        if let Some(chunk) = builder.append_one_row(row) {
344                                            let chunk = Self::handle_snapshot_chunk(
345                                                chunk,
346                                                vnode,
347                                                &pk_in_output_indices,
348                                                &mut backfill_state,
349                                                &mut cur_barrier_snapshot_processed_rows,
350                                                &mut total_snapshot_processed_rows,
351                                                &self.output_indices,
352                                            )?;
353                                            tracing::trace!(
354                                                source = "snapshot",
355                                                state = "process_backfill_stream",
356                                                action = "drain_full_snapshot_buffer",
357                                                "{:#?}",
358                                                chunk,
359                                            );
360                                            yield Message::Chunk(chunk);
361                                        }
362                                    }
363                                }
364                            }
365                        }
366                    }
367
368                    // Before processing barrier, if did not snapshot read,
369                    // do a snapshot read first.
370                    // This is so we don't lose the tombstone iteration progress.
371                    // Or if s3 read latency is high, we don't fail to read from s3.
372                    //
373                    // If paused, we can't read any snapshot records, skip this.
374                    //
375                    // If rate limit is set, respect the rate limit, check if we can read,
376                    // If we can't, skip it. If no rate limit set, we can read.
377                    let rate_limit_ready = self.rate_limiter.check(1).is_ok();
378                    if !has_snapshot_read && !paused && rate_limit_ready {
379                        debug_assert!(builders.values().all(|b| b.is_empty()));
380                        let (_, snapshot) = backfill_stream.into_inner();
381                        #[for_await]
382                        for msg in snapshot {
383                            let Either::Right(msg) = msg else {
384                                bail!("BUG: snapshot_read contains upstream messages");
385                            };
386                            match msg? {
387                                None => {
388                                    // End of the snapshot read stream.
389                                    // We let the barrier handling logic take care of upstream updates.
390                                    // But we still want to exit backfill loop, so we mark snapshot read complete.
391                                    snapshot_read_complete = true;
392                                    break;
393                                }
394                                Some((vnode, row)) => {
395                                    let builder = builders.get_mut(&vnode).unwrap();
396                                    if let Some(chunk) = builder.append_one_row(row) {
397                                        let chunk = Self::handle_snapshot_chunk(
398                                            chunk,
399                                            vnode,
400                                            &pk_in_output_indices,
401                                            &mut backfill_state,
402                                            &mut cur_barrier_snapshot_processed_rows,
403                                            &mut total_snapshot_processed_rows,
404                                            &self.output_indices,
405                                        )?;
406                                        tracing::trace!(
407                                            source = "snapshot",
408                                            state = "process_backfill_stream",
409                                            action = "snapshot_read_at_least_one",
410                                            "{:#?}",
411                                            chunk,
412                                        );
413                                        yield Message::Chunk(chunk);
414                                    }
415
416                                    break;
417                                }
418                            }
419                        }
420                    }
421                }
422
423                // Process barrier
424                // When we break out of inner backfill_stream loop, it means we have a barrier.
425                // If there are no updates and there are no snapshots left,
426                // we already finished backfill and should have exited the outer backfill loop.
427                let barrier = match pending_barrier.take() {
428                    Some(barrier) => barrier,
429                    None => bail!("BUG: current_backfill loop exited without a barrier"),
430                };
431
432                // Process barrier:
433                // - consume snapshot rows left in builder.
434                // - consume upstream buffer chunk
435                // - handle mutations
436                // - switch snapshot
437
438                // consume snapshot rows left in builder.
439                // NOTE(kwannoel): `zip_eq_debug` does not work here,
440                // we encounter "higher-ranked lifetime error".
441                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
442                    let chunk = b.consume_all().map(|chunk| {
443                        let ops = vec![Op::Insert; chunk.capacity()];
444                        StreamChunk::from_parts(ops, chunk)
445                    });
446                    (vnode, chunk)
447                }) {
448                    if let Some(chunk) = chunk {
449                        let chunk_cardinality = chunk.cardinality() as u64;
450                        // Raise the current position.
451                        // As snapshot read streams are ordered by pk, so we can
452                        // just use the last row to update `current_pos`.
453                        update_pos_by_vnode(
454                            *vnode,
455                            &chunk,
456                            &pk_in_output_indices,
457                            &mut backfill_state,
458                            chunk_cardinality,
459                        )?;
460
461                        cur_barrier_snapshot_processed_rows += chunk_cardinality;
462                        total_snapshot_processed_rows += chunk_cardinality;
463                        let chunk = mapping_chunk(chunk, &self.output_indices);
464                        tracing::trace!(
465                            source = "snapshot",
466                            state = "process_barrier",
467                            action = "consume_remaining_snapshot",
468                            "{:#?}",
469                            chunk,
470                        );
471                        yield Message::Chunk(chunk);
472                    }
473                }
474
475                // consume upstream buffer chunk
476                for chunk in upstream_chunk_buffer.drain(..) {
477                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
478                    // FIXME: Replace with `snapshot_is_processed`
479                    // Flush downstream.
480                    // If no current_pos, means no snapshot processed yet.
481                    // Also means we don't need propagate any updates <= current_pos.
482                    if backfill_state.has_progress() {
483                        let chunk = mapping_chunk(
484                            mark_chunk_ref_by_vnode(
485                                &chunk,
486                                &backfill_state,
487                                &pk_in_output_indices,
488                                &upstream_table,
489                                &pk_order,
490                            )?,
491                            &self.output_indices,
492                        );
493                        tracing::trace!(
494                            source = "upstream",
495                            state = "process_barrier",
496                            action = "consume_remaining_upstream",
497                            "{:#?}",
498                            chunk,
499                        );
500                        yield Message::Chunk(chunk);
501                    }
502
503                    // Replicate
504                    upstream_table.write_chunk(chunk);
505                }
506
507                upstream_table
508                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
509                    .await?;
510
511                metrics
512                    .backfill_snapshot_read_row_count
513                    .inc_by(cur_barrier_snapshot_processed_rows);
514                metrics
515                    .backfill_upstream_output_row_count
516                    .inc_by(cur_barrier_upstream_processed_rows);
517
518                // Update snapshot read epoch.
519                snapshot_read_epoch = barrier.epoch.prev;
520
521                // TODO(kwannoel): Not sure if this holds for arrangement backfill.
522                // May need to revisit it.
523                // Need to check it after scale-in / scale-out.
524                self.progress.update(
525                    barrier.epoch,
526                    snapshot_read_epoch,
527                    total_snapshot_processed_rows,
528                );
529
530                // Persist state on barrier
531                persist_state_per_vnode(
532                    barrier.epoch,
533                    &mut self.state_table,
534                    &mut backfill_state,
535                    #[cfg(debug_assertions)]
536                    state_len,
537                    vnodes.iter_vnodes(),
538                )
539                .await?;
540
541                tracing::trace!(
542                    barrier = ?barrier,
543                    "barrier persisted"
544                );
545
546                // handle mutations
547                if let Some(mutation) = barrier.mutation.as_deref() {
548                    use crate::executor::Mutation;
549                    match mutation {
550                        Mutation::Pause => {
551                            global_pause = true;
552                        }
553                        Mutation::Resume => {
554                            global_pause = false;
555                        }
556                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
557                            if fragment_ids.contains(&self.fragment_id) {
558                                backfill_paused = false;
559                            }
560                        }
561                        Mutation::Throttle(fragment_to_apply) => {
562                            if let Some(entry) = fragment_to_apply.get(&self.fragment_id)
563                                && entry.throttle_type() == ThrottleType::Backfill
564                            {
565                                let new_rate_limit = entry.rate_limit.into();
566                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
567                                if old_rate_limit != new_rate_limit {
568                                    tracing::info!(
569                                        old_rate_limit = ?old_rate_limit,
570                                        new_rate_limit = ?new_rate_limit,
571                                        %upstream_table_id,
572                                        actor_id = %self.actor_id,
573                                        "backfill rate limit changed",
574                                    );
575                                    builders = upstream_table
576                                        .vnodes()
577                                        .iter_vnodes()
578                                        .map(|vnode| {
579                                            let builder = create_builder(
580                                                new_rate_limit,
581                                                self.chunk_size,
582                                                snapshot_data_types.clone(),
583                                            );
584                                            (vnode, builder)
585                                        })
586                                        .collect();
587                                }
588                            }
589                        }
590                        _ => {}
591                    }
592                }
593
594                yield Message::Barrier(barrier);
595
596                // We will switch snapshot at the start of the next iteration of the backfill loop.
597                // Unless snapshot read is already completed.
598                if snapshot_read_complete {
599                    break 'backfill_loop;
600                }
601            }
602        }
603
604        tracing::debug!("snapshot read finished, wait to commit state on next barrier");
605
606        // Update our progress as finished in state table.
607
608        // Wait for first barrier to come after backfill is finished.
609        // So we can update our progress + persist the status.
610        while let Some(Ok(msg)) = upstream.next().await {
611            if let Some(msg) = mapping_message(msg, &self.output_indices) {
612                // If not finished then we need to update state, otherwise no need.
613                if let Message::Barrier(barrier) = &msg {
614                    if is_completely_finished {
615                        // If already finished, no need to persist any state. But we need to advance the epoch anyway
616                        self.state_table
617                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
618                            .await?;
619                    } else {
620                        // If snapshot was empty, we do not need to backfill,
621                        // but we still need to persist the finished state.
622                        // We currently persist it on the second barrier here rather than first.
623                        // This is because we can't update state table in first epoch,
624                        // since it expects to have been initialized in previous epoch
625                        // (there's no epoch before the first epoch).
626                        for vnode in upstream_table.vnodes().iter_vnodes() {
627                            backfill_state
628                                .finish_progress(vnode, upstream_table.pk_indices().len());
629                        }
630
631                        persist_state_per_vnode(
632                            barrier.epoch,
633                            &mut self.state_table,
634                            &mut backfill_state,
635                            #[cfg(debug_assertions)]
636                            state_len,
637                            vnodes.iter_vnodes(),
638                        )
639                        .await?;
640                    }
641
642                    self.progress
643                        .finish(barrier.epoch, total_snapshot_processed_rows);
644                    yield msg;
645                    break;
646                }
647                // Allow other messages to pass through.
648                // We won't yield twice here, since if there's a barrier,
649                // we will always break out of the loop.
650                yield msg;
651            }
652        }
653
654        tracing::debug!("backfill finished");
655
656        // After progress finished + state persisted,
657        // we can forward messages directly to the downstream,
658        // as backfill is finished.
659        #[for_await]
660        for msg in upstream {
661            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
662                if let Message::Barrier(barrier) = &msg {
663                    // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
664                    self.state_table
665                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
666                        .await?;
667                }
668                yield msg;
669            }
670        }
671    }
672
    /// Wrap [`Self::snapshot_read_per_vnode`] with pause handling and per-row rate limiting.
    ///
    /// - If `paused` is set, the stream yields nothing at all: we park on a pending stream
    ///   so the caller's `select` keeps servicing the upstream side only.
    /// - Otherwise, each item from the per-vnode snapshot read is forwarded after acquiring
    ///   one permit from `rate_limiter`.
    ///
    /// Yields `Some((vnode, row))` per snapshot row, and `None` when the snapshot read is
    /// exhausted (propagated from [`Self::snapshot_read_per_vnode`]).
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a ReplicatedStateTable<S, SD>,
        backfill_state: BackfillState,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
            // A pending stream never yields, so this loop body is unreachable;
            // `bail!` documents that invariant instead of silently looping.
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // Checked the rate limit is not zero.
            #[for_await]
            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
                let r = r?;
                // One permit per yielded item (row or end-of-snapshot marker).
                rate_limiter.wait(1).await;
                yield r;
            }
        }
    }
695
696    fn handle_snapshot_chunk(
697        chunk: DataChunk,
698        vnode: VirtualNode,
699        pk_in_output_indices: &[usize],
700        backfill_state: &mut BackfillState,
701        cur_barrier_snapshot_processed_rows: &mut u64,
702        total_snapshot_processed_rows: &mut u64,
703        output_indices: &[usize],
704    ) -> StreamExecutorResult<StreamChunk> {
705        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
706        // Raise the current position.
707        // As snapshot read streams are ordered by pk, so we can
708        // just use the last row to update `current_pos`.
709        let snapshot_row_count_delta = chunk.cardinality() as u64;
710        update_pos_by_vnode(
711            vnode,
712            &chunk,
713            pk_in_output_indices,
714            backfill_state,
715            snapshot_row_count_delta,
716        )?;
717
718        let chunk_cardinality = chunk.cardinality() as u64;
719        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
720        *total_snapshot_processed_rows += chunk_cardinality;
721        Ok(mapping_chunk(chunk, output_indices))
722    }
723
724    /// Read snapshot per vnode.
725    /// These streams should be sorted in storage layer.
726    /// 1. Get row iterator / vnode.
727    /// 2. Merge it with `select_all`.
728    /// 3. Change it into a chunk iterator with `iter_chunks`.
729    /// This means it should fetch a row from each iterator to form a chunk.
730    ///
731    /// We interleave at chunk per vnode level rather than rows.
732    /// This is so that we can compute `current_pos` once per chunk, since they correspond to 1
733    /// vnode.
734    ///
735    /// The stream contains pairs of `(VirtualNode, StreamChunk)`.
736    /// The `VirtualNode` is the vnode that the chunk belongs to.
737    /// The `StreamChunk` is the chunk that contains the rows from the vnode.
738    /// If it's `None`, it means the vnode has no more rows for this snapshot read.
739    ///
740    /// The `snapshot_read_epoch` is supplied as a parameter for `state_table`.
741    /// It is required to ensure we read a fully-checkpointed snapshot the **first time**.
742    ///
743    /// The rows from upstream snapshot read will be buffered inside the `builder`.
744    /// If snapshot is dropped before its rows are consumed,
745    /// remaining data in `builder` must be flushed manually.
746    /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
747    /// present, Then when we flush we contain duplicate rows.
748    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
749    async fn snapshot_read_per_vnode(
750        upstream_table: &ReplicatedStateTable<S, SD>,
751        backfill_state: BackfillState,
752    ) {
753        let mut iterators = vec![];
754        for vnode in upstream_table.vnodes().iter_vnodes() {
755            let backfill_progress = backfill_state.get_progress(&vnode)?;
756            let current_pos = match backfill_progress {
757                BackfillProgressPerVnode::NotStarted => None,
758                BackfillProgressPerVnode::Completed { .. } => {
759                    continue;
760                }
761                BackfillProgressPerVnode::InProgress { current_pos, .. } => {
762                    Some(current_pos.clone())
763                }
764            };
765
766            let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
767            if range_bounds.is_none() {
768                continue;
769            }
770            let range_bounds = range_bounds.unwrap();
771
772            tracing::trace!(
773                vnode = ?vnode,
774                current_pos = ?current_pos,
775                range_bounds = ?range_bounds,
776                "iter_with_vnode_and_output_indices"
777            );
778            let vnode_row_iter = upstream_table
779                .iter_with_vnode_and_output_indices(
780                    vnode,
781                    &range_bounds,
782                    PrefetchOptions::prefetch_for_small_range_scan(),
783                )
784                .await?;
785
786            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
787
788            let vnode_row_iter = Box::pin(vnode_row_iter);
789
790            iterators.push(vnode_row_iter);
791        }
792
793        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
794        let vnode_row_iter = select_all(iterators);
795
796        #[for_await]
797        for vnode_and_row in vnode_row_iter {
798            yield Some(vnode_and_row?);
799        }
800        yield None;
801        return Ok(());
802    }
803}
804
805impl<S, SD> Execute for ArrangementBackfillExecutor<S, SD>
806where
807    S: StateStore,
808    SD: ValueRowSerde,
809{
810    fn execute(self: Box<Self>) -> BoxedMessageStream {
811        self.execute_inner().boxed()
812    }
813}