risingwave_stream/executor/backfill/
no_shuffle_backfill.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use futures::stream;
17use futures::stream::select_with_strategy;
18use risingwave_common::array::{DataChunk, Op};
19use risingwave_common::hash::VnodeBitmapExt;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_common::{bail, row};
22use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
23use risingwave_hummock_sdk::HummockReadEpoch;
24use risingwave_pb::common::ThrottleType;
25use risingwave_storage::store::PrefetchOptions;
26use risingwave_storage::table::batch_table::BatchTable;
27
28use crate::executor::backfill::utils;
29use crate::executor::backfill::utils::{
30    METADATA_STATE_LEN, compute_bounds, construct_initial_finished_state, create_builder,
31    get_new_pos, mapping_chunk, mapping_message, mark_chunk,
32};
33use crate::executor::prelude::*;
34use crate::task::{CreateMviewProgressReporter, FragmentId};
35
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
#[derive(Debug, Eq, PartialEq)]
pub struct BackfillState {
    /// Latest backfilled position (storage pk of the upstream table).
    /// `None` when no snapshot row has been processed yet.
    current_pos: Option<OwnedRow>,
    /// The state row as previously persisted (including the vnode slot),
    /// handed to `persist_state` when committing the new state.
    old_state: Option<Vec<Datum>>,
    /// Whether backfill has already completed.
    is_finished: bool,
    /// Number of snapshot rows processed so far, used for progress reporting.
    row_count: u64,
}
45
/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
/// `BackfillExecutor` is used to create a materialized view on another materialized view.
///
/// It can only buffer chunks between two barriers instead of unbundled memory usage of
/// `RearrangedChainExecutor`.
///
/// It uses the latest epoch to read the snapshot of the upstream mv during two barriers and all the
/// `StreamChunk` of the snapshot read will forward to the downstream.
///
/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv) and
/// `current_pos` is initiated as an empty `Row`.
///
/// All upstream messages during the two barriers interval will be buffered and decide to forward or
/// ignore based on the `current_pos` at the end of the later barrier. Once `current_pos` reaches
/// the end of the upstream mv pk, the backfill would finish.
///
/// Notice:
/// The pk we are talking about here refers to the storage primary key.
/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
/// in the same worker, so that we can read uncommitted data from the upstream table without
/// waiting.
pub struct BackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,
    /// Upstream with the same schema with the upstream table.
    upstream: Executor,

    /// Internal state table for persisting state of backfill state.
    state_table: Option<StateTable<S>>,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
    progress: CreateMviewProgressReporter,

    /// Id of the actor running this backfill, derived from `progress` in `new`.
    actor_id: ActorId,

    /// Metrics registry; used to report snapshot-read / upstream-output row counts.
    metrics: Arc<StreamingMetrics>,

    /// Upper bound on rows per chunk emitted from snapshot reads
    /// (the builder is sized with min(rate limit, `chunk_size`)).
    chunk_size: usize,

    /// Rate limiter applied to snapshot reads; may be updated at runtime via
    /// `Mutation::Throttle`.
    rate_limiter: MonitoredRateLimiter,

    /// Fragment id of the fragment this backfill node belongs to.
    fragment_id: FragmentId,
}
93
94impl<S> BackfillExecutor<S>
95where
96    S: StateStore,
97{
98    #[allow(clippy::too_many_arguments)]
99    pub fn new(
100        upstream_table: BatchTable<S>,
101        upstream: Executor,
102        state_table: Option<StateTable<S>>,
103        output_indices: Vec<usize>,
104        progress: CreateMviewProgressReporter,
105        metrics: Arc<StreamingMetrics>,
106        chunk_size: usize,
107        rate_limit: RateLimit,
108        fragment_id: FragmentId,
109    ) -> Self {
110        let actor_id = progress.actor_id();
111        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
112        Self {
113            upstream_table,
114            upstream,
115            state_table,
116            output_indices,
117            progress,
118            actor_id,
119            metrics,
120            chunk_size,
121            rate_limiter,
122            fragment_id,
123        }
124    }
125
126    #[try_stream(ok = Message, error = StreamExecutorError)]
127    async fn execute_inner(mut self) {
128        // The primary key columns.
129        // We receive a pruned chunk from the upstream table,
130        // which will only contain output columns of the scan on the upstream table.
131        // The pk indices specify the pk columns of the pruned chunk.
132        let pk_indices = self.upstream_table.pk_in_output_indices().unwrap();
133
134        let state_len = pk_indices.len() + METADATA_STATE_LEN;
135
136        let pk_order = self.upstream_table.pk_serializer().get_order_types();
137
138        let upstream_table_id = self.upstream_table.table_id();
139
140        let mut upstream = self.upstream.execute();
141
142        // Poll the upstream to get the first barrier.
143        let first_barrier = expect_first_barrier(&mut upstream).await?;
144        let mut global_pause = first_barrier.is_pause_on_startup();
145        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
146        let first_epoch = first_barrier.epoch;
147        let init_epoch = first_barrier.epoch.prev;
148        // The first barrier message should be propagated.
149        yield Message::Barrier(first_barrier);
150
151        if let Some(state_table) = self.state_table.as_mut() {
152            state_table.init_epoch(first_epoch).await?;
153        }
154
155        let BackfillState {
156            mut current_pos,
157            is_finished,
158            row_count,
159            mut old_state,
160        } = Self::recover_backfill_state(self.state_table.as_ref(), pk_indices.len()).await?;
161        tracing::trace!(is_finished, row_count, "backfill state recovered");
162
163        let data_types = self.upstream_table.schema().data_types();
164
165        // Chunk builder will be instantiated with min(rate_limit, self.chunk_size) as the chunk's max size.
166        let mut builder = create_builder(
167            self.rate_limiter.rate_limit(),
168            self.chunk_size,
169            data_types.clone(),
170        );
171
172        // Use this buffer to construct state,
173        // which will then be persisted.
174        let mut current_state: Vec<Datum> = vec![None; state_len];
175
176        // If no need backfill, but state was still "unfinished" we need to finish it.
177        // So we just update the state + progress to meta at the next barrier to finish progress,
178        // and forward other messages.
179        //
180        // Reason for persisting on second barrier rather than first:
181        // We can't update meta with progress as finished until state_table
182        // has been updated.
183        // We also can't update state_table in first epoch, since state_table
184        // expects to have been initialized in previous epoch.
185
186        // The epoch used to snapshot read upstream mv.
187        let mut snapshot_read_epoch = init_epoch;
188
189        // Keep track of rows from the snapshot.
190        let mut total_snapshot_processed_rows: u64 = row_count;
191
192        // Backfill Algorithm:
193        //
194        //   backfill_stream
195        //  /               \
196        // upstream       snapshot
197        //
198        // We construct a backfill stream with upstream as its left input and mv snapshot read
199        // stream as its right input. When a chunk comes from upstream, we will buffer it.
200        //
201        // When a barrier comes from upstream:
202        //  - Update the `snapshot_read_epoch`.
203        //  - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
204        //    `current_pos`, otherwise ignore it.
205        //  - reconstruct the whole backfill stream with upstream and new mv snapshot read stream
206        //    with the `snapshot_read_epoch`.
207        //
208        // When a chunk comes from snapshot, we forward it to the downstream and raise
209        // `current_pos`.
210        //
211        // When we reach the end of the snapshot read stream, it means backfill has been
212        // finished.
213        //
214        // Once the backfill loop ends, we forward the upstream directly to the downstream.
215        if !is_finished {
216            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
217            let mut pending_barrier: Option<Barrier> = None;
218
219            let metrics = self
220                .metrics
221                .new_backfill_metrics(upstream_table_id, self.actor_id);
222
223            'backfill_loop: loop {
224                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
225                let mut cur_barrier_upstream_processed_rows: u64 = 0;
226                let mut snapshot_read_complete = false;
227                let mut has_snapshot_read = false;
228
229                // We should not buffer rows from previous epoch, else we can have duplicates.
230                assert!(upstream_chunk_buffer.is_empty());
231
232                {
233                    let left_upstream = upstream.by_ref().map(Either::Left);
234                    let paused = global_pause
235                        || backfill_paused
236                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
237                    let right_snapshot = pin!(
238                        Self::make_snapshot_stream(
239                            &self.upstream_table,
240                            snapshot_read_epoch,
241                            current_pos.clone(),
242                            paused,
243                            &self.rate_limiter,
244                        )
245                        .map(Either::Right)
246                    );
247
248                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
249                    // barrier comes.
250                    let mut backfill_stream =
251                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
252                            stream::PollNext::Left
253                        });
254
255                    #[for_await]
256                    for either in &mut backfill_stream {
257                        match either {
258                            // Upstream
259                            Either::Left(msg) => {
260                                match msg? {
261                                    Message::Barrier(barrier) => {
262                                        // We have to process barrier outside of the loop.
263                                        // This is because the backfill stream holds a mutable
264                                        // reference to our chunk builder.
265                                        // We want to create another mutable reference
266                                        // to flush remaining chunks from the chunk builder
267                                        // on barrier.
268                                        // Hence we break here and process it after this block.
269                                        pending_barrier = Some(barrier);
270                                        break;
271                                    }
272                                    Message::Chunk(chunk) => {
273                                        // Buffer the upstream chunk.
274                                        upstream_chunk_buffer.push(chunk.compact_vis());
275                                    }
276                                    Message::Watermark(_) => {
277                                        // Ignore watermark during backfill.
278                                    }
279                                }
280                            }
281                            // Snapshot read
282                            Either::Right(msg) => {
283                                has_snapshot_read = true;
284                                match msg? {
285                                    None => {
286                                        // Consume remaining rows in the builder.
287                                        if let Some(data_chunk) = builder.consume_all() {
288                                            yield Message::Chunk(Self::handle_snapshot_chunk(
289                                                data_chunk,
290                                                &mut current_pos,
291                                                &mut cur_barrier_snapshot_processed_rows,
292                                                &mut total_snapshot_processed_rows,
293                                                &pk_indices,
294                                                &self.output_indices,
295                                            ));
296                                        }
297
298                                        // End of the snapshot read stream.
299                                        // We should not mark the chunk anymore,
300                                        // otherwise, we will ignore some rows
301                                        // in the buffer. Here we choose to never mark the chunk.
302                                        // Consume with the renaming stream buffer chunk without
303                                        // mark.
304                                        for chunk in upstream_chunk_buffer.drain(..) {
305                                            let chunk_cardinality = chunk.cardinality() as u64;
306                                            cur_barrier_upstream_processed_rows +=
307                                                chunk_cardinality;
308                                            yield Message::Chunk(mapping_chunk(
309                                                chunk,
310                                                &self.output_indices,
311                                            ));
312                                        }
313                                        metrics
314                                            .backfill_snapshot_read_row_count
315                                            .inc_by(cur_barrier_snapshot_processed_rows);
316                                        metrics
317                                            .backfill_upstream_output_row_count
318                                            .inc_by(cur_barrier_upstream_processed_rows);
319                                        break 'backfill_loop;
320                                    }
321                                    Some(record) => {
322                                        // Buffer the snapshot read row.
323                                        if let Some(data_chunk) = builder.append_one_row(record) {
324                                            yield Message::Chunk(Self::handle_snapshot_chunk(
325                                                data_chunk,
326                                                &mut current_pos,
327                                                &mut cur_barrier_snapshot_processed_rows,
328                                                &mut total_snapshot_processed_rows,
329                                                &pk_indices,
330                                                &self.output_indices,
331                                            ));
332                                        }
333                                    }
334                                }
335                            }
336                        }
337                    }
338
339                    // Before processing barrier, if did not snapshot read,
340                    // do a snapshot read first.
341                    // This is so we don't lose the tombstone iteration progress.
342                    // If paused, we also can't read any snapshot records.
343                    if !has_snapshot_read && !paused {
344                        assert!(
345                            builder.is_empty(),
346                            "Builder should be empty if no snapshot read"
347                        );
348                        let (_, snapshot) = backfill_stream.into_inner();
349                        #[for_await]
350                        for msg in snapshot {
351                            let Either::Right(msg) = msg else {
352                                bail!("BUG: snapshot_read contains upstream messages");
353                            };
354                            match msg? {
355                                None => {
356                                    // End of the snapshot read stream.
357                                    // We let the barrier handling logic take care of upstream updates.
358                                    // But we still want to exit backfill loop, so we mark snapshot read complete.
359                                    snapshot_read_complete = true;
360                                    break;
361                                }
362                                Some(row) => {
363                                    let chunk = DataChunk::from_rows(&[row], &data_types);
364                                    yield Message::Chunk(Self::handle_snapshot_chunk(
365                                        chunk,
366                                        &mut current_pos,
367                                        &mut cur_barrier_snapshot_processed_rows,
368                                        &mut total_snapshot_processed_rows,
369                                        &pk_indices,
370                                        &self.output_indices,
371                                    ));
372                                    break;
373                                }
374                            }
375                        }
376                    }
377                }
378                // When we break out of inner backfill_stream loop, it means we have a barrier.
379                // If there are no updates and there are no snapshots left,
380                // we already finished backfill and should have exited the outer backfill loop.
381                let barrier = match pending_barrier.take() {
382                    Some(barrier) => barrier,
383                    None => bail!("BUG: current_backfill loop exited without a barrier"),
384                };
385
386                // Process barrier:
387                // - consume snapshot rows left in builder
388                // - consume upstream buffer chunk
389                // - switch snapshot
390
391                // Consume snapshot rows left in builder
392                let chunk = builder.consume_all();
393                if let Some(chunk) = chunk {
394                    yield Message::Chunk(Self::handle_snapshot_chunk(
395                        chunk,
396                        &mut current_pos,
397                        &mut cur_barrier_snapshot_processed_rows,
398                        &mut total_snapshot_processed_rows,
399                        &pk_indices,
400                        &self.output_indices,
401                    ));
402                }
403
404                // Consume upstream buffer chunk
405                // If no current_pos, means we did not process any snapshot
406                // yet. In that case
407                // we can just ignore the upstream buffer chunk, but still need to clean it.
408                if let Some(current_pos) = &current_pos {
409                    for chunk in upstream_chunk_buffer.drain(..) {
410                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
411                        yield Message::Chunk(mapping_chunk(
412                            mark_chunk(chunk, current_pos, &pk_indices, pk_order),
413                            &self.output_indices,
414                        ));
415                    }
416                } else {
417                    upstream_chunk_buffer.clear()
418                }
419
420                metrics
421                    .backfill_snapshot_read_row_count
422                    .inc_by(cur_barrier_snapshot_processed_rows);
423                metrics
424                    .backfill_upstream_output_row_count
425                    .inc_by(cur_barrier_upstream_processed_rows);
426
427                // Update snapshot read epoch.
428                snapshot_read_epoch = barrier.epoch.prev;
429
430                self.progress.update(
431                    barrier.epoch,
432                    snapshot_read_epoch,
433                    total_snapshot_processed_rows,
434                );
435
436                // Persist state on barrier
437                Self::persist_state(
438                    barrier.epoch,
439                    &mut self.state_table,
440                    false,
441                    &current_pos,
442                    total_snapshot_processed_rows,
443                    &mut old_state,
444                    &mut current_state,
445                )
446                .await?;
447
448                tracing::trace!(
449                    epoch = ?barrier.epoch,
450                    ?current_pos,
451                    total_snapshot_processed_rows,
452                    "Backfill state persisted"
453                );
454
455                // Update snapshot read chunk builder.
456                if let Some(mutation) = barrier.mutation.as_deref() {
457                    match mutation {
458                        Mutation::Pause => {
459                            global_pause = true;
460                        }
461                        Mutation::Resume => {
462                            global_pause = false;
463                        }
464                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
465                            if fragment_ids.contains(&self.fragment_id) {
466                                backfill_paused = false;
467                            }
468                        }
469                        Mutation::Throttle(fragment_to_apply) => {
470                            if let Some(entry) = fragment_to_apply.get(&self.fragment_id)
471                                && entry.throttle_type() == ThrottleType::Backfill
472                            {
473                                let new_rate_limit = entry.rate_limit.into();
474                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
475                                if old_rate_limit != new_rate_limit {
476                                    tracing::info!(
477                                        old_rate_limit = ?old_rate_limit,
478                                        new_rate_limit = ?new_rate_limit,
479                                        %upstream_table_id,
480                                        actor_id = %self.actor_id,
481                                        "backfill rate limit changed",
482                                    );
483                                    // The builder is emptied above via `DataChunkBuilder::consume_all`.
484                                    assert!(
485                                        builder.is_empty(),
486                                        "builder should already be emptied"
487                                    );
488                                    builder = create_builder(
489                                        new_rate_limit,
490                                        self.chunk_size,
491                                        self.upstream_table.schema().data_types(),
492                                    );
493                                }
494                            }
495                        }
496                        _ => (),
497                    }
498                }
499
500                yield Message::Barrier(barrier);
501
502                if snapshot_read_complete {
503                    break 'backfill_loop;
504                }
505
506                // We will switch snapshot at the start of the next iteration of the backfill loop.
507            }
508        }
509
510        tracing::trace!("Backfill has finished, waiting for barrier");
511
512        // Wait for first barrier to come after backfill is finished.
513        // So we can update our progress + persist the status.
514        while let Some(Ok(msg)) = upstream.next().await {
515            if let Some(msg) = mapping_message(msg, &self.output_indices) {
516                // If not finished then we need to update state, otherwise no need.
517                if let Message::Barrier(barrier) = &msg {
518                    if is_finished {
519                        // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
520                        if let Some(table) = &mut self.state_table {
521                            table
522                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
523                                .await?;
524                        }
525                    } else {
526                        // If snapshot was empty, we do not need to backfill,
527                        // but we still need to persist the finished state.
528                        // We currently persist it on the second barrier here rather than first.
529                        // This is because we can't update state table in first epoch,
530                        // since it expects to have been initialized in previous epoch
531                        // (there's no epoch before the first epoch).
532                        if current_pos.is_none() {
533                            current_pos = Some(construct_initial_finished_state(pk_indices.len()))
534                        }
535
536                        // We will update current_pos at least once,
537                        // since snapshot read has to be non-empty,
538                        // Or snapshot was empty and we construct a placeholder state.
539                        debug_assert_ne!(current_pos, None);
540
541                        Self::persist_state(
542                            barrier.epoch,
543                            &mut self.state_table,
544                            true,
545                            &current_pos,
546                            total_snapshot_processed_rows,
547                            &mut old_state,
548                            &mut current_state,
549                        )
550                        .await?;
551                        tracing::trace!(
552                            epoch = ?barrier.epoch,
553                            ?current_pos,
554                            total_snapshot_processed_rows,
555                            "Backfill position persisted after completion"
556                        );
557                    }
558
559                    // For both backfill finished before recovery,
560                    // and backfill which just finished, we need to update mview tracker,
561                    // it does not persist this information.
562                    self.progress
563                        .finish(barrier.epoch, total_snapshot_processed_rows);
564                    tracing::trace!(
565                        epoch = ?barrier.epoch,
566                        "Updated CreateMaterializedTracker"
567                    );
568                    yield msg;
569                    break;
570                }
571                // Allow other messages to pass through.
572                // We won't yield twice here, since if there's a barrier,
573                // we will always break out of the loop.
574                yield msg;
575            }
576        }
577
578        tracing::trace!(
579            "Backfill has already finished and forward messages directly to the downstream"
580        );
581
582        // After progress finished + state persisted,
583        // we can forward messages directly to the downstream,
584        // as backfill is finished.
585        // We don't need to report backfill progress any longer, as it has finished.
586        // It will always be at 100%.
587        #[for_await]
588        for msg in upstream {
589            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
590                if let Message::Barrier(barrier) = &msg {
591                    // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
592                    if let Some(table) = &mut self.state_table {
593                        table
594                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
595                            .await?;
596                    }
597                }
598
599                yield msg;
600            }
601        }
602    }
603
604    async fn recover_backfill_state(
605        state_table: Option<&StateTable<S>>,
606        pk_len: usize,
607    ) -> StreamExecutorResult<BackfillState> {
608        let Some(state_table) = state_table else {
609            // If no state table, but backfill is present, it must be from an old cluster.
610            // In that case backfill must be finished, otherwise it won't have been persisted.
611            return Ok(BackfillState {
612                current_pos: None,
613                is_finished: true,
614                row_count: 0,
615                old_state: None,
616            });
617        };
618        let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
619        let first_vnode = vnodes.next().unwrap();
620        let key: &[Datum] = &[Some(first_vnode.into())];
621        let row = state_table.get_row(key).await?;
622        let expected_state = Self::deserialize_backfill_state(row, pk_len);
623
624        // All vnode partitions should have same state (no scale-in supported).
625        for vnode in vnodes {
626            let key: &[Datum] = &[Some(vnode.into())];
627            let row = state_table.get_row(key).await?;
628            let state = Self::deserialize_backfill_state(row, pk_len);
629            assert_eq!(state.is_finished, expected_state.is_finished);
630        }
631        Ok(expected_state)
632    }
633
634    fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
635        let Some(row) = row else {
636            return BackfillState {
637                current_pos: None,
638                is_finished: false,
639                row_count: 0,
640                old_state: None,
641            };
642        };
643        let row = row.into_inner();
644        let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
645        old_state[1..row.len() + 1].clone_from_slice(&row);
646        let current_pos = Some((&row[0..pk_len]).into_owned_row());
647        let is_finished = row[pk_len].clone().is_some_and(|d| d.into_bool());
648        let row_count = row
649            .get(pk_len + 1)
650            .cloned()
651            .unwrap_or(None)
652            .map_or(0, |d| d.into_int64() as u64);
653        BackfillState {
654            current_pos,
655            is_finished,
656            row_count,
657            old_state: Some(old_state),
658        }
659    }
660
    /// Wraps [`Self::snapshot_read`] into a stream whose items are `Some(row)`
    /// for each upstream row, terminated by a single `None` sentinel once the
    /// snapshot for this epoch is exhausted.
    ///
    /// When `paused` is set, the stream yields nothing at all (it polls a
    /// forever-pending stream), so the consumer's select loop only sees
    /// upstream messages until backfill is resumed with a fresh stream.
    /// Each yielded row is gated through `rate_limiter` (one permit per row).
    #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a BatchTable<S>,
        epoch: u64,
        current_pos: Option<OwnedRow>,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
            // `tokio_stream::pending()` never yields, so this loop body is
            // unreachable; it exists only to keep this generator arm pending
            // forever while paused.
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // Checked the rate limit is not zero.
            #[for_await]
            for r in
                Self::snapshot_read(upstream_table, HummockReadEpoch::NoWait(epoch), current_pos)
            {
                rate_limiter.wait(1).await;
                yield Some(r?);
            }
        }
        // End-of-snapshot sentinel (unreachable in the paused branch).
        yield None;
    }
