// risingwave_stream/executor/backfill/snapshot_backfill/executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_storage::StateStore;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::ChangeLogRow;
35use risingwave_storage::table::batch_table::BatchTable;
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
39use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
40use crate::executor::backfill::snapshot_backfill::state::{
41    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
42};
43use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
44use crate::executor::backfill::utils::{create_builder, mapping_message};
45use crate::executor::monitor::StreamingMetrics;
46use crate::executor::prelude::{StateTable, StreamExt, try_stream};
47use crate::executor::{
48    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
49    MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
50};
51use crate::task::CreateMviewProgressReporter;
52
/// Executor that backfills from an upstream table in up to three phases: it reads the table
/// snapshot at `snapshot_epoch`, then replays the table change log epoch by epoch, and finally
/// forwards the `upstream` stream directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table, read both for the snapshot and for the change log.
    upstream_table: BatchTable<S>,

    /// Backfill progress table. It is wrapped into a `BackfillState` that records per-vnode
    /// progress and is committed on every barrier.
    progress_state_table: StateTable<S>,

    /// Upstream input with the same schema as the upstream table.
    upstream: MergeExecutorInput,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Reporter of the creation progress, including the log store consumption progress.
    progress: CreateMviewProgressReporter,

    /// Chunk size used when reading the snapshot and the change log.
    chunk_size: usize,
    /// Rate limit applied to the snapshot reading stream.
    rate_limit: RateLimit,

    /// Barriers injected into this actor. They are received separately from the barriers
    /// that arrive through `upstream`.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    /// Metrics; this executor reports `snapshot_backfill_consume_row_count`.
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill from. `None` is expected only when the job was
    /// created previously and has already finished backfill.
    snapshot_epoch: Option<u64>,
}
78
79impl<S: StateStore> SnapshotBackfillExecutor<S> {
80    #[expect(clippy::too_many_arguments)]
81    pub(crate) fn new(
82        upstream_table: BatchTable<S>,
83        progress_state_table: StateTable<S>,
84        upstream: MergeExecutorInput,
85        output_indices: Vec<usize>,
86        actor_ctx: ActorContextRef,
87        progress: CreateMviewProgressReporter,
88        chunk_size: usize,
89        rate_limit: RateLimit,
90        barrier_rx: UnboundedReceiver<Barrier>,
91        metrics: Arc<StreamingMetrics>,
92        snapshot_epoch: Option<u64>,
93    ) -> Self {
94        assert_eq!(&upstream.info.schema, upstream_table.schema());
95        if upstream_table.pk_in_output_indices().is_none() {
96            panic!(
97                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
98                upstream_table.pk_indices(),
99                upstream_table.output_indices(),
100                upstream_table.schema()
101            )
102        };
103        if !matches!(rate_limit, RateLimit::Disabled) {
104            debug!(
105                ?rate_limit,
106                "create snapshot backfill executor with rate limit"
107            );
108        }
109        Self {
110            upstream_table,
111            progress_state_table,
112            upstream,
113            output_indices,
114            progress,
115            chunk_size,
116            rate_limit,
117            barrier_rx,
118            actor_ctx,
119            metrics,
120            snapshot_epoch,
121        }
122    }
123
124    #[try_stream(ok = Message, error = StreamExecutorError)]
125    async fn execute_inner(mut self) {
126        debug!("snapshot backfill executor start");
127        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
128        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
129        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
130        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
131        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
132            self.snapshot_epoch
133        {
134            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
135                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
136                Some(snapshot_epoch)
137            } else {
138                None
139            }
140        } else {
141            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
142            if cfg!(debug_assertions) {
143                panic!(
144                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
145                    first_upstream_barrier.epoch, first_recv_barrier.epoch
146                );
147            } else {
148                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
149                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
150                None
151            }
152        };
153        let first_recv_barrier_epoch = first_recv_barrier.epoch;
154        let initial_backfill_paused =
155            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
156        yield Message::Barrier(first_recv_barrier);
157        let mut backfill_state = BackfillState::new(
158            self.progress_state_table,
159            first_recv_barrier_epoch,
160            self.upstream_table.pk_serializer().clone(),
161        )
162        .await?;
163
164        let (mut barrier_epoch, mut need_report_finish) = {
165            if let Some(snapshot_epoch) = should_snapshot_backfill {
166                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
167                let actor_id_str = format!("{}", self.actor_ctx.id);
168
169                let consume_upstream_row_count = self
170                    .metrics
171                    .snapshot_backfill_consume_row_count
172                    .with_guarded_label_values(&[
173                        table_id_str.as_str(),
174                        actor_id_str.as_str(),
175                        "consume_upstream",
176                    ]);
177
178                let mut upstream_buffer = UpstreamBuffer::new(
179                    &mut self.upstream,
180                    first_upstream_barrier,
181                    consume_upstream_row_count,
182                );
183
184                // Phase 1: consume upstream snapshot
185                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
186                    < snapshot_epoch
187                {
188                    info!(
189                        table_id = %self.upstream_table.table_id(),
190                        snapshot_epoch,
191                        barrier_epoch = ?first_recv_barrier_epoch,
192                        "start consuming snapshot"
193                    );
194                    {
195                        let consuming_snapshot_row_count = self
196                            .metrics
197                            .snapshot_backfill_consume_row_count
198                            .with_guarded_label_values(&[
199                                table_id_str.as_str(),
200                                actor_id_str.as_str(),
201                                "consuming_snapshot",
202                            ]);
203                        let snapshot_stream = make_consume_snapshot_stream(
204                            &self.upstream_table,
205                            snapshot_epoch,
206                            self.chunk_size,
207                            self.rate_limit,
208                            &mut self.barrier_rx,
209                            &mut self.progress,
210                            &mut backfill_state,
211                            first_recv_barrier_epoch,
212                            initial_backfill_paused,
213                            &self.actor_ctx,
214                        );
215
216                        pin_mut!(snapshot_stream);
217
218                        while let Some(message) = upstream_buffer
219                            .run_future(snapshot_stream.try_next())
220                            .await?
221                        {
222                            if let Message::Chunk(chunk) = &message {
223                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
224                            }
225                            yield message;
226                        }
227                    }
228
229                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
230                    let recv_barrier_epoch = recv_barrier.epoch;
231                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
232                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
233                    yield Message::Barrier(recv_barrier);
234                    post_commit.post_yield_barrier(None).await?;
235                    (
236                        recv_barrier_epoch,
237                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
238                    )
239                } else {
240                    info!(
241                        table_id = %self.upstream_table.table_id(),
242                        snapshot_epoch,
243                        barrier_epoch = ?first_recv_barrier_epoch,
244                        "skip consuming snapshot"
245                    );
246                    (
247                        first_recv_barrier_epoch,
248                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
249                    )
250                };
251
252                // Phase 2: consume upstream log store
253                if let Some(mut upstream_buffer) = upstream_buffer {
254                    let initial_pending_lag = Duration::from_millis(
255                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
256                    );
257                    info!(
258                        ?barrier_epoch,
259                        table_id = self.upstream_table.table_id().table_id,
260                        ?initial_pending_lag,
261                        "start consuming log store"
262                    );
263
264                    let consuming_log_store_row_count = self
265                        .metrics
266                        .snapshot_backfill_consume_row_count
267                        .with_guarded_label_values(&[
268                            table_id_str.as_str(),
269                            actor_id_str.as_str(),
270                            "consuming_log_store",
271                        ]);
272                    loop {
273                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
274                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
275                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
276                        {
277                            // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
278                            // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
279                            let next_prev_epoch =
280                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
281                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
282                        }
283                        barrier_epoch = barrier.epoch;
284                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
285                        // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
286                        // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
287                        // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
288                        let mut stream = upstream_buffer
289                            .run_future(make_log_stream(
290                                &self.upstream_table,
291                                barrier_epoch.prev,
292                                None,
293                                self.chunk_size,
294                            ))
295                            .await?;
296                        while let Some(chunk) =
297                            upstream_buffer.run_future(stream.try_next()).await?
298                        {
299                            trace!(
300                                ?barrier_epoch,
301                                size = chunk.cardinality(),
302                                "consume change log yield chunk",
303                            );
304                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
305                            yield Message::Chunk(chunk);
306                        }
307
308                        trace!(?barrier_epoch, "after consume change log");
309
310                        if is_finished {
311                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
312                            self.progress.finish_consuming_log_store(barrier.epoch);
313                        } else {
314                            self.progress.update_create_mview_log_store_progress(
315                                barrier.epoch,
316                                upstream_buffer.pending_epoch_lag(),
317                            );
318                        }
319
320                        stream
321                            .for_vnode_pk_progress(|vnode, row_count, progress| {
322                                assert_eq!(progress, None);
323                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
324                            })
325                            .await?;
326                        let post_commit = backfill_state.commit(barrier.epoch).await?;
327                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
328                        yield Message::Barrier(barrier);
329                        post_commit.post_yield_barrier(None).await?;
330                        if update_vnode_bitmap.is_some() {
331                            return Err(anyhow!(
332                                "should not update vnode bitmap during consuming log store"
333                            )
334                            .into());
335                        }
336
337                        if is_finished {
338                            break;
339                        }
340                    }
341                    info!(
342                        ?barrier_epoch,
343                        table_id = self.upstream_table.table_id().table_id,
344                        "finish consuming log store"
345                    );
346
347                    (barrier_epoch, false)
348                } else {
349                    info!(
350                        ?barrier_epoch,
351                        table_id = self.upstream_table.table_id().table_id,
352                        "skip consuming log store and start consuming upstream directly"
353                    );
354
355                    (barrier_epoch, true)
356                }
357            } else {
358                backfill_state
359                    .latest_progress()
360                    .for_each(|(vnode, progress)| {
361                        let progress = progress.expect("should not be empty");
362                        assert_eq!(
363                            progress.epoch, first_upstream_barrier.epoch.prev,
364                            "vnode: {:?}",
365                            vnode
366                        );
367                        assert_eq!(
368                            progress.progress,
369                            EpochBackfillProgress::Consumed,
370                            "vnode: {:?}",
371                            vnode
372                        );
373                    });
374                info!(
375                    table_id = self.upstream_table.table_id().table_id,
376                    "skip backfill"
377                );
378                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
379                (first_upstream_barrier.epoch, true)
380            }
381        };
382        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
383        let mut epoch_row_count = 0;
384        // Phase 3: consume upstream
385        while let Some(msg) = upstream.try_next().await? {
386            match msg {
387                Message::Barrier(barrier) => {
388                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
389                    self.upstream_table
390                        .vnodes()
391                        .iter_vnodes()
392                        .for_each(|vnode| {
393                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
394                            // executor.
395                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
396                        });
397                    epoch_row_count = 0;
398                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
399                    barrier_epoch = barrier.epoch;
400                    if need_report_finish {
401                        need_report_finish = false;
402                        self.progress.finish_consuming_log_store(barrier_epoch);
403                    }
404                    let post_commit = backfill_state.commit(barrier.epoch).await?;
405                    yield Message::Barrier(barrier);
406                    if let Some(new_vnode_bitmap) =
407                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
408                    {
409                        let _prev_vnode_bitmap =
410                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
411                        backfill_state
412                            .latest_progress()
413                            .for_each(|(vnode, progress)| {
414                                let progress = progress.expect("should not be empty");
415                                assert_eq!(
416                                    progress.epoch, barrier_epoch.prev,
417                                    "vnode {:?} has unexpected progress epoch",
418                                    vnode
419                                );
420                                assert_eq!(
421                                    progress.progress,
422                                    EpochBackfillProgress::Consumed,
423                                    "vnode {:?} has unexpected progress",
424                                    vnode
425                                );
426                            });
427                    }
428                }
429                msg => {
430                    if let Message::Chunk(chunk) = &msg {
431                        epoch_row_count += chunk.cardinality();
432                    }
433                    yield msg;
434                }
435            }
436        }
437    }
438}
439
440impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
441    fn execute(self: Box<Self>) -> BoxedMessageStream {
442        let output_indices = self.output_indices.clone();
443        self.execute_inner()
444            .filter_map(move |result| {
445                ready({
446                    match result {
447                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
448                        Err(e) => Some(Err(e)),
449                    }
450                })
451            })
452            .boxed()
453    }
454}
455
/// Phase marker for `UpstreamBuffer`: the buffer is used while the snapshot is being consumed.
struct ConsumingSnapshot;
/// Phase marker for `UpstreamBuffer`: the buffer is used while the log store is being consumed.
struct ConsumingLogStore;
458
/// Barriers received from the upstream that have not yet been matched by the barriers injected
/// into this actor, grouped by checkpoint barrier.
struct PendingBarriers {
    /// Epoch of the first barrier received from the upstream. The initial group in
    /// `checkpoint_barrier_groups` holds only this barrier.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers received after the latest checkpoint barrier.
    /// Newer barriers are at the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer barriers at the front.
    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
    /// and others as non-checkpoint barrier
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
471
472impl PendingBarriers {
473    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
474        Self {
475            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
476            pending_non_checkpoint_barriers: Default::default(),
477            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
478                first_upstream_barrier,
479            ])]),
480        }
481    }
482
483    fn add(&mut self, barrier: DispatcherBarrier) {
484        let is_checkpoint = barrier.kind.is_checkpoint();
485        self.pending_non_checkpoint_barriers.push_front(barrier);
486        if is_checkpoint {
487            self.checkpoint_barrier_groups
488                .push_front(take(&mut self.pending_non_checkpoint_barriers));
489        }
490    }
491
492    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
493        self.checkpoint_barrier_groups.pop_back()
494    }
495
496    fn consume_epoch(&mut self, epoch: EpochPair) {
497        let barriers = self
498            .checkpoint_barrier_groups
499            .back_mut()
500            .expect("non-empty");
501        let oldest_upstream_barrier = barriers.back().expect("non-empty");
502        assert!(
503            oldest_upstream_barrier.epoch.prev >= epoch.prev,
504            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
505            oldest_upstream_barrier.epoch,
506            epoch
507        );
508        if oldest_upstream_barrier.epoch.prev == epoch.prev {
509            assert_eq!(oldest_upstream_barrier.epoch, epoch);
510            barriers.pop_back();
511            if barriers.is_empty() {
512                self.checkpoint_barrier_groups.pop_back();
513            }
514        }
515    }
516
517    fn latest_epoch(&self) -> Option<EpochPair> {
518        self.pending_non_checkpoint_barriers
519            .front()
520            .or_else(|| {
521                self.checkpoint_barrier_groups
522                    .front()
523                    .and_then(|barriers| barriers.front())
524            })
525            .map(|barrier| barrier.epoch)
526    }
527
528    fn checkpoint_epoch_count(&self) -> usize {
529        self.checkpoint_barrier_groups.len()
530    }
531
532    fn has_checkpoint_epoch(&self) -> bool {
533        !self.checkpoint_barrier_groups.is_empty()
534    }
535}
536
/// Consumes the upstream while the executor reads the snapshot or the log store, so that the
/// upstream is not back-pressured.
///
/// Upstream data is counted and discarded; only barriers are kept, in
/// `upstream_pending_barriers`. `S` is the phase marker (`ConsumingSnapshot` or
/// `ConsumingLogStore`).
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// The upstream is polled only while the pending epoch lag is below this bound, unless
    /// an epoch is partially consumed (`is_polling_epoch_data`). It is `u64::MAX` (no limit)
    /// in the snapshot phase.
    max_pending_epoch_lag: u64,
    /// Prev epoch up to which data has been consumed. It is 0 in the snapshot phase.
    consumed_epoch: u64,
    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next barrier.
    /// When `true`, we should continue polling until the next barrier, because
    /// some data in this epoch have been discarded and data in this epoch
    /// must be read from log store
    is_polling_epoch_data: bool,
    /// Row count metric for the upstream data that is consumed and discarded.
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}
551
552impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
553    fn new(
554        upstream: &'a mut MergeExecutorInput,
555        first_upstream_barrier: DispatcherBarrier,
556        consume_upstream_row_count: LabelGuardedIntCounter,
557    ) -> Self {
558        Self {
559            upstream,
560            is_polling_epoch_data: false,
561            consume_upstream_row_count,
562            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
563            // no limit on the number of pending barrier in the beginning
564            max_pending_epoch_lag: u64::MAX,
565            consumed_epoch: 0,
566            _phase: ConsumingSnapshot {},
567        }
568    }
569
570    fn start_consuming_log_store(
571        mut self,
572        consumed_epoch: u64,
573    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
574        if self
575            .upstream_pending_barriers
576            .first_upstream_barrier_epoch
577            .prev
578            == consumed_epoch
579        {
580            assert_eq!(
581                1,
582                self.upstream_pending_barriers
583                    .pop()
584                    .expect("non-empty")
585                    .len()
586            );
587        }
588        let max_pending_epoch_lag = self.pending_epoch_lag();
589        let buffer = UpstreamBuffer {
590            upstream: self.upstream,
591            upstream_pending_barriers: self.upstream_pending_barriers,
592            max_pending_epoch_lag,
593            is_polling_epoch_data: self.is_polling_epoch_data,
594            consume_upstream_row_count: self.consume_upstream_row_count,
595            consumed_epoch,
596            _phase: ConsumingLogStore {},
597        };
598        if buffer.is_finished() {
599            None
600        } else {
601            Some(buffer)
602        }
603    }
604}
605
606impl<S> UpstreamBuffer<'_, S> {
607    fn can_consume_upstream(&self) -> bool {
608        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
609    }
610
611    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
612        {
613            loop {
614                if let Err(e) = try {
615                    if !self.can_consume_upstream() {
616                        // pause the future to block consuming upstream
617                        return pending().await;
618                    }
619                    self.consume_until_next_checkpoint_barrier().await?;
620                } {
621                    break e;
622                }
623            }
624        }
625    }
626
627    /// Consume the upstream until seeing the next barrier.
628    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
629        loop {
630            let msg: DispatcherMessage = self
631                .upstream
632                .try_next()
633                .await?
634                .ok_or_else(|| anyhow!("end of upstream"))?;
635            match msg {
636                DispatcherMessage::Chunk(chunk) => {
637                    self.is_polling_epoch_data = true;
638                    self.consume_upstream_row_count
639                        .inc_by(chunk.cardinality() as _);
640                }
641                DispatcherMessage::Barrier(barrier) => {
642                    let is_checkpoint = barrier.kind.is_checkpoint();
643                    self.upstream_pending_barriers.add(barrier);
644                    if is_checkpoint {
645                        self.is_polling_epoch_data = false;
646                        break;
647                    } else {
648                        self.is_polling_epoch_data = true;
649                    }
650                }
651                DispatcherMessage::Watermark(_) => {
652                    self.is_polling_epoch_data = true;
653                }
654            }
655        }
656        Ok(())
657    }
658}
659
660impl UpstreamBuffer<'_, ConsumingLogStore> {
661    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
662        assert!(!self.is_finished());
663        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
664            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
665            // we must have returned true to indicate finish, and should not be called again.
666            assert!(self.is_polling_epoch_data);
667            self.consume_until_next_checkpoint_barrier().await?;
668            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
669        }
670        self.upstream_pending_barriers.consume_epoch(epoch);
671
672        {
673            {
674                let prev_epoch = epoch.prev;
675                assert!(self.consumed_epoch < prev_epoch);
676                let elapsed_epoch = prev_epoch - self.consumed_epoch;
677                self.consumed_epoch = prev_epoch;
678                if self.upstream_pending_barriers.has_checkpoint_epoch() {
679                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
680                    while self.can_consume_upstream()
681                        && let Some(result) =
682                            self.consume_until_next_checkpoint_barrier().now_or_never()
683                    {
684                        result?;
685                    }
686                }
687                // sub to ensure that the lag is monotonically decreasing.
688                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
689                // the upstream can at least progress for one epoch.
690                self.max_pending_epoch_lag = min(
691                    self.pending_epoch_lag(),
692                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
693                );
694            }
695        }
696        Ok(self.is_finished())
697    }
698
699    fn is_finished(&self) -> bool {
700        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
701            assert!(
702                self.upstream_pending_barriers
703                    .pending_non_checkpoint_barriers
704                    .is_empty()
705            )
706        }
707        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
708    }
709}
710
711impl<S> UpstreamBuffer<'_, S> {
712    /// Run a future while concurrently polling the upstream so that the upstream
713    /// won't be back-pressured.
714    async fn run_future<T, E: Into<StreamExecutorError>>(
715        &mut self,
716        future: impl Future<Output = Result<T, E>>,
717    ) -> StreamExecutorResult<T> {
718        select! {
719            biased;
720            e = self.concurrently_consume_upstream() => {
721                Err(e)
722            }
723            // this arm won't be starved, because the first arm is always pending unless returning with error
724            result = future => {
725                result.map_err(Into::into)
726            }
727        }
728    }
729
730    fn pending_epoch_lag(&self) -> u64 {
731        self.upstream_pending_barriers
732            .latest_epoch()
733            .map(|epoch| {
734                epoch
735                    .prev
736                    .checked_sub(self.consumed_epoch)
737                    .expect("pending epoch must be later than consumed_epoch")
738            })
739            .unwrap_or(0)
740    }
741}
742
743async fn make_log_stream(
744    upstream_table: &BatchTable<impl StateStore>,
745    prev_epoch: u64,
746    start_pk: Option<OwnedRow>,
747    chunk_size: usize,
748) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
749    let data_types = upstream_table.schema().data_types();
750    let start_pk = start_pk.as_ref();
751    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
752    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
753        upstream_table
754            .batch_iter_vnode_log(
755                prev_epoch,
756                HummockReadEpoch::Committed(prev_epoch),
757                start_pk,
758                vnode,
759            )
760            .map_ok(move |stream| {
761                let stream = stream.map_err(Into::into);
762                (vnode, stream, 0)
763            })
764    }))
765    .await?;
766    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
767    Ok(VnodeStream::new(
768        vnode_streams,
769        upstream_table.pk_in_output_indices().expect("should exist"),
770        builder,
771    ))
772}
773
774async fn make_snapshot_stream(
775    upstream_table: &BatchTable<impl StateStore>,
776    snapshot_epoch: u64,
777    backfill_state: &BackfillState<impl StateStore>,
778    rate_limit: RateLimit,
779    chunk_size: usize,
780) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
781    let data_types = upstream_table.schema().data_types();
782    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
783        move |(vnode, progress)| {
784            let start_pk = match progress {
785                None => Some((None, 0)),
786                Some(VnodeBackfillProgress {
787                    row_count,
788                    progress: EpochBackfillProgress::Consuming { latest_pk },
789                    ..
790                }) => Some((Some(latest_pk), *row_count)),
791                Some(VnodeBackfillProgress {
792                    progress: EpochBackfillProgress::Consumed,
793                    ..
794                }) => None,
795            };
796            start_pk.map(|(start_pk, row_count)| {
797                upstream_table
798                    .batch_iter_vnode(
799                        HummockReadEpoch::Committed(snapshot_epoch),
800                        start_pk,
801                        vnode,
802                        PrefetchOptions::prefetch_for_large_range_scan(),
803                    )
804                    .map_ok(move |stream| {
805                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
806                        (vnode, stream, row_count)
807                    })
808            })
809        },
810    ))
811    .await?;
812    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
813    Ok(VnodeStream::new(
814        vnode_streams,
815        upstream_table.pk_in_output_indices().expect("should exist"),
816        builder,
817    ))
818}
819
820#[expect(clippy::too_many_arguments)]
821#[try_stream(ok = Message, error = StreamExecutorError)]
822async fn make_consume_snapshot_stream<'a, S: StateStore>(
823    upstream_table: &'a BatchTable<S>,
824    snapshot_epoch: u64,
825    chunk_size: usize,
826    rate_limit: RateLimit,
827    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
828    progress: &'a mut CreateMviewProgressReporter,
829    backfill_state: &'a mut BackfillState<S>,
830    first_recv_barrier_epoch: EpochPair,
831    initial_backfill_paused: bool,
832    actor_ctx: &'a ActorContextRef,
833) {
834    let mut barrier_epoch = first_recv_barrier_epoch;
835
836    // start consume upstream snapshot
837    let mut snapshot_stream = make_snapshot_stream(
838        upstream_table,
839        snapshot_epoch,
840        &*backfill_state,
841        rate_limit,
842        chunk_size,
843    )
844    .await?;
845
846    async fn select_barrier_and_snapshot_stream(
847        barrier_rx: &mut UnboundedReceiver<Barrier>,
848        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
849        throttle_snapshot_stream: bool,
850        backfill_paused: bool,
851    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
852        select!(
853            result = receive_next_barrier(barrier_rx) => {
854                Ok(Either::Left(result?))
855            },
856            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
857                Ok(Either::Right(result?))
858            }
859        )
860    }
861
862    let mut count = 0;
863    let mut epoch_row_count = 0;
864    let mut backfill_paused = initial_backfill_paused;
865    loop {
866        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
867        match select_barrier_and_snapshot_stream(
868            barrier_rx,
869            &mut snapshot_stream,
870            throttle_snapshot_stream,
871            backfill_paused,
872        )
873        .await?
874        {
875            Either::Left(barrier) => {
876                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
877                barrier_epoch = barrier.epoch;
878                if barrier_epoch.curr >= snapshot_epoch {
879                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
880                }
881                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
882                    if backfill_paused {
883                        backfill_paused = false;
884                    } else {
885                        tracing::error!(
886                            "received start fragment backfill mutation, but backfill is not paused"
887                        );
888                    }
889                }
890                if let Some(chunk) = snapshot_stream.consume_builder() {
891                    count += chunk.cardinality();
892                    epoch_row_count += chunk.cardinality();
893                    yield Message::Chunk(chunk);
894                }
895                snapshot_stream
896                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
897                        if let Some(pk) = pk_progress {
898                            backfill_state.update_epoch_progress(
899                                vnode,
900                                snapshot_epoch,
901                                row_count,
902                                pk,
903                            );
904                        } else {
905                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
906                        }
907                    })
908                    .await?;
909                let post_commit = backfill_state.commit(barrier.epoch).await?;
910                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
911                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
912                epoch_row_count = 0;
913
914                yield Message::Barrier(barrier);
915                post_commit.post_yield_barrier(None).await?;
916            }
917            Either::Right(Some(chunk)) => {
918                if backfill_paused {
919                    return Err(
920                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
921                    );
922                }
923                count += chunk.cardinality();
924                epoch_row_count += chunk.cardinality();
925                yield Message::Chunk(chunk);
926            }
927            Either::Right(None) => {
928                break;
929            }
930        }
931    }
932
933    // finish consuming upstream snapshot, report finish
934    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
935    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
936    barrier_epoch = barrier_to_report_finish.epoch;
937    info!(?barrier_epoch, count, "report finish");
938    snapshot_stream
939        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
940            assert_eq!(pk_progress, None);
941            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
942        })
943        .await?;
944    let post_commit = backfill_state.commit(barrier_epoch).await?;
945    progress.finish(barrier_epoch, count as _);
946    yield Message::Barrier(barrier_to_report_finish);
947    post_commit.post_yield_barrier(None).await?;
948
949    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
950    loop {
951        let barrier = receive_next_barrier(barrier_rx).await?;
952        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
953        barrier_epoch = barrier.epoch;
954        let post_commit = backfill_state.commit(barrier.epoch).await?;
955        yield Message::Barrier(barrier);
956        post_commit.post_yield_barrier(None).await?;
957        if barrier_epoch.curr == snapshot_epoch {
958            break;
959        }
960    }
961    info!(?barrier_epoch, "finish consuming snapshot");
962}