risingwave_stream/executor/backfill/no_shuffle_backfill.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::{bail, row};
use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::BatchTable;

use crate::executor::backfill::utils;
use crate::executor::backfill::utils::{
    METADATA_STATE_LEN, compute_bounds, construct_initial_finished_state, create_builder,
    get_new_pos, mapping_chunk, mapping_message, mark_chunk,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgressReporter;

/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
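///
/// For illustration (a hypothetical table with a single `int64` pk column): a persisted row of
/// `| vnode: 0 | pk_0: 42 | backfill_finished: false | row_count: 1000 |` is recovered into a
/// `BackfillState` with `current_pos` at pk `42`, `is_finished = false` and `row_count = 1000`.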
#[derive(Debug, Eq, PartialEq)]
pub struct BackfillState {
    current_pos: Option<OwnedRow>,
    old_state: Option<Vec<Datum>>,
    is_finished: bool,
    row_count: u64,
}

/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
/// `BackfillExecutor` is used to create a materialized view on another materialized view.
///
/// It only buffers chunks between two barriers, instead of the unbounded memory usage of
/// `RearrangedChainExecutor`.
///
/// It uses the latest epoch to read a snapshot of the upstream mv between two barriers, and every
/// `StreamChunk` produced by the snapshot read is forwarded to the downstream.
///
/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv);
/// `current_pos` is initialized as an empty `Row`.
///
/// All upstream messages within a barrier interval are buffered; at the next barrier, each
/// buffered row is either forwarded or ignored based on `current_pos`. Once `current_pos`
/// reaches the end of the upstream mv pk, the backfill is finished.
///
/// Notice:
/// The pk we are talking about here refers to the storage primary key.
/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
/// in the same worker, so that we can read uncommitted data from the upstream table without
/// waiting.
pub struct BackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,
    /// Upstream with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill state.
    state_table: Option<StateTable<S>>,

    /// The column indices that need to be forwarded to the downstream, from both the upstream and
    /// the table scan.
    output_indices: Vec<usize>,

    /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
    progress: CreateMviewProgressReporter,

    actor_id: ActorId,

    metrics: Arc<StreamingMetrics>,

    chunk_size: usize,

    rate_limiter: MonitoredRateLimiter,
}

impl<S> BackfillExecutor<S>
where
    S: StateStore,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        upstream_table: BatchTable<S>,
        upstream: Executor,
        state_table: Option<StateTable<S>>,
        output_indices: Vec<usize>,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        rate_limit: RateLimit,
    ) -> Self {
        let actor_id = progress.actor_id();
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Self {
            upstream_table,
            upstream,
            state_table,
            output_indices,
            progress,
            actor_id,
            metrics,
            chunk_size,
            rate_limiter,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The primary key columns.
        // We receive a pruned chunk from the upstream table,
        // which will only contain output columns of the scan on the upstream table.
        // The pk indices specify the pk columns of the pruned chunk.
        let pk_indices = self.upstream_table.pk_in_output_indices().unwrap();

        let state_len = pk_indices.len() + METADATA_STATE_LEN;

        let pk_order = self.upstream_table.pk_serializer().get_order_types();

        let upstream_table_id = self.upstream_table.table_id().table_id;

        let mut upstream = self.upstream.execute();

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let mut paused = first_barrier.is_pause_on_startup();
        let first_epoch = first_barrier.epoch;
        let init_epoch = first_barrier.epoch.prev;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);

        if let Some(state_table) = self.state_table.as_mut() {
            state_table.init_epoch(first_epoch).await?;
        }

        let BackfillState {
            mut current_pos,
            is_finished,
            row_count,
            mut old_state,
        } = Self::recover_backfill_state(self.state_table.as_ref(), pk_indices.len()).await?;
        tracing::trace!(is_finished, row_count, "backfill state recovered");

        let data_types = self.upstream_table.schema().data_types();

        // Chunk builder will be instantiated with min(rate_limit, self.chunk_size) as the chunk's max size.
        let mut builder = create_builder(
            self.rate_limiter.rate_limit(),
            self.chunk_size,
            data_types.clone(),
        );

        // Use this buffer to construct state,
        // which will then be persisted.
        let mut current_state: Vec<Datum> = vec![None; state_len];

        // If no backfill is needed but the state is still "unfinished", we need to finish it.
        // So we just update the state + progress to meta at the next barrier to finish progress,
        // and forward other messages.
        //
        // Reason for persisting on second barrier rather than first:
        // We can't update meta with progress as finished until state_table
        // has been updated.
        // We also can't update state_table in first epoch, since state_table
        // expects to have been initialized in previous epoch.

        // The epoch used to snapshot read upstream mv.
        let mut snapshot_read_epoch = init_epoch;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_processed_rows: u64 = row_count;
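        // (Starting from the recovered `row_count` keeps the reported backfill progress
        // monotonic across recovery.)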

        // Backfill Algorithm:
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot
        //
        // We construct a backfill stream with upstream as its left input and mv snapshot read
        // stream as its right input. When a chunk comes from upstream, we will buffer it.
        //
        // When a barrier comes from upstream:
        //  - Update the `snapshot_read_epoch`.
        //  - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
        //    `current_pos`, otherwise ignore it.
        //  - reconstruct the whole backfill stream with upstream and new mv snapshot read stream
        //    with the `snapshot_read_epoch`.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
        if !is_finished {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(upstream_table_id, self.actor_id);

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut snapshot_read_complete = false;
                let mut has_snapshot_read = false;

                // We should not buffer rows from previous epoch, else we can have duplicates.
                assert!(upstream_chunk_buffer.is_empty());

                {
                    let left_upstream = upstream.by_ref().map(Either::Left);
                    let paused =
                        paused || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(
                            &self.upstream_table,
                            snapshot_read_epoch,
                            current_pos.clone(),
                            paused,
                            &self.rate_limiter,
                        )
                        .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
                    // barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process barrier outside of the loop.
                                        // This is because the backfill stream holds a mutable
                                        // reference to our chunk builder.
                                        // We want to create another mutable reference
                                        // to flush remaining chunks from the chunk builder
                                        // on barrier.
                                        // Hence we break here and process it after this block.
                                        pending_barrier = Some(barrier);
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                has_snapshot_read = true;
                                match msg? {
                                    None => {
                                        // Consume remaining rows in the builder.
                                        if let Some(data_chunk) = builder.consume_all() {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                data_chunk,
                                                &mut current_pos,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &pk_indices,
                                                &self.output_indices,
                                            ));
                                        }

                                        // End of the snapshot read stream.
                                        // We should not mark the chunk anymore,
                                        // otherwise, we will ignore some rows
                                        // in the buffer. Here we choose to never mark the chunk.
                                        // Consume the remaining upstream buffer chunks without
                                        // marking.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            yield Message::Chunk(mapping_chunk(
                                                chunk,
                                                &self.output_indices,
                                            ));
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some(record) => {
                                        // Buffer the snapshot read row.
                                        if let Some(data_chunk) = builder.append_one_row(record) {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                data_chunk,
                                                &mut current_pos,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &pk_indices,
                                                &self.output_indices,
                                            ));
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Before processing the barrier, if we did not read from the snapshot,
                    // do a snapshot read first.
                    // This is so we don't lose the tombstone iteration progress.
                    // If paused, we also can't read any snapshot records.
                    if !has_snapshot_read && !paused {
                        assert!(
                            builder.is_empty(),
                            "Builder should be empty if no snapshot read"
                        );
                        let (_, snapshot) = backfill_stream.into_inner();
                        #[for_await]
                        for msg in snapshot {
                            let Either::Right(msg) = msg else {
                                bail!("BUG: snapshot_read contains upstream messages");
                            };
                            match msg? {
                                None => {
                                    // End of the snapshot read stream.
                                    // We let the barrier handling logic take care of upstream updates.
                                    // But we still want to exit backfill loop, so we mark snapshot read complete.
                                    snapshot_read_complete = true;
                                    break;
                                }
                                Some(row) => {
                                    let chunk = DataChunk::from_rows(&[row], &data_types);
                                    yield Message::Chunk(Self::handle_snapshot_chunk(
                                        chunk,
                                        &mut current_pos,
                                        &mut cur_barrier_snapshot_processed_rows,
                                        &mut total_snapshot_processed_rows,
                                        &pk_indices,
                                        &self.output_indices,
                                    ));
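                                    // Reading a single row is enough to advance `current_pos`
                                    // (so tombstone iteration progress is not lost); the snapshot
                                    // stream is rebuilt with a fresh epoch after the barrier.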
                                    break;
                                }
                            }
                        }
                    }
                }
                // When we break out of inner backfill_stream loop, it means we have a barrier.
                // If there are no updates and there are no snapshots left,
                // we already finished backfill and should have exited the outer backfill loop.
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => bail!("BUG: current_backfill loop exited without a barrier"),
                };

                // Process barrier:
                // - consume snapshot rows left in builder
                // - consume upstream buffer chunk
                // - switch snapshot

                // Consume snapshot rows left in builder
                let chunk = builder.consume_all();
                if let Some(chunk) = chunk {
                    yield Message::Chunk(Self::handle_snapshot_chunk(
                        chunk,
                        &mut current_pos,
                        &mut cur_barrier_snapshot_processed_rows,
                        &mut total_snapshot_processed_rows,
                        &pk_indices,
                        &self.output_indices,
                    ));
                }

                // Consume upstream buffer chunk
                // If there is no `current_pos`, we have not processed any snapshot rows yet.
                // In that case we can just ignore the upstream buffer chunks, but we still need
                // to clear them.
                if let Some(current_pos) = &current_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                        yield Message::Chunk(mapping_chunk(
                            mark_chunk(chunk, current_pos, &pk_indices, pk_order),
                            &self.output_indices,
                        ));
                    }
                } else {
                    upstream_chunk_buffer.clear()
                }

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                // Update snapshot read epoch.
                snapshot_read_epoch = barrier.epoch.prev;

                self.progress.update(
                    barrier.epoch,
                    snapshot_read_epoch,
                    total_snapshot_processed_rows,
                );

                // Persist state on barrier
                Self::persist_state(
                    barrier.epoch,
                    &mut self.state_table,
                    false,
                    &current_pos,
                    total_snapshot_processed_rows,
                    &mut old_state,
                    &mut current_state,
                )
                .await?;

                tracing::trace!(
                    epoch = ?barrier.epoch,
                    ?current_pos,
                    total_snapshot_processed_rows,
                    "Backfill state persisted"
                );

                // Update snapshot read chunk builder.
                if let Some(mutation) = barrier.mutation.as_deref() {
                    match mutation {
                        Mutation::Pause => {
                            paused = true;
                        }
                        Mutation::Resume => {
                            paused = false;
                        }
                        Mutation::Throttle(actor_to_apply) => {
                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
                            if let Some(new_rate_limit) = new_rate_limit_entry {
                                let new_rate_limit = (*new_rate_limit).into();
                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                                if old_rate_limit != new_rate_limit {
                                    tracing::info!(
                                        old_rate_limit = ?old_rate_limit,
                                        new_rate_limit = ?new_rate_limit,
                                        upstream_table_id = upstream_table_id,
                                        actor_id = self.actor_id,
                                        "backfill rate limit changed",
                                    );
                                    // The builder is emptied above via `DataChunkBuilder::consume_all`.
                                    assert!(
                                        builder.is_empty(),
                                        "builder should already be emptied"
                                    );
                                    builder = create_builder(
                                        new_rate_limit,
                                        self.chunk_size,
                                        self.upstream_table.schema().data_types(),
                                    );
                                }
                            }
                        }
                        _ => (),
                    }
                }

                yield Message::Barrier(barrier);

                if snapshot_read_complete {
                    break 'backfill_loop;
                }

                // We will switch snapshot at the start of the next iteration of the backfill loop.
            }
        }

        tracing::trace!("Backfill has finished, waiting for barrier");

        // Wait for first barrier to come after backfill is finished.
        // So we can update our progress + persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished then we need to update state, otherwise no need.
                if let Message::Barrier(barrier) = &msg {
                    if is_finished {
                        // If already finished, there is no need to persist any state, but we still need to advance the epoch of the state table.
                        if let Some(table) = &mut self.state_table {
                            table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                        }
                    } else {
                        // If snapshot was empty, we do not need to backfill,
                        // but we still need to persist the finished state.
                        // We currently persist it on the second barrier here rather than first.
                        // This is because we can't update state table in first epoch,
                        // since it expects to have been initialized in previous epoch
                        // (there's no epoch before the first epoch).
                        if current_pos.is_none() {
                            current_pos = Some(construct_initial_finished_state(pk_indices.len()))
                        }

                        // `current_pos` has been set at least once by now: either the snapshot
                        // read was non-empty, or the snapshot was empty and we constructed a
                        // placeholder state above.
                        debug_assert_ne!(current_pos, None);

                        Self::persist_state(
                            barrier.epoch,
                            &mut self.state_table,
                            true,
                            &current_pos,
                            total_snapshot_processed_rows,
                            &mut old_state,
                            &mut current_state,
                        )
                        .await?;
                        tracing::trace!(
                            epoch = ?barrier.epoch,
                            ?current_pos,
                            total_snapshot_processed_rows,
                            "Backfill position persisted after completion"
                        );
                    }

                    // Whether backfill finished before recovery or has just finished now, we need
                    // to update the mview tracker, since it does not persist this information.
                    self.progress
                        .finish(barrier.epoch, total_snapshot_processed_rows);
                    tracing::trace!(
                        epoch = ?barrier.epoch,
                        "Updated CreateMaterializedTracker"
                    );
                    yield msg;
                    break;
                }
                // Allow other messages to pass through.
                // We won't yield twice here, since if there's a barrier,
                // we will always break out of the loop.
                yield msg;
            }
        }

        tracing::trace!(
            "Backfill has already finished; forwarding messages directly to the downstream"
        );

        // After progress finished + state persisted,
        // we can forward messages directly to the downstream,
        // as backfill is finished.
        // We don't need to report backfill progress any longer, as it has finished.
        // It will always be at 100%.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // Backfill is already finished, so there is no state to persist, but we still need to advance the epoch of the state table.
                    if let Some(table) = &mut self.state_table {
                        table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;
                    }
                }

                yield msg;
            }
        }
    }

    async fn recover_backfill_state(
        state_table: Option<&StateTable<S>>,
        pk_len: usize,
    ) -> StreamExecutorResult<BackfillState> {
        let Some(state_table) = state_table else {
            // If no state table, but backfill is present, it must be from an old cluster.
            // In that case backfill must be finished, otherwise it won't have been persisted.
            return Ok(BackfillState {
                current_pos: None,
                is_finished: true,
                row_count: 0,
                old_state: None,
            });
        };
        let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
        let first_vnode = vnodes.next().unwrap();
        let key: &[Datum] = &[Some(first_vnode.into())];
        let row = state_table.get_row(key).await?;
        let expected_state = Self::deserialize_backfill_state(row, pk_len);

        // All vnode partitions should have the same state (no scale-in supported).
        for vnode in vnodes {
            let key: &[Datum] = &[Some(vnode.into())];
            let row = state_table.get_row(key).await?;
            let state = Self::deserialize_backfill_state(row, pk_len);
            assert_eq!(state.is_finished, expected_state.is_finished);
        }
        Ok(expected_state)
    }

    fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
        let Some(row) = row else {
            return BackfillState {
                current_pos: None,
                is_finished: false,
                row_count: 0,
                old_state: None,
            };
        };
        let row = row.into_inner();
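        // The fetched row does not include the vnode key column, so index 0 of `old_state`
        // (reserved for the vnode) stays `None` and the row is copied in starting at index 1.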
        let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
        old_state[1..row.len() + 1].clone_from_slice(&row);
        let current_pos = Some((&row[0..pk_len]).into_owned_row());
        let is_finished = row[pk_len].clone().is_some_and(|d| d.into_bool());
        let row_count = row
            .get(pk_len + 1)
            .cloned()
            .unwrap_or(None)
            .map_or(0, |d| d.into_int64() as u64);
        BackfillState {
            current_pos,
            is_finished,
            row_count,
            old_state: Some(old_state),
        }
    }

    #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a BatchTable<S>,
        epoch: u64,
        current_pos: Option<OwnedRow>,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
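            // While paused we yield nothing; this pending stream is simply dropped when the
            // backfill stream is rebuilt on the next barrier.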
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // Checked the rate limit is not zero.
            #[for_await]
            for r in
                Self::snapshot_read(upstream_table, HummockReadEpoch::NoWait(epoch), current_pos)
            {
                rate_limiter.wait(1).await;
                yield Some(r?);
            }
        }
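        // A trailing `None` marks the end of this epoch's snapshot stream for the consumer.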
        yield None;
    }

    /// Snapshot read the upstream mv.
    /// The rows from the upstream snapshot read will be buffered inside the `builder`.
    /// If the snapshot is dropped before its rows are consumed,
    /// the remaining data in `builder` must be flushed manually.
    /// Otherwise, when we scan a new snapshot, rows may still be left in the `builder`,
    /// and flushing them then would produce duplicate rows.
    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
    pub async fn snapshot_read(
        upstream_table: &BatchTable<S>,
        epoch: HummockReadEpoch,
        current_pos: Option<OwnedRow>,
    ) {
        let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
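        // `None` bounds mean there is nothing left to scan, so end the snapshot stream right away.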
        let range_bounds = match range_bounds {
            None => {
                return Ok(());
            }
            Some(range_bounds) => range_bounds,
        };

        // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
        // together with the upstream mv.
        let row_iter = upstream_table
            .batch_iter_with_pk_bounds(
                epoch,
                row::empty(),
                range_bounds,
                true,
                // We only use small-range prefetch here, because the executor recreates the iterator on every barrier, so there is no need to prefetch too much data.
                PrefetchOptions::prefetch_for_small_range_scan(),
            )
            .await?;

        #[for_await]
        for row in row_iter {
            yield row?;
        }
    }

    async fn persist_state(
        epoch: EpochPair,
        table: &mut Option<StateTable<S>>,
        is_finished: bool,
        current_pos: &Option<OwnedRow>,
        row_count: u64,
        old_state: &mut Option<Vec<Datum>>,
        current_state: &mut [Datum],
    ) -> StreamExecutorResult<()> {
        // Backwards compatibility with no state table in backfill.
        let Some(table) = table else { return Ok(()) };
        utils::persist_state(
            epoch,
            table,
            is_finished,
            current_pos,
            row_count,
            old_state,
            current_state,
        )
        .await
    }

    /// 1. Converts the data chunk to a stream chunk.
    /// 2. Updates the current position.
    /// 3. Updates metrics.
    /// 4. Maps the chunk according to the output indices and returns
    ///    the stream chunk; wrapping it into a message is done by the caller.
    fn handle_snapshot_chunk(
        data_chunk: DataChunk,
        current_pos: &mut Option<OwnedRow>,
        cur_barrier_snapshot_processed_rows: &mut u64,
        total_snapshot_processed_rows: &mut u64,
        pk_indices: &[usize],
        output_indices: &[usize],
    ) -> StreamChunk {
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let chunk = StreamChunk::from_parts(ops, data_chunk);
        // Raise the current position.
        // Since snapshot read streams are ordered by pk, we can
        // just use the last row to update `current_pos`.
        *current_pos = Some(get_new_pos(&chunk, pk_indices));

        let chunk_cardinality = chunk.cardinality() as u64;
        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        *total_snapshot_processed_rows += chunk_cardinality;

        mapping_chunk(chunk, output_indices)
    }
}

impl<S> Execute for BackfillExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}