686
687    /// Snapshot read the upstream mv.
688    /// The rows from upstream snapshot read will be buffered inside the `builder`.
689    /// If snapshot is dropped before its rows are consumed,
690    /// remaining data in `builder` must be flushed manually.
691    /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
692    /// present, Then when we flush we contain duplicate rows.
693    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
694    pub async fn snapshot_read(
695        upstream_table: &BatchTable<S>,
696        epoch: HummockReadEpoch,
697        current_pos: Option<OwnedRow>,
698    ) {
699        let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
700        let range_bounds = match range_bounds {
701            None => {
702                return Ok(());
703            }
704            Some(range_bounds) => range_bounds,
705        };
706
707        // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
708        // together with the upstream mv.
709        let row_iter = upstream_table
710            .batch_iter_with_pk_bounds(
711                epoch,
712                row::empty(),
713                range_bounds,
714                true,
715                // Here we only use small range prefetch because every barrier change, the executor will recreate a new iterator. So we do not need prefetch too much data.
716                PrefetchOptions::prefetch_for_small_range_scan(),
717            )
718            .await?;
719
720        #[for_await]
721        for row in row_iter {
722            yield row?;
723        }
724    }
725
726    async fn persist_state(
727        epoch: EpochPair,
728        table: &mut Option<StateTable<S>>,
729        is_finished: bool,
730        current_pos: &Option<OwnedRow>,
731        row_count: u64,
732        old_state: &mut Option<Vec<Datum>>,
733        current_state: &mut [Datum],
734    ) -> StreamExecutorResult<()> {
735        // Backwards compatibility with no state table in backfill.
736        let Some(table) = table else { return Ok(()) };
737        utils::persist_state(
738            epoch,
739            table,
740            is_finished,
741            current_pos,
742            row_count,
743            old_state,
744            current_state,
745        )
746        .await
747    }
748
749    /// 1. Converts from data chunk to stream chunk.
750    /// 2. Update the current position.
751    /// 3. Update Metrics
752    /// 4. Map the chunk according to output indices, return
753    ///    the stream chunk and do wrapping outside.
754    fn handle_snapshot_chunk(
755        data_chunk: DataChunk,
756        current_pos: &mut Option<OwnedRow>,
757        cur_barrier_snapshot_processed_rows: &mut u64,
758        total_snapshot_processed_rows: &mut u64,
759        pk_indices: &[usize],
760        output_indices: &[usize],
761    ) -> StreamChunk {
762        let ops = vec![Op::Insert; data_chunk.capacity()];
763        let chunk = StreamChunk::from_parts(ops, data_chunk);
764        // Raise the current position.
765        // As snapshot read streams are ordered by pk, so we can
766        // just use the last row to update `current_pos`.
767        *current_pos = Some(get_new_pos(&chunk, pk_indices));
768
769        let chunk_cardinality = chunk.cardinality() as u64;
770        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
771        *total_snapshot_processed_rows += chunk_cardinality;
772
773        mapping_chunk(chunk, output_indices)
774    }
775}
776
777impl<S> Execute for BackfillExecutor<S>
778where
779    S: StateStore,
780{
781    fn execute(self: Box<Self>) -> BoxedMessageStream {
782        self.execute_inner().boxed()
783    }
784}