risingwave_stream/executor/backfill/snapshot_backfill/
executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_storage::StateStore;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::ChangeLogRow;
35use risingwave_storage::table::batch_table::BatchTable;
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
39use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
40use crate::executor::backfill::snapshot_backfill::state::{
41    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
42};
43use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
44use crate::executor::backfill::utils::{create_builder, mapping_message};
45use crate::executor::monitor::StreamingMetrics;
46use crate::executor::prelude::{StateTable, StreamExt, try_stream};
47use crate::executor::{
48    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
49    MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
50};
51use crate::task::CreateMviewProgressReporter;
52
53pub struct SnapshotBackfillExecutor<S: StateStore> {
54    /// Upstream table
55    upstream_table: BatchTable<S>,
56
57    /// Backfill progress table
58    progress_state_table: StateTable<S>,
59
60    /// Upstream with the same schema with the upstream table.
61    upstream: MergeExecutorInput,
62
63    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
64    output_indices: Vec<usize>,
65
66    progress: CreateMviewProgressReporter,
67
68    chunk_size: usize,
69    rate_limit: RateLimit,
70
71    barrier_rx: UnboundedReceiver<Barrier>,
72
73    actor_ctx: ActorContextRef,
74    metrics: Arc<StreamingMetrics>,
75
76    snapshot_epoch: Option<u64>,
77}
78
79impl<S: StateStore> SnapshotBackfillExecutor<S> {
80    #[expect(clippy::too_many_arguments)]
81    pub(crate) fn new(
82        upstream_table: BatchTable<S>,
83        progress_state_table: StateTable<S>,
84        upstream: MergeExecutorInput,
85        output_indices: Vec<usize>,
86        actor_ctx: ActorContextRef,
87        progress: CreateMviewProgressReporter,
88        chunk_size: usize,
89        rate_limit: RateLimit,
90        barrier_rx: UnboundedReceiver<Barrier>,
91        metrics: Arc<StreamingMetrics>,
92        snapshot_epoch: Option<u64>,
93    ) -> Self {
94        assert_eq!(&upstream.info.schema, upstream_table.schema());
95        if upstream_table.pk_in_output_indices().is_none() {
96            panic!(
97                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
98                upstream_table.pk_indices(),
99                upstream_table.output_indices(),
100                upstream_table.schema()
101            )
102        };
103        if !matches!(rate_limit, RateLimit::Disabled) {
104            debug!(
105                ?rate_limit,
106                "create snapshot backfill executor with rate limit"
107            );
108        }
109        Self {
110            upstream_table,
111            progress_state_table,
112            upstream,
113            output_indices,
114            progress,
115            chunk_size,
116            rate_limit,
117            barrier_rx,
118            actor_ctx,
119            metrics,
120            snapshot_epoch,
121        }
122    }
123
124    #[try_stream(ok = Message, error = StreamExecutorError)]
125    async fn execute_inner(mut self) {
126        debug!("snapshot backfill executor start");
127        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
128        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
129        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
130        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
131        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
132            self.snapshot_epoch
133        {
134            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
135                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
136                Some(snapshot_epoch)
137            } else {
138                None
139            }
140        } else {
141            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
142            if cfg!(debug_assertions) {
143                panic!(
144                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
145                    first_upstream_barrier.epoch, first_recv_barrier.epoch
146                );
147            } else {
148                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
149                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
150                None
151            }
152        };
153        let first_recv_barrier_epoch = first_recv_barrier.epoch;
154        let initial_backfill_paused =
155            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
156        yield Message::Barrier(first_recv_barrier);
157        let mut backfill_state = BackfillState::new(
158            self.progress_state_table,
159            first_recv_barrier_epoch,
160            self.upstream_table.pk_serializer().clone(),
161        )
162        .await?;
163
164        let (mut barrier_epoch, mut need_report_finish) = {
165            if let Some(snapshot_epoch) = should_snapshot_backfill {
166                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
167                let actor_id_str = format!("{}", self.actor_ctx.id);
168
169                let consume_upstream_row_count = self
170                    .metrics
171                    .snapshot_backfill_consume_row_count
172                    .with_guarded_label_values(&[
173                        table_id_str.as_str(),
174                        actor_id_str.as_str(),
175                        "consume_upstream",
176                    ]);
177
178                let mut upstream_buffer = UpstreamBuffer::new(
179                    &mut self.upstream,
180                    first_upstream_barrier,
181                    consume_upstream_row_count,
182                );
183
184                // Phase 1: consume upstream snapshot
185                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
186                    < snapshot_epoch
187                {
188                    info!(
189                        table_id = %self.upstream_table.table_id(),
190                        snapshot_epoch,
191                        barrier_epoch = ?first_recv_barrier_epoch,
192                        "start consuming snapshot"
193                    );
194                    {
195                        let consuming_snapshot_row_count = self
196                            .metrics
197                            .snapshot_backfill_consume_row_count
198                            .with_guarded_label_values(&[
199                                table_id_str.as_str(),
200                                actor_id_str.as_str(),
201                                "consuming_snapshot",
202                            ]);
203                        let snapshot_stream = make_consume_snapshot_stream(
204                            &self.upstream_table,
205                            snapshot_epoch,
206                            self.chunk_size,
207                            self.rate_limit,
208                            &mut self.barrier_rx,
209                            &mut self.progress,
210                            &mut backfill_state,
211                            first_recv_barrier_epoch,
212                            initial_backfill_paused,
213                            &self.actor_ctx,
214                        );
215
216                        pin_mut!(snapshot_stream);
217
218                        while let Some(message) = upstream_buffer
219                            .run_future(snapshot_stream.try_next())
220                            .await?
221                        {
222                            if let Message::Chunk(chunk) = &message {
223                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
224                            }
225                            yield message;
226                        }
227                    }
228
229                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
230                    let recv_barrier_epoch = recv_barrier.epoch;
231                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
232                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
233                    yield Message::Barrier(recv_barrier);
234                    post_commit.post_yield_barrier(None).await?;
235                    (
236                        recv_barrier_epoch,
237                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
238                    )
239                } else {
240                    info!(
241                        table_id = %self.upstream_table.table_id(),
242                        snapshot_epoch,
243                        barrier_epoch = ?first_recv_barrier_epoch,
244                        "skip consuming snapshot"
245                    );
246                    (
247                        first_recv_barrier_epoch,
248                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
249                    )
250                };
251
252                // Phase 2: consume upstream log store
253                if let Some(mut upstream_buffer) = upstream_buffer {
254                    let initial_pending_lag = Duration::from_millis(
255                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
256                    );
257                    info!(
258                        ?barrier_epoch,
259                        table_id = self.upstream_table.table_id().table_id,
260                        ?initial_pending_lag,
261                        "start consuming log store"
262                    );
263
264                    let consuming_log_store_row_count = self
265                        .metrics
266                        .snapshot_backfill_consume_row_count
267                        .with_guarded_label_values(&[
268                            table_id_str.as_str(),
269                            actor_id_str.as_str(),
270                            "consuming_log_store",
271                        ]);
272                    loop {
273                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
274                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
275                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
276                        {
277                            // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
278                            // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
279                            let next_prev_epoch =
280                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
281                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
282                        }
283                        barrier_epoch = barrier.epoch;
284                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
285                        // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
286                        // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
287                        // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
288                        let mut stream = upstream_buffer
289                            .run_future(make_log_stream(
290                                &self.upstream_table,
291                                barrier_epoch.prev,
292                                None,
293                                self.chunk_size,
294                            ))
295                            .await?;
296                        while let Some(chunk) =
297                            upstream_buffer.run_future(stream.try_next()).await?
298                        {
299                            trace!(
300                                ?barrier_epoch,
301                                size = chunk.cardinality(),
302                                "consume change log yield chunk",
303                            );
304                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
305                            yield Message::Chunk(chunk);
306                        }
307
308                        trace!(?barrier_epoch, "after consume change log");
309
310                        if is_finished {
311                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
312                            self.progress.finish_consuming_log_store(barrier.epoch);
313                        } else {
314                            self.progress.update_create_mview_log_store_progress(
315                                barrier.epoch,
316                                upstream_buffer.pending_epoch_lag(),
317                            );
318                        }
319
320                        stream
321                            .for_vnode_pk_progress(|vnode, row_count, progress| {
322                                assert_eq!(progress, None);
323                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
324                            })
325                            .await?;
326                        let post_commit = backfill_state.commit(barrier.epoch).await?;
327                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
328                        yield Message::Barrier(barrier);
329                        post_commit.post_yield_barrier(None).await?;
330                        if update_vnode_bitmap.is_some() {
331                            return Err(anyhow!(
332                                "should not update vnode bitmap during consuming log store"
333                            )
334                            .into());
335                        }
336
337                        if is_finished {
338                            break;
339                        }
340                    }
341                    info!(
342                        ?barrier_epoch,
343                        table_id = self.upstream_table.table_id().table_id,
344                        "finish consuming log store"
345                    );
346
347                    (barrier_epoch, false)
348                } else {
349                    info!(
350                        ?barrier_epoch,
351                        table_id = self.upstream_table.table_id().table_id,
352                        "skip consuming log store and start consuming upstream directly"
353                    );
354
355                    (barrier_epoch, true)
356                }
357            } else {
358                backfill_state
359                    .latest_progress()
360                    .for_each(|(vnode, progress)| {
361                        let progress = progress.expect("should not be empty");
362                        assert_eq!(
363                            progress.epoch, first_upstream_barrier.epoch.prev,
364                            "vnode: {:?}",
365                            vnode
366                        );
367                        assert_eq!(
368                            progress.progress,
369                            EpochBackfillProgress::Consumed,
370                            "vnode: {:?}",
371                            vnode
372                        );
373                    });
374                info!(
375                    table_id = self.upstream_table.table_id().table_id,
376                    "skip backfill"
377                );
378                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
379                (first_upstream_barrier.epoch, true)
380            }
381        };
382        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
383        let mut epoch_row_count = 0;
384        // Phase 3: consume upstream
385        while let Some(msg) = upstream.try_next().await? {
386            match msg {
387                Message::Barrier(barrier) => {
388                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
389                    self.upstream_table
390                        .vnodes()
391                        .iter_vnodes()
392                        .for_each(|vnode| {
393                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
394                            // executor.
395                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
396                        });
397                    epoch_row_count = 0;
398                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
399                    barrier_epoch = barrier.epoch;
400                    if need_report_finish {
401                        need_report_finish = false;
402                        self.progress.finish_consuming_log_store(barrier_epoch);
403                    }
404                    let post_commit = backfill_state.commit(barrier.epoch).await?;
405                    yield Message::Barrier(barrier);
406                    if let Some(new_vnode_bitmap) =
407                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
408                    {
409                        let _prev_vnode_bitmap = self
410                            .upstream_table
411                            .update_vnode_bitmap(new_vnode_bitmap.clone());
412                        backfill_state
413                            .latest_progress()
414                            .for_each(|(vnode, progress)| {
415                                let progress = progress.expect("should not be empty");
416                                assert_eq!(
417                                    progress.epoch, barrier_epoch.prev,
418                                    "vnode {:?} has unexpected progress epoch",
419                                    vnode
420                                );
421                                assert_eq!(
422                                    progress.progress,
423                                    EpochBackfillProgress::Consumed,
424                                    "vnode {:?} has unexpected progress",
425                                    vnode
426                                );
427                            });
428                    }
429                }
430                msg => {
431                    if let Message::Chunk(chunk) = &msg {
432                        epoch_row_count += chunk.cardinality();
433                    }
434                    yield msg;
435                }
436            }
437        }
438    }
439}
440
441impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
442    fn execute(self: Box<Self>) -> BoxedMessageStream {
443        let output_indices = self.output_indices.clone();
444        self.execute_inner()
445            .filter_map(move |result| {
446                ready({
447                    match result {
448                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
449                        Err(e) => Some(Err(e)),
450                    }
451                })
452            })
453            .boxed()
454    }
455}
456
/// Phase marker of an `UpstreamBuffer` created by `UpstreamBuffer::new`, i.e. before
/// `start_consuming_log_store` has been called on it.
struct ConsumingSnapshot;
/// Phase marker of the `UpstreamBuffer` returned by `start_consuming_log_store`.
struct ConsumingLogStore;
459
460struct PendingBarriers {
461    first_upstream_barrier_epoch: EpochPair,
462
463    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier
464    /// Newer barrier at the front
465    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,
466
467    /// In the outer `VecDeque`, newer barriers at the front.
468    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
469    /// and others as non-checkpoint barrier
470    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
471}
472
473impl PendingBarriers {
474    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
475        Self {
476            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
477            pending_non_checkpoint_barriers: Default::default(),
478            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
479                first_upstream_barrier,
480            ])]),
481        }
482    }
483
484    fn add(&mut self, barrier: DispatcherBarrier) {
485        let is_checkpoint = barrier.kind.is_checkpoint();
486        self.pending_non_checkpoint_barriers.push_front(barrier);
487        if is_checkpoint {
488            self.checkpoint_barrier_groups
489                .push_front(take(&mut self.pending_non_checkpoint_barriers));
490        }
491    }
492
493    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
494        self.checkpoint_barrier_groups.pop_back()
495    }
496
497    fn consume_epoch(&mut self, epoch: EpochPair) {
498        let barriers = self
499            .checkpoint_barrier_groups
500            .back_mut()
501            .expect("non-empty");
502        let oldest_upstream_barrier = barriers.back().expect("non-empty");
503        assert!(
504            oldest_upstream_barrier.epoch.prev >= epoch.prev,
505            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
506            oldest_upstream_barrier.epoch,
507            epoch
508        );
509        if oldest_upstream_barrier.epoch.prev == epoch.prev {
510            assert_eq!(oldest_upstream_barrier.epoch, epoch);
511            barriers.pop_back();
512            if barriers.is_empty() {
513                self.checkpoint_barrier_groups.pop_back();
514            }
515        }
516    }
517
518    fn latest_epoch(&self) -> Option<EpochPair> {
519        self.pending_non_checkpoint_barriers
520            .front()
521            .or_else(|| {
522                self.checkpoint_barrier_groups
523                    .front()
524                    .and_then(|barriers| barriers.front())
525            })
526            .map(|barrier| barrier.epoch)
527    }
528
529    fn checkpoint_epoch_count(&self) -> usize {
530        self.checkpoint_barrier_groups.len()
531    }
532
533    fn has_checkpoint_epoch(&self) -> bool {
534        !self.checkpoint_barrier_groups.is_empty()
535    }
536}
537
538struct UpstreamBuffer<'a, S> {
539    upstream: &'a mut MergeExecutorInput,
540    max_pending_epoch_lag: u64,
541    consumed_epoch: u64,
542    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
543    upstream_pending_barriers: PendingBarriers,
544    /// Whether we have started polling any upstream data before the next barrier.
545    /// When `true`, we should continue polling until the next barrier, because
546    /// some data in this epoch have been discarded and data in this epoch
547    /// must be read from log store
548    is_polling_epoch_data: bool,
549    consume_upstream_row_count: LabelGuardedIntCounter,
550    _phase: S,
551}
552
553impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
554    fn new(
555        upstream: &'a mut MergeExecutorInput,
556        first_upstream_barrier: DispatcherBarrier,
557        consume_upstream_row_count: LabelGuardedIntCounter,
558    ) -> Self {
559        Self {
560            upstream,
561            is_polling_epoch_data: false,
562            consume_upstream_row_count,
563            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
564            // no limit on the number of pending barrier in the beginning
565            max_pending_epoch_lag: u64::MAX,
566            consumed_epoch: 0,
567            _phase: ConsumingSnapshot {},
568        }
569    }
570
571    fn start_consuming_log_store(
572        mut self,
573        consumed_epoch: u64,
574    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
575        if self
576            .upstream_pending_barriers
577            .first_upstream_barrier_epoch
578            .prev
579            == consumed_epoch
580        {
581            assert_eq!(
582                1,
583                self.upstream_pending_barriers
584                    .pop()
585                    .expect("non-empty")
586                    .len()
587            );
588        }
589        let max_pending_epoch_lag = self.pending_epoch_lag();
590        let buffer = UpstreamBuffer {
591            upstream: self.upstream,
592            upstream_pending_barriers: self.upstream_pending_barriers,
593            max_pending_epoch_lag,
594            is_polling_epoch_data: self.is_polling_epoch_data,
595            consume_upstream_row_count: self.consume_upstream_row_count,
596            consumed_epoch,
597            _phase: ConsumingLogStore {},
598        };
599        if buffer.is_finished() {
600            None
601        } else {
602            Some(buffer)
603        }
604    }
605}
606
607impl<S> UpstreamBuffer<'_, S> {
608    fn can_consume_upstream(&self) -> bool {
609        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
610    }
611
612    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
613        {
614            loop {
615                if let Err(e) = try {
616                    if !self.can_consume_upstream() {
617                        // pause the future to block consuming upstream
618                        return pending().await;
619                    }
620                    self.consume_until_next_checkpoint_barrier().await?;
621                } {
622                    break e;
623                }
624            }
625        }
626    }
627
628    /// Consume the upstream until seeing the next barrier.
629    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
630        loop {
631            let msg: DispatcherMessage = self
632                .upstream
633                .try_next()
634                .await?
635                .ok_or_else(|| anyhow!("end of upstream"))?;
636            match msg {
637                DispatcherMessage::Chunk(chunk) => {
638                    self.is_polling_epoch_data = true;
639                    self.consume_upstream_row_count
640                        .inc_by(chunk.cardinality() as _);
641                }
642                DispatcherMessage::Barrier(barrier) => {
643                    let is_checkpoint = barrier.kind.is_checkpoint();
644                    self.upstream_pending_barriers.add(barrier);
645                    if is_checkpoint {
646                        self.is_polling_epoch_data = false;
647                        break;
648                    } else {
649                        self.is_polling_epoch_data = true;
650                    }
651                }
652                DispatcherMessage::Watermark(_) => {
653                    self.is_polling_epoch_data = true;
654                }
655            }
656        }
657        Ok(())
658    }
659}
660
661impl UpstreamBuffer<'_, ConsumingLogStore> {
662    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
663        assert!(!self.is_finished());
664        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
665            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
666            // we must have returned true to indicate finish, and should not be called again.
667            assert!(self.is_polling_epoch_data);
668            self.consume_until_next_checkpoint_barrier().await?;
669            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
670        }
671        self.upstream_pending_barriers.consume_epoch(epoch);
672
673        {
674            {
675                let prev_epoch = epoch.prev;
676                assert!(self.consumed_epoch < prev_epoch);
677                let elapsed_epoch = prev_epoch - self.consumed_epoch;
678                self.consumed_epoch = prev_epoch;
679                if self.upstream_pending_barriers.has_checkpoint_epoch() {
680                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
681                    while self.can_consume_upstream()
682                        && let Some(result) =
683                            self.consume_until_next_checkpoint_barrier().now_or_never()
684                    {
685                        result?;
686                    }
687                }
688                // sub to ensure that the lag is monotonically decreasing.
689                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
690                // the upstream can at least progress for one epoch.
691                self.max_pending_epoch_lag = min(
692                    self.pending_epoch_lag(),
693                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
694                );
695            }
696        }
697        Ok(self.is_finished())
698    }
699
700    fn is_finished(&self) -> bool {
701        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
702            assert!(
703                self.upstream_pending_barriers
704                    .pending_non_checkpoint_barriers
705                    .is_empty()
706            )
707        }
708        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
709    }
710}
711
712impl<S> UpstreamBuffer<'_, S> {
713    /// Run a future while concurrently polling the upstream so that the upstream
714    /// won't be back-pressured.
715    async fn run_future<T, E: Into<StreamExecutorError>>(
716        &mut self,
717        future: impl Future<Output = Result<T, E>>,
718    ) -> StreamExecutorResult<T> {
719        select! {
720            biased;
721            e = self.concurrently_consume_upstream() => {
722                Err(e)
723            }
724            // this arm won't be starved, because the first arm is always pending unless returning with error
725            result = future => {
726                result.map_err(Into::into)
727            }
728        }
729    }
730
731    fn pending_epoch_lag(&self) -> u64 {
732        self.upstream_pending_barriers
733            .latest_epoch()
734            .map(|epoch| {
735                epoch
736                    .prev
737                    .checked_sub(self.consumed_epoch)
738                    .expect("pending epoch must be later than consumed_epoch")
739            })
740            .unwrap_or(0)
741    }
742}
743
744async fn make_log_stream(
745    upstream_table: &BatchTable<impl StateStore>,
746    prev_epoch: u64,
747    start_pk: Option<OwnedRow>,
748    chunk_size: usize,
749) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
750    let data_types = upstream_table.schema().data_types();
751    let start_pk = start_pk.as_ref();
752    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
753    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
754        upstream_table
755            .batch_iter_vnode_log(
756                prev_epoch,
757                HummockReadEpoch::Committed(prev_epoch),
758                start_pk,
759                vnode,
760            )
761            .map_ok(move |stream| {
762                let stream = stream.map_err(Into::into);
763                (vnode, stream, 0)
764            })
765    }))
766    .await?;
767    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
768    Ok(VnodeStream::new(
769        vnode_streams,
770        upstream_table.pk_in_output_indices().expect("should exist"),
771        builder,
772    ))
773}
774
775async fn make_snapshot_stream(
776    upstream_table: &BatchTable<impl StateStore>,
777    snapshot_epoch: u64,
778    backfill_state: &BackfillState<impl StateStore>,
779    rate_limit: RateLimit,
780    chunk_size: usize,
781) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
782    let data_types = upstream_table.schema().data_types();
783    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
784        move |(vnode, progress)| {
785            let start_pk = match progress {
786                None => Some((None, 0)),
787                Some(VnodeBackfillProgress {
788                    row_count,
789                    progress: EpochBackfillProgress::Consuming { latest_pk },
790                    ..
791                }) => Some((Some(latest_pk), *row_count)),
792                Some(VnodeBackfillProgress {
793                    progress: EpochBackfillProgress::Consumed,
794                    ..
795                }) => None,
796            };
797            start_pk.map(|(start_pk, row_count)| {
798                upstream_table
799                    .batch_iter_vnode(
800                        HummockReadEpoch::Committed(snapshot_epoch),
801                        start_pk,
802                        vnode,
803                        PrefetchOptions::prefetch_for_large_range_scan(),
804                    )
805                    .map_ok(move |stream| {
806                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
807                        (vnode, stream, row_count)
808                    })
809            })
810        },
811    ))
812    .await?;
813    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
814    Ok(VnodeStream::new(
815        vnode_streams,
816        upstream_table.pk_in_output_indices().expect("should exist"),
817        builder,
818    ))
819}
820
821#[expect(clippy::too_many_arguments)]
822#[try_stream(ok = Message, error = StreamExecutorError)]
823async fn make_consume_snapshot_stream<'a, S: StateStore>(
824    upstream_table: &'a BatchTable<S>,
825    snapshot_epoch: u64,
826    chunk_size: usize,
827    rate_limit: RateLimit,
828    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
829    progress: &'a mut CreateMviewProgressReporter,
830    backfill_state: &'a mut BackfillState<S>,
831    first_recv_barrier_epoch: EpochPair,
832    initial_backfill_paused: bool,
833    actor_ctx: &'a ActorContextRef,
834) {
835    let mut barrier_epoch = first_recv_barrier_epoch;
836
837    // start consume upstream snapshot
838    let mut snapshot_stream = make_snapshot_stream(
839        upstream_table,
840        snapshot_epoch,
841        &*backfill_state,
842        rate_limit,
843        chunk_size,
844    )
845    .await?;
846
847    async fn select_barrier_and_snapshot_stream(
848        barrier_rx: &mut UnboundedReceiver<Barrier>,
849        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
850        throttle_snapshot_stream: bool,
851        backfill_paused: bool,
852    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
853        select!(
854            result = receive_next_barrier(barrier_rx) => {
855                Ok(Either::Left(result?))
856            },
857            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
858                Ok(Either::Right(result?))
859            }
860        )
861    }
862
863    let mut count = 0;
864    let mut epoch_row_count = 0;
865    let mut backfill_paused = initial_backfill_paused;
866    loop {
867        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
868        match select_barrier_and_snapshot_stream(
869            barrier_rx,
870            &mut snapshot_stream,
871            throttle_snapshot_stream,
872            backfill_paused,
873        )
874        .await?
875        {
876            Either::Left(barrier) => {
877                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
878                barrier_epoch = barrier.epoch;
879                if barrier_epoch.curr >= snapshot_epoch {
880                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
881                }
882                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
883                    if backfill_paused {
884                        backfill_paused = false;
885                    } else {
886                        tracing::error!(
887                            "received start fragment backfill mutation, but backfill is not paused"
888                        );
889                    }
890                }
891                if let Some(chunk) = snapshot_stream.consume_builder() {
892                    count += chunk.cardinality();
893                    epoch_row_count += chunk.cardinality();
894                    yield Message::Chunk(chunk);
895                }
896                snapshot_stream
897                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
898                        if let Some(pk) = pk_progress {
899                            backfill_state.update_epoch_progress(
900                                vnode,
901                                snapshot_epoch,
902                                row_count,
903                                pk,
904                            );
905                        } else {
906                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
907                        }
908                    })
909                    .await?;
910                let post_commit = backfill_state.commit(barrier.epoch).await?;
911                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
912                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
913                epoch_row_count = 0;
914
915                yield Message::Barrier(barrier);
916                post_commit.post_yield_barrier(None).await?;
917            }
918            Either::Right(Some(chunk)) => {
919                if backfill_paused {
920                    return Err(
921                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
922                    );
923                }
924                count += chunk.cardinality();
925                epoch_row_count += chunk.cardinality();
926                yield Message::Chunk(chunk);
927            }
928            Either::Right(None) => {
929                break;
930            }
931        }
932    }
933
934    // finish consuming upstream snapshot, report finish
935    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
936    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
937    barrier_epoch = barrier_to_report_finish.epoch;
938    info!(?barrier_epoch, count, "report finish");
939    snapshot_stream
940        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
941            assert_eq!(pk_progress, None);
942            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
943        })
944        .await?;
945    let post_commit = backfill_state.commit(barrier_epoch).await?;
946    progress.finish(barrier_epoch, count as _);
947    yield Message::Barrier(barrier_to_report_finish);
948    post_commit.post_yield_barrier(None).await?;
949
950    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
951    loop {
952        let barrier = receive_next_barrier(barrier_rx).await?;
953        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
954        barrier_epoch = barrier.epoch;
955        let post_commit = backfill_state.commit(barrier.epoch).await?;
956        yield Message::Barrier(barrier);
957        post_commit.post_yield_barrier(None).await?;
958        if barrier_epoch.curr == snapshot_epoch {
959            break;
960        }
961    }
962    info!(?barrier_epoch, "finish consuming snapshot");
963}