risingwave_stream/executor/backfill/
no_shuffle_backfill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use futures::stream;
17use futures::stream::select_with_strategy;
18use risingwave_common::array::{DataChunk, Op};
19use risingwave_common::hash::VnodeBitmapExt;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_common::{bail, row};
22use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
23use risingwave_hummock_sdk::HummockReadEpoch;
24use risingwave_storage::store::PrefetchOptions;
25use risingwave_storage::table::batch_table::BatchTable;
26
27use crate::executor::backfill::utils;
28use crate::executor::backfill::utils::{
29    METADATA_STATE_LEN, compute_bounds, construct_initial_finished_state, create_builder,
30    get_new_pos, mapping_chunk, mapping_message, mark_chunk,
31};
32use crate::executor::prelude::*;
33use crate::task::{CreateMviewProgressReporter, FragmentId};
34
/// Backfill progress as recovered from the backfill state table.
///
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
#[derive(Debug, Eq, PartialEq)]
pub struct BackfillState {
    /// Position (upstream pk) decoded from the first `pk_len` datums of the state row.
    /// `None` when no state row was found or when there is no state table at all.
    current_pos: Option<OwnedRow>,
    /// Copy of the recovered state row, stored from index 1 onwards so that index 0 stays
    /// `None` (presumably the slot of the vnode column; confirm against `persist_state`).
    /// `None` when nothing was recovered.
    old_state: Option<Vec<Datum>>,
    /// Whether the recovered state marks the backfill as finished.
    /// Also reported as `true` when the executor has no state table.
    is_finished: bool,
    /// Recovered row count; used to seed the running total of processed snapshot rows.
    /// Defaults to 0 when the state row has no such datum.
    row_count: u64,
}
44
/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
/// `BackfillExecutor` is used to create a materialized view on another materialized view.
///
/// It can only buffer chunks between two barriers instead of unbounded memory usage of
/// `RearrangedChainExecutor`.
///
/// It uses the latest epoch to read the snapshot of the upstream mv during two barriers and all the
/// `StreamChunk` of the snapshot read will forward to the downstream.
///
/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv) and
/// `current_pos` is initiated as an empty `Row`.
///
/// All upstream messages during the two barriers interval will be buffered and decide to forward or
/// ignore based on the `current_pos` at the end of the later barrier. Once `current_pos` reaches
/// the end of the upstream mv pk, the backfill would finish.
///
/// Notice:
/// The pk we are talking about here refers to the storage primary key.
/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
/// in the same worker, so that we can read uncommitted data from the upstream table without
/// waiting.
pub struct BackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,
    /// Upstream with the same schema with the upstream table.
    upstream: Executor,

    /// Internal state table for persisting state of backfill state.
    /// When it is `None`, recovery reports the backfill as already finished.
    state_table: Option<StateTable<S>>,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
    progress: CreateMviewProgressReporter,

    /// Id of the actor running this executor; obtained from `progress` in [`Self::new`].
    actor_id: ActorId,

    /// Streaming metrics; the per-(upstream table, actor) backfill metrics are created from it.
    metrics: Arc<StreamingMetrics>,

    /// Chunk size passed to the snapshot chunk builder, which uses
    /// min(rate limit, `chunk_size`) as its maximum chunk size.
    chunk_size: usize,

    /// Rate limiter for snapshot reads: one permit is awaited per snapshot row, and a
    /// `RateLimit::Pause` limit pauses the snapshot stream.
    rate_limiter: MonitoredRateLimiter,

    /// Fragment id of the fragment this backfill node belongs to.
    fragment_id: FragmentId,
}
92
93impl<S> BackfillExecutor<S>
94where
95    S: StateStore,
96{
97    #[allow(clippy::too_many_arguments)]
98    pub fn new(
99        upstream_table: BatchTable<S>,
100        upstream: Executor,
101        state_table: Option<StateTable<S>>,
102        output_indices: Vec<usize>,
103        progress: CreateMviewProgressReporter,
104        metrics: Arc<StreamingMetrics>,
105        chunk_size: usize,
106        rate_limit: RateLimit,
107        fragment_id: FragmentId,
108    ) -> Self {
109        let actor_id = progress.actor_id();
110        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
111        Self {
112            upstream_table,
113            upstream,
114            state_table,
115            output_indices,
116            progress,
117            actor_id,
118            metrics,
119            chunk_size,
120            rate_limiter,
121            fragment_id,
122        }
123    }
124
125    #[try_stream(ok = Message, error = StreamExecutorError)]
126    async fn execute_inner(mut self) {
127        // The primary key columns.
128        // We receive a pruned chunk from the upstream table,
129        // which will only contain output columns of the scan on the upstream table.
130        // The pk indices specify the pk columns of the pruned chunk.
131        let pk_indices = self.upstream_table.pk_in_output_indices().unwrap();
132
133        let state_len = pk_indices.len() + METADATA_STATE_LEN;
134
135        let pk_order = self.upstream_table.pk_serializer().get_order_types();
136
137        let upstream_table_id = self.upstream_table.table_id().table_id;
138
139        let mut upstream = self.upstream.execute();
140
141        // Poll the upstream to get the first barrier.
142        let first_barrier = expect_first_barrier(&mut upstream).await?;
143        let mut global_pause = first_barrier.is_pause_on_startup();
144        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
145        let first_epoch = first_barrier.epoch;
146        let init_epoch = first_barrier.epoch.prev;
147        // The first barrier message should be propagated.
148        yield Message::Barrier(first_barrier);
149
150        if let Some(state_table) = self.state_table.as_mut() {
151            state_table.init_epoch(first_epoch).await?;
152        }
153
154        let BackfillState {
155            mut current_pos,
156            is_finished,
157            row_count,
158            mut old_state,
159        } = Self::recover_backfill_state(self.state_table.as_ref(), pk_indices.len()).await?;
160        tracing::trace!(is_finished, row_count, "backfill state recovered");
161
162        let data_types = self.upstream_table.schema().data_types();
163
164        // Chunk builder will be instantiated with min(rate_limit, self.chunk_size) as the chunk's max size.
165        let mut builder = create_builder(
166            self.rate_limiter.rate_limit(),
167            self.chunk_size,
168            data_types.clone(),
169        );
170
171        // Use this buffer to construct state,
172        // which will then be persisted.
173        let mut current_state: Vec<Datum> = vec![None; state_len];
174
175        // If no need backfill, but state was still "unfinished" we need to finish it.
176        // So we just update the state + progress to meta at the next barrier to finish progress,
177        // and forward other messages.
178        //
179        // Reason for persisting on second barrier rather than first:
180        // We can't update meta with progress as finished until state_table
181        // has been updated.
182        // We also can't update state_table in first epoch, since state_table
183        // expects to have been initialized in previous epoch.
184
185        // The epoch used to snapshot read upstream mv.
186        let mut snapshot_read_epoch = init_epoch;
187
188        // Keep track of rows from the snapshot.
189        let mut total_snapshot_processed_rows: u64 = row_count;
190
191        // Backfill Algorithm:
192        //
193        //   backfill_stream
194        //  /               \
195        // upstream       snapshot
196        //
197        // We construct a backfill stream with upstream as its left input and mv snapshot read
198        // stream as its right input. When a chunk comes from upstream, we will buffer it.
199        //
200        // When a barrier comes from upstream:
201        //  - Update the `snapshot_read_epoch`.
202        //  - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
203        //    `current_pos`, otherwise ignore it.
204        //  - reconstruct the whole backfill stream with upstream and new mv snapshot read stream
205        //    with the `snapshot_read_epoch`.
206        //
207        // When a chunk comes from snapshot, we forward it to the downstream and raise
208        // `current_pos`.
209        //
210        // When we reach the end of the snapshot read stream, it means backfill has been
211        // finished.
212        //
213        // Once the backfill loop ends, we forward the upstream directly to the downstream.
214        if !is_finished {
215            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
216            let mut pending_barrier: Option<Barrier> = None;
217
218            let metrics = self
219                .metrics
220                .new_backfill_metrics(upstream_table_id, self.actor_id);
221
222            'backfill_loop: loop {
223                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
224                let mut cur_barrier_upstream_processed_rows: u64 = 0;
225                let mut snapshot_read_complete = false;
226                let mut has_snapshot_read = false;
227
228                // We should not buffer rows from previous epoch, else we can have duplicates.
229                assert!(upstream_chunk_buffer.is_empty());
230
231                {
232                    let left_upstream = upstream.by_ref().map(Either::Left);
233                    let paused = global_pause
234                        || backfill_paused
235                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
236                    let right_snapshot = pin!(
237                        Self::make_snapshot_stream(
238                            &self.upstream_table,
239                            snapshot_read_epoch,
240                            current_pos.clone(),
241                            paused,
242                            &self.rate_limiter,
243                        )
244                        .map(Either::Right)
245                    );
246
247                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
248                    // barrier comes.
249                    let mut backfill_stream =
250                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
251                            stream::PollNext::Left
252                        });
253
254                    #[for_await]
255                    for either in &mut backfill_stream {
256                        match either {
257                            // Upstream
258                            Either::Left(msg) => {
259                                match msg? {
260                                    Message::Barrier(barrier) => {
261                                        // We have to process barrier outside of the loop.
262                                        // This is because the backfill stream holds a mutable
263                                        // reference to our chunk builder.
264                                        // We want to create another mutable reference
265                                        // to flush remaining chunks from the chunk builder
266                                        // on barrier.
267                                        // Hence we break here and process it after this block.
268                                        pending_barrier = Some(barrier);
269                                        break;
270                                    }
271                                    Message::Chunk(chunk) => {
272                                        // Buffer the upstream chunk.
273                                        upstream_chunk_buffer.push(chunk.compact());
274                                    }
275                                    Message::Watermark(_) => {
276                                        // Ignore watermark during backfill.
277                                    }
278                                }
279                            }
280                            // Snapshot read
281                            Either::Right(msg) => {
282                                has_snapshot_read = true;
283                                match msg? {
284                                    None => {
285                                        // Consume remaining rows in the builder.
286                                        if let Some(data_chunk) = builder.consume_all() {
287                                            yield Message::Chunk(Self::handle_snapshot_chunk(
288                                                data_chunk,
289                                                &mut current_pos,
290                                                &mut cur_barrier_snapshot_processed_rows,
291                                                &mut total_snapshot_processed_rows,
292                                                &pk_indices,
293                                                &self.output_indices,
294                                            ));
295                                        }
296
297                                        // End of the snapshot read stream.
298                                        // We should not mark the chunk anymore,
299                                        // otherwise, we will ignore some rows
300                                        // in the buffer. Here we choose to never mark the chunk.
301                                        // Consume with the renaming stream buffer chunk without
302                                        // mark.
303                                        for chunk in upstream_chunk_buffer.drain(..) {
304                                            let chunk_cardinality = chunk.cardinality() as u64;
305                                            cur_barrier_upstream_processed_rows +=
306                                                chunk_cardinality;
307                                            yield Message::Chunk(mapping_chunk(
308                                                chunk,
309                                                &self.output_indices,
310                                            ));
311                                        }
312                                        metrics
313                                            .backfill_snapshot_read_row_count
314                                            .inc_by(cur_barrier_snapshot_processed_rows);
315                                        metrics
316                                            .backfill_upstream_output_row_count
317                                            .inc_by(cur_barrier_upstream_processed_rows);
318                                        break 'backfill_loop;
319                                    }
320                                    Some(record) => {
321                                        // Buffer the snapshot read row.
322                                        if let Some(data_chunk) = builder.append_one_row(record) {
323                                            yield Message::Chunk(Self::handle_snapshot_chunk(
324                                                data_chunk,
325                                                &mut current_pos,
326                                                &mut cur_barrier_snapshot_processed_rows,
327                                                &mut total_snapshot_processed_rows,
328                                                &pk_indices,
329                                                &self.output_indices,
330                                            ));
331                                        }
332                                    }
333                                }
334                            }
335                        }
336                    }
337
338                    // Before processing barrier, if did not snapshot read,
339                    // do a snapshot read first.
340                    // This is so we don't lose the tombstone iteration progress.
341                    // If paused, we also can't read any snapshot records.
342                    if !has_snapshot_read && !paused {
343                        assert!(
344                            builder.is_empty(),
345                            "Builder should be empty if no snapshot read"
346                        );
347                        let (_, snapshot) = backfill_stream.into_inner();
348                        #[for_await]
349                        for msg in snapshot {
350                            let Either::Right(msg) = msg else {
351                                bail!("BUG: snapshot_read contains upstream messages");
352                            };
353                            match msg? {
354                                None => {
355                                    // End of the snapshot read stream.
356                                    // We let the barrier handling logic take care of upstream updates.
357                                    // But we still want to exit backfill loop, so we mark snapshot read complete.
358                                    snapshot_read_complete = true;
359                                    break;
360                                }
361                                Some(row) => {
362                                    let chunk = DataChunk::from_rows(&[row], &data_types);
363                                    yield Message::Chunk(Self::handle_snapshot_chunk(
364                                        chunk,
365                                        &mut current_pos,
366                                        &mut cur_barrier_snapshot_processed_rows,
367                                        &mut total_snapshot_processed_rows,
368                                        &pk_indices,
369                                        &self.output_indices,
370                                    ));
371                                    break;
372                                }
373                            }
374                        }
375                    }
376                }
377                // When we break out of inner backfill_stream loop, it means we have a barrier.
378                // If there are no updates and there are no snapshots left,
379                // we already finished backfill and should have exited the outer backfill loop.
380                let barrier = match pending_barrier.take() {
381                    Some(barrier) => barrier,
382                    None => bail!("BUG: current_backfill loop exited without a barrier"),
383                };
384
385                // Process barrier:
386                // - consume snapshot rows left in builder
387                // - consume upstream buffer chunk
388                // - switch snapshot
389
390                // Consume snapshot rows left in builder
391                let chunk = builder.consume_all();
392                if let Some(chunk) = chunk {
393                    yield Message::Chunk(Self::handle_snapshot_chunk(
394                        chunk,
395                        &mut current_pos,
396                        &mut cur_barrier_snapshot_processed_rows,
397                        &mut total_snapshot_processed_rows,
398                        &pk_indices,
399                        &self.output_indices,
400                    ));
401                }
402
403                // Consume upstream buffer chunk
404                // If no current_pos, means we did not process any snapshot
405                // yet. In that case
406                // we can just ignore the upstream buffer chunk, but still need to clean it.
407                if let Some(current_pos) = &current_pos {
408                    for chunk in upstream_chunk_buffer.drain(..) {
409                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
410                        yield Message::Chunk(mapping_chunk(
411                            mark_chunk(chunk, current_pos, &pk_indices, pk_order),
412                            &self.output_indices,
413                        ));
414                    }
415                } else {
416                    upstream_chunk_buffer.clear()
417                }
418
419                metrics
420                    .backfill_snapshot_read_row_count
421                    .inc_by(cur_barrier_snapshot_processed_rows);
422                metrics
423                    .backfill_upstream_output_row_count
424                    .inc_by(cur_barrier_upstream_processed_rows);
425
426                // Update snapshot read epoch.
427                snapshot_read_epoch = barrier.epoch.prev;
428
429                self.progress.update(
430                    barrier.epoch,
431                    snapshot_read_epoch,
432                    total_snapshot_processed_rows,
433                );
434
435                // Persist state on barrier
436                Self::persist_state(
437                    barrier.epoch,
438                    &mut self.state_table,
439                    false,
440                    &current_pos,
441                    total_snapshot_processed_rows,
442                    &mut old_state,
443                    &mut current_state,
444                )
445                .await?;
446
447                tracing::trace!(
448                    epoch = ?barrier.epoch,
449                    ?current_pos,
450                    total_snapshot_processed_rows,
451                    "Backfill state persisted"
452                );
453
454                // Update snapshot read chunk builder.
455                if let Some(mutation) = barrier.mutation.as_deref() {
456                    match mutation {
457                        Mutation::Pause => {
458                            global_pause = true;
459                        }
460                        Mutation::Resume => {
461                            global_pause = false;
462                        }
463                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
464                            if fragment_ids.contains(&self.fragment_id) {
465                                backfill_paused = false;
466                            }
467                        }
468                        Mutation::Throttle(actor_to_apply) => {
469                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
470                            if let Some(new_rate_limit) = new_rate_limit_entry {
471                                let new_rate_limit = (*new_rate_limit).into();
472                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
473                                if old_rate_limit != new_rate_limit {
474                                    tracing::info!(
475                                        old_rate_limit = ?old_rate_limit,
476                                        new_rate_limit = ?new_rate_limit,
477                                        upstream_table_id = upstream_table_id,
478                                        actor_id = self.actor_id,
479                                        "backfill rate limit changed",
480                                    );
481                                    // The builder is emptied above via `DataChunkBuilder::consume_all`.
482                                    assert!(
483                                        builder.is_empty(),
484                                        "builder should already be emptied"
485                                    );
486                                    builder = create_builder(
487                                        new_rate_limit,
488                                        self.chunk_size,
489                                        self.upstream_table.schema().data_types(),
490                                    );
491                                }
492                            }
493                        }
494                        _ => (),
495                    }
496                }
497
498                yield Message::Barrier(barrier);
499
500                if snapshot_read_complete {
501                    break 'backfill_loop;
502                }
503
504                // We will switch snapshot at the start of the next iteration of the backfill loop.
505            }
506        }
507
508        tracing::trace!("Backfill has finished, waiting for barrier");
509
510        // Wait for first barrier to come after backfill is finished.
511        // So we can update our progress + persist the status.
512        while let Some(Ok(msg)) = upstream.next().await {
513            if let Some(msg) = mapping_message(msg, &self.output_indices) {
514                // If not finished then we need to update state, otherwise no need.
515                if let Message::Barrier(barrier) = &msg {
516                    if is_finished {
517                        // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
518                        if let Some(table) = &mut self.state_table {
519                            table
520                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
521                                .await?;
522                        }
523                    } else {
524                        // If snapshot was empty, we do not need to backfill,
525                        // but we still need to persist the finished state.
526                        // We currently persist it on the second barrier here rather than first.
527                        // This is because we can't update state table in first epoch,
528                        // since it expects to have been initialized in previous epoch
529                        // (there's no epoch before the first epoch).
530                        if current_pos.is_none() {
531                            current_pos = Some(construct_initial_finished_state(pk_indices.len()))
532                        }
533
534                        // We will update current_pos at least once,
535                        // since snapshot read has to be non-empty,
536                        // Or snapshot was empty and we construct a placeholder state.
537                        debug_assert_ne!(current_pos, None);
538
539                        Self::persist_state(
540                            barrier.epoch,
541                            &mut self.state_table,
542                            true,
543                            &current_pos,
544                            total_snapshot_processed_rows,
545                            &mut old_state,
546                            &mut current_state,
547                        )
548                        .await?;
549                        tracing::trace!(
550                            epoch = ?barrier.epoch,
551                            ?current_pos,
552                            total_snapshot_processed_rows,
553                            "Backfill position persisted after completion"
554                        );
555                    }
556
557                    // For both backfill finished before recovery,
558                    // and backfill which just finished, we need to update mview tracker,
559                    // it does not persist this information.
560                    self.progress
561                        .finish(barrier.epoch, total_snapshot_processed_rows);
562                    tracing::trace!(
563                        epoch = ?barrier.epoch,
564                        "Updated CreateMaterializedTracker"
565                    );
566                    yield msg;
567                    break;
568                }
569                // Allow other messages to pass through.
570                // We won't yield twice here, since if there's a barrier,
571                // we will always break out of the loop.
572                yield msg;
573            }
574        }
575
576        tracing::trace!(
577            "Backfill has already finished and forward messages directly to the downstream"
578        );
579
580        // After progress finished + state persisted,
581        // we can forward messages directly to the downstream,
582        // as backfill is finished.
583        // We don't need to report backfill progress any longer, as it has finished.
584        // It will always be at 100%.
585        #[for_await]
586        for msg in upstream {
587            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
588                if let Message::Barrier(barrier) = &msg {
589                    // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
590                    if let Some(table) = &mut self.state_table {
591                        table
592                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
593                            .await?;
594                    }
595                }
596
597                yield msg;
598            }
599        }
600    }
601
602    async fn recover_backfill_state(
603        state_table: Option<&StateTable<S>>,
604        pk_len: usize,
605    ) -> StreamExecutorResult<BackfillState> {
606        let Some(state_table) = state_table else {
607            // If no state table, but backfill is present, it must be from an old cluster.
608            // In that case backfill must be finished, otherwise it won't have been persisted.
609            return Ok(BackfillState {
610                current_pos: None,
611                is_finished: true,
612                row_count: 0,
613                old_state: None,
614            });
615        };
616        let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
617        let first_vnode = vnodes.next().unwrap();
618        let key: &[Datum] = &[Some(first_vnode.into())];
619        let row = state_table.get_row(key).await?;
620        let expected_state = Self::deserialize_backfill_state(row, pk_len);
621
622        // All vnode partitions should have same state (no scale-in supported).
623        for vnode in vnodes {
624            let key: &[Datum] = &[Some(vnode.into())];
625            let row = state_table.get_row(key).await?;
626            let state = Self::deserialize_backfill_state(row, pk_len);
627            assert_eq!(state.is_finished, expected_state.is_finished);
628        }
629        Ok(expected_state)
630    }
631
632    fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
633        let Some(row) = row else {
634            return BackfillState {
635                current_pos: None,
636                is_finished: false,
637                row_count: 0,
638                old_state: None,
639            };
640        };
641        let row = row.into_inner();
642        let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
643        old_state[1..row.len() + 1].clone_from_slice(&row);
644        let current_pos = Some((&row[0..pk_len]).into_owned_row());
645        let is_finished = row[pk_len].clone().is_some_and(|d| d.into_bool());
646        let row_count = row
647            .get(pk_len + 1)
648            .cloned()
649            .unwrap_or(None)
650            .map_or(0, |d| d.into_int64() as u64);
651        BackfillState {
652            current_pos,
653            is_finished,
654            row_count,
655            old_state: Some(old_state),
656        }
657    }
658
659    #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
660    async fn make_snapshot_stream<'a>(
661        upstream_table: &'a BatchTable<S>,
662        epoch: u64,
663        current_pos: Option<OwnedRow>,
664        paused: bool,
665        rate_limiter: &'a MonitoredRateLimiter,
666    ) {
667        if paused {
668            #[for_await]
669            for _ in tokio_stream::pending() {
670                bail!("BUG: paused stream should not yield");
671            }
672        } else {
673            // Checked the rate limit is not zero.
674            #[for_await]
675            for r in
676                Self::snapshot_read(upstream_table, HummockReadEpoch::NoWait(epoch), current_pos)
677            {
678                rate_limiter.wait(1).await;
679                yield Some(r?);
680            }
681        }
682        yield None;
683    }
684
685    /// Snapshot read the upstream mv.
686    /// The rows from upstream snapshot read will be buffered inside the `builder`.
687    /// If snapshot is dropped before its rows are consumed,
688    /// remaining data in `builder` must be flushed manually.
689    /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
690    /// present, Then when we flush we contain duplicate rows.
691    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
692    pub async fn snapshot_read(
693        upstream_table: &BatchTable<S>,
694        epoch: HummockReadEpoch,
695        current_pos: Option<OwnedRow>,
696    ) {
697        let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
698        let range_bounds = match range_bounds {
699            None => {
700                return Ok(());
701            }
702            Some(range_bounds) => range_bounds,
703        };
704
705        // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
706        // together with the upstream mv.
707        let row_iter = upstream_table
708            .batch_iter_with_pk_bounds(
709                epoch,
710                row::empty(),
711                range_bounds,
712                true,
713                // Here we only use small range prefetch because every barrier change, the executor will recreate a new iterator. So we do not need prefetch too much data.
714                PrefetchOptions::prefetch_for_small_range_scan(),
715            )
716            .await?;
717
718        #[for_await]
719        for row in row_iter {
720            yield row?;
721        }
722    }
723
724    async fn persist_state(
725        epoch: EpochPair,
726        table: &mut Option<StateTable<S>>,
727        is_finished: bool,
728        current_pos: &Option<OwnedRow>,
729        row_count: u64,
730        old_state: &mut Option<Vec<Datum>>,
731        current_state: &mut [Datum],
732    ) -> StreamExecutorResult<()> {
733        // Backwards compatibility with no state table in backfill.
734        let Some(table) = table else { return Ok(()) };
735        utils::persist_state(
736            epoch,
737            table,
738            is_finished,
739            current_pos,
740            row_count,
741            old_state,
742            current_state,
743        )
744        .await
745    }
746
747    /// 1. Converts from data chunk to stream chunk.
748    /// 2. Update the current position.
749    /// 3. Update Metrics
750    /// 4. Map the chunk according to output indices, return
751    ///    the stream chunk and do wrapping outside.
752    fn handle_snapshot_chunk(
753        data_chunk: DataChunk,
754        current_pos: &mut Option<OwnedRow>,
755        cur_barrier_snapshot_processed_rows: &mut u64,
756        total_snapshot_processed_rows: &mut u64,
757        pk_indices: &[usize],
758        output_indices: &[usize],
759    ) -> StreamChunk {
760        let ops = vec![Op::Insert; data_chunk.capacity()];
761        let chunk = StreamChunk::from_parts(ops, data_chunk);
762        // Raise the current position.
763        // As snapshot read streams are ordered by pk, so we can
764        // just use the last row to update `current_pos`.
765        *current_pos = Some(get_new_pos(&chunk, pk_indices));
766
767        let chunk_cardinality = chunk.cardinality() as u64;
768        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
769        *total_snapshot_processed_rows += chunk_cardinality;
770
771        mapping_chunk(chunk, output_indices)
772    }
773}
774
775impl<S> Execute for BackfillExecutor<S>
776where
777    S: StateStore,
778{
779    fn execute(self: Box<Self>) -> BoxedMessageStream {
780        self.execute_inner().boxed()
781    }
782}