// risingwave_stream/executor/backfill/snapshot_backfill/executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_storage::StateStore;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::ChangeLogRow;
35use risingwave_storage::table::batch_table::BatchTable;
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
39use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
40use crate::executor::backfill::snapshot_backfill::state::{
41    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
42};
43use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
44use crate::executor::backfill::utils::{create_builder, mapping_message};
45use crate::executor::monitor::StreamingMetrics;
46use crate::executor::prelude::{StateTable, StreamExt, try_stream};
47use crate::executor::{
48    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
49    MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
50};
51use crate::task::CreateMviewProgressReporter;
52
/// Executor that backfills a streaming job from `upstream_table`.
///
/// It first reads a snapshot of the table, then replays the table's change log epoch by
/// epoch, and finally forwards messages from `upstream` directly (see `execute_inner`).
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table, read for the snapshot and for its change log.
    upstream_table: BatchTable<S>,

    /// State table persisting per-vnode backfill progress.
    progress_state_table: StateTable<S>,

    /// Upstream with the same schema with the upstream table.
    upstream: MergeExecutorInput,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Reporter for snapshot and log-store consumption progress.
    progress: CreateMviewProgressReporter,

    /// Chunk size handed to the snapshot and change-log chunk builders.
    chunk_size: usize,
    /// Rate limit applied while consuming the snapshot (the change-log read is not limited).
    rate_limit: RateLimit,

    /// Barriers injected for this executor, received separately from the barriers in `upstream`.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the upstream snapshot to backfill from.
    ///
    /// `None` is expected only when the job was created earlier and has already finished
    /// backfill.
    snapshot_epoch: Option<u64>,
}
78
79impl<S: StateStore> SnapshotBackfillExecutor<S> {
80    #[expect(clippy::too_many_arguments)]
81    pub(crate) fn new(
82        upstream_table: BatchTable<S>,
83        progress_state_table: StateTable<S>,
84        upstream: MergeExecutorInput,
85        output_indices: Vec<usize>,
86        actor_ctx: ActorContextRef,
87        progress: CreateMviewProgressReporter,
88        chunk_size: usize,
89        rate_limit: RateLimit,
90        barrier_rx: UnboundedReceiver<Barrier>,
91        metrics: Arc<StreamingMetrics>,
92        snapshot_epoch: Option<u64>,
93    ) -> Self {
94        assert_eq!(&upstream.info.schema, upstream_table.schema());
95        if upstream_table.pk_in_output_indices().is_none() {
96            panic!(
97                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
98                upstream_table.pk_indices(),
99                upstream_table.output_indices(),
100                upstream_table.schema()
101            )
102        };
103        if !matches!(rate_limit, RateLimit::Disabled) {
104            debug!(
105                ?rate_limit,
106                "create snapshot backfill executor with rate limit"
107            );
108        }
109        Self {
110            upstream_table,
111            progress_state_table,
112            upstream,
113            output_indices,
114            progress,
115            chunk_size,
116            rate_limit,
117            barrier_rx,
118            actor_ctx,
119            metrics,
120            snapshot_epoch,
121        }
122    }
123
124    #[try_stream(ok = Message, error = StreamExecutorError)]
125    async fn execute_inner(mut self) {
126        debug!("snapshot backfill executor start");
127        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
128        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
129        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
130        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
131        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
132            self.snapshot_epoch
133        {
134            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
135                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
136                Some(snapshot_epoch)
137            } else {
138                None
139            }
140        } else {
141            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
142            if cfg!(debug_assertions) {
143                panic!(
144                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
145                    first_upstream_barrier.epoch, first_recv_barrier.epoch
146                );
147            } else {
148                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
149                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
150                None
151            }
152        };
153        let first_recv_barrier_epoch = first_recv_barrier.epoch;
154        yield Message::Barrier(first_recv_barrier);
155        let mut backfill_state = BackfillState::new(
156            self.progress_state_table,
157            first_recv_barrier_epoch,
158            self.upstream_table.pk_serializer().clone(),
159        )
160        .await?;
161
162        let (mut barrier_epoch, mut need_report_finish) = {
163            if let Some(snapshot_epoch) = should_snapshot_backfill {
164                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
165                let actor_id_str = format!("{}", self.actor_ctx.id);
166
167                let consume_upstream_row_count = self
168                    .metrics
169                    .snapshot_backfill_consume_row_count
170                    .with_guarded_label_values(&[
171                        table_id_str.as_str(),
172                        actor_id_str.as_str(),
173                        "consume_upstream",
174                    ]);
175
176                let mut upstream_buffer = UpstreamBuffer::new(
177                    &mut self.upstream,
178                    first_upstream_barrier,
179                    consume_upstream_row_count,
180                );
181
182                // Phase 1: consume upstream snapshot
183                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
184                    < snapshot_epoch
185                {
186                    info!(
187                        table_id = %self.upstream_table.table_id(),
188                        snapshot_epoch,
189                        barrier_epoch = ?first_recv_barrier_epoch,
190                        "start consuming snapshot"
191                    );
192                    {
193                        let consuming_snapshot_row_count = self
194                            .metrics
195                            .snapshot_backfill_consume_row_count
196                            .with_guarded_label_values(&[
197                                table_id_str.as_str(),
198                                actor_id_str.as_str(),
199                                "consuming_snapshot",
200                            ]);
201                        let snapshot_stream = make_consume_snapshot_stream(
202                            &self.upstream_table,
203                            snapshot_epoch,
204                            self.chunk_size,
205                            self.rate_limit,
206                            &mut self.barrier_rx,
207                            &mut self.progress,
208                            &mut backfill_state,
209                            first_recv_barrier_epoch,
210                        );
211
212                        pin_mut!(snapshot_stream);
213
214                        while let Some(message) = upstream_buffer
215                            .run_future(snapshot_stream.try_next())
216                            .await?
217                        {
218                            if let Message::Chunk(chunk) = &message {
219                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
220                            }
221                            yield message;
222                        }
223                    }
224
225                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
226                    let recv_barrier_epoch = recv_barrier.epoch;
227                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
228                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
229                    yield Message::Barrier(recv_barrier);
230                    post_commit.post_yield_barrier(None).await?;
231                    (
232                        recv_barrier_epoch,
233                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
234                    )
235                } else {
236                    info!(
237                        table_id = %self.upstream_table.table_id(),
238                        snapshot_epoch,
239                        barrier_epoch = ?first_recv_barrier_epoch,
240                        "skip consuming snapshot"
241                    );
242                    (
243                        first_recv_barrier_epoch,
244                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
245                    )
246                };
247
248                // Phase 2: consume upstream log store
249                if let Some(mut upstream_buffer) = upstream_buffer {
250                    let initial_pending_lag = Duration::from_millis(
251                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
252                    );
253                    info!(
254                        ?barrier_epoch,
255                        table_id = self.upstream_table.table_id().table_id,
256                        ?initial_pending_lag,
257                        "start consuming log store"
258                    );
259
260                    let consuming_log_store_row_count = self
261                        .metrics
262                        .snapshot_backfill_consume_row_count
263                        .with_guarded_label_values(&[
264                            table_id_str.as_str(),
265                            actor_id_str.as_str(),
266                            "consuming_log_store",
267                        ]);
268                    loop {
269                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
270                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
271                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
272                        {
273                            // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
274                            // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
275                            let next_prev_epoch =
276                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
277                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
278                        }
279                        barrier_epoch = barrier.epoch;
280                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
281                        // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
282                        // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
283                        // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
284                        let mut stream = upstream_buffer
285                            .run_future(make_log_stream(
286                                &self.upstream_table,
287                                barrier_epoch.prev,
288                                None,
289                                self.chunk_size,
290                            ))
291                            .await?;
292                        while let Some(chunk) =
293                            upstream_buffer.run_future(stream.try_next()).await?
294                        {
295                            trace!(
296                                ?barrier_epoch,
297                                size = chunk.cardinality(),
298                                "consume change log yield chunk",
299                            );
300                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
301                            yield Message::Chunk(chunk);
302                        }
303
304                        trace!(?barrier_epoch, "after consume change log");
305
306                        if is_finished {
307                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
308                            self.progress.finish_consuming_log_store(barrier.epoch);
309                        } else {
310                            self.progress.update_create_mview_log_store_progress(
311                                barrier.epoch,
312                                upstream_buffer.pending_epoch_lag(),
313                            );
314                        }
315
316                        stream
317                            .for_vnode_pk_progress(|vnode, row_count, progress| {
318                                assert_eq!(progress, None);
319                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
320                            })
321                            .await?;
322                        let post_commit = backfill_state.commit(barrier.epoch).await?;
323                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
324                        yield Message::Barrier(barrier);
325                        post_commit.post_yield_barrier(None).await?;
326                        if update_vnode_bitmap.is_some() {
327                            return Err(anyhow!(
328                                "should not update vnode bitmap during consuming log store"
329                            )
330                            .into());
331                        }
332
333                        if is_finished {
334                            break;
335                        }
336                    }
337                    info!(
338                        ?barrier_epoch,
339                        table_id = self.upstream_table.table_id().table_id,
340                        "finish consuming log store"
341                    );
342
343                    (barrier_epoch, false)
344                } else {
345                    info!(
346                        ?barrier_epoch,
347                        table_id = self.upstream_table.table_id().table_id,
348                        "skip consuming log store and start consuming upstream directly"
349                    );
350
351                    (barrier_epoch, true)
352                }
353            } else {
354                backfill_state
355                    .latest_progress()
356                    .for_each(|(vnode, progress)| {
357                        let progress = progress.expect("should not be empty");
358                        assert_eq!(
359                            progress.epoch, first_upstream_barrier.epoch.prev,
360                            "vnode: {:?}",
361                            vnode
362                        );
363                        assert_eq!(
364                            progress.progress,
365                            EpochBackfillProgress::Consumed,
366                            "vnode: {:?}",
367                            vnode
368                        );
369                    });
370                info!(
371                    table_id = self.upstream_table.table_id().table_id,
372                    "skip backfill"
373                );
374                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
375                (first_upstream_barrier.epoch, true)
376            }
377        };
378        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
379        let mut epoch_row_count = 0;
380        // Phase 3: consume upstream
381        while let Some(msg) = upstream.try_next().await? {
382            match msg {
383                Message::Barrier(barrier) => {
384                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
385                    self.upstream_table
386                        .vnodes()
387                        .iter_vnodes()
388                        .for_each(|vnode| {
389                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
390                            // executor.
391                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
392                        });
393                    epoch_row_count = 0;
394                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
395                    barrier_epoch = barrier.epoch;
396                    if need_report_finish {
397                        need_report_finish = false;
398                        self.progress.finish_consuming_log_store(barrier_epoch);
399                    }
400                    let post_commit = backfill_state.commit(barrier.epoch).await?;
401                    yield Message::Barrier(barrier);
402                    if let Some(new_vnode_bitmap) =
403                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
404                    {
405                        let _prev_vnode_bitmap = self
406                            .upstream_table
407                            .update_vnode_bitmap(new_vnode_bitmap.clone());
408                        backfill_state
409                            .latest_progress()
410                            .for_each(|(vnode, progress)| {
411                                let progress = progress.expect("should not be empty");
412                                assert_eq!(
413                                    progress.epoch, barrier_epoch.prev,
414                                    "vnode {:?} has unexpected progress epoch",
415                                    vnode
416                                );
417                                assert_eq!(
418                                    progress.progress,
419                                    EpochBackfillProgress::Consumed,
420                                    "vnode {:?} has unexpected progress",
421                                    vnode
422                                );
423                            });
424                    }
425                }
426                msg => {
427                    if let Message::Chunk(chunk) = &msg {
428                        epoch_row_count += chunk.cardinality();
429                    }
430                    yield msg;
431                }
432            }
433        }
434    }
435}
436
437impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
438    fn execute(self: Box<Self>) -> BoxedMessageStream {
439        let output_indices = self.output_indices.clone();
440        self.execute_inner()
441            .filter_map(move |result| {
442                ready({
443                    match result {
444                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
445                        Err(e) => Some(Err(e)),
446                    }
447                })
448            })
449            .boxed()
450    }
451}
452
/// Type-state marker for an [`UpstreamBuffer`] used while the upstream snapshot is consumed.
struct ConsumingSnapshot;
/// Type-state marker for an [`UpstreamBuffer`] used while the upstream log store is consumed.
struct ConsumingLogStore;
455
/// Barriers received from upstream whose epochs have not yet been consumed downstream,
/// grouped by checkpoint epoch.
struct PendingBarriers {
    /// Epoch of the very first barrier received from upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier.
    /// Newer barrier at the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer groups are at the front.
    /// In each inner `VecDeque`, newer barriers are at the front. The front (newest) barrier
    /// is the checkpoint barrier that closed the group, and the others are non-checkpoint
    /// barriers. The initial group created by `new` holds only the first upstream barrier.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
468
469impl PendingBarriers {
470    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
471        Self {
472            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
473            pending_non_checkpoint_barriers: Default::default(),
474            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
475                first_upstream_barrier,
476            ])]),
477        }
478    }
479
480    fn add(&mut self, barrier: DispatcherBarrier) {
481        let is_checkpoint = barrier.kind.is_checkpoint();
482        self.pending_non_checkpoint_barriers.push_front(barrier);
483        if is_checkpoint {
484            self.checkpoint_barrier_groups
485                .push_front(take(&mut self.pending_non_checkpoint_barriers));
486        }
487    }
488
489    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
490        self.checkpoint_barrier_groups.pop_back()
491    }
492
493    fn consume_epoch(&mut self, epoch: EpochPair) {
494        let barriers = self
495            .checkpoint_barrier_groups
496            .back_mut()
497            .expect("non-empty");
498        let oldest_upstream_barrier = barriers.back().expect("non-empty");
499        assert!(
500            oldest_upstream_barrier.epoch.prev >= epoch.prev,
501            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
502            oldest_upstream_barrier.epoch,
503            epoch
504        );
505        if oldest_upstream_barrier.epoch.prev == epoch.prev {
506            assert_eq!(oldest_upstream_barrier.epoch, epoch);
507            barriers.pop_back();
508            if barriers.is_empty() {
509                self.checkpoint_barrier_groups.pop_back();
510            }
511        }
512    }
513
514    fn latest_epoch(&self) -> Option<EpochPair> {
515        self.pending_non_checkpoint_barriers
516            .front()
517            .or_else(|| {
518                self.checkpoint_barrier_groups
519                    .front()
520                    .and_then(|barriers| barriers.front())
521            })
522            .map(|barrier| barrier.epoch)
523    }
524
525    fn checkpoint_epoch_count(&self) -> usize {
526        self.checkpoint_barrier_groups.len()
527    }
528
529    fn has_checkpoint_epoch(&self) -> bool {
530        !self.checkpoint_barrier_groups.is_empty()
531    }
532}
533
/// Keeps polling `upstream` while the executor reads snapshot or log-store data, so that the
/// upstream is not back-pressured.
///
/// Barriers received from upstream are buffered in `upstream_pending_barriers`. Data chunks
/// are only counted and then dropped. `S` is a type-state marker (`ConsumingSnapshot` or
/// `ConsumingLogStore`) that selects the phase-specific methods.
struct UpstreamBuffer<'a, S> {
    /// Upstream input being drained.
    upstream: &'a mut MergeExecutorInput,
    /// Upstream is no longer polled once `pending_epoch_lag` reaches this bound, unless it is
    /// in the middle of an epoch (see `can_consume_upstream`). `u64::MAX` while consuming the
    /// snapshot.
    max_pending_epoch_lag: u64,
    /// `prev` epoch of the latest epoch consumed from the log store (0 while consuming the
    /// snapshot). `pending_epoch_lag` is measured relative to this value.
    consumed_epoch: u64,
    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next barrier.
    /// When `true`, we should continue polling until the next barrier, because
    /// some data in this epoch have been discarded and data in this epoch
    /// must be read from log store
    is_polling_epoch_data: bool,
    /// Metric incremented by the cardinality of every chunk polled (and dropped) from upstream.
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Type-state marker for the current phase.
    _phase: S,
}
548
549impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
550    fn new(
551        upstream: &'a mut MergeExecutorInput,
552        first_upstream_barrier: DispatcherBarrier,
553        consume_upstream_row_count: LabelGuardedIntCounter,
554    ) -> Self {
555        Self {
556            upstream,
557            is_polling_epoch_data: false,
558            consume_upstream_row_count,
559            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
560            // no limit on the number of pending barrier in the beginning
561            max_pending_epoch_lag: u64::MAX,
562            consumed_epoch: 0,
563            _phase: ConsumingSnapshot {},
564        }
565    }
566
567    fn start_consuming_log_store(
568        mut self,
569        consumed_epoch: u64,
570    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
571        if self
572            .upstream_pending_barriers
573            .first_upstream_barrier_epoch
574            .prev
575            == consumed_epoch
576        {
577            assert_eq!(
578                1,
579                self.upstream_pending_barriers
580                    .pop()
581                    .expect("non-empty")
582                    .len()
583            );
584        }
585        let max_pending_epoch_lag = self.pending_epoch_lag();
586        let buffer = UpstreamBuffer {
587            upstream: self.upstream,
588            upstream_pending_barriers: self.upstream_pending_barriers,
589            max_pending_epoch_lag,
590            is_polling_epoch_data: self.is_polling_epoch_data,
591            consume_upstream_row_count: self.consume_upstream_row_count,
592            consumed_epoch,
593            _phase: ConsumingLogStore {},
594        };
595        if buffer.is_finished() {
596            None
597        } else {
598            Some(buffer)
599        }
600    }
601}
602
603impl<S> UpstreamBuffer<'_, S> {
604    fn can_consume_upstream(&self) -> bool {
605        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
606    }
607
608    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
609        {
610            loop {
611                if let Err(e) = try {
612                    if !self.can_consume_upstream() {
613                        // pause the future to block consuming upstream
614                        return pending().await;
615                    }
616                    self.consume_until_next_checkpoint_barrier().await?;
617                } {
618                    break e;
619                }
620            }
621        }
622    }
623
624    /// Consume the upstream until seeing the next barrier.
625    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
626        loop {
627            let msg: DispatcherMessage = self
628                .upstream
629                .try_next()
630                .await?
631                .ok_or_else(|| anyhow!("end of upstream"))?;
632            match msg {
633                DispatcherMessage::Chunk(chunk) => {
634                    self.is_polling_epoch_data = true;
635                    self.consume_upstream_row_count
636                        .inc_by(chunk.cardinality() as _);
637                }
638                DispatcherMessage::Barrier(barrier) => {
639                    let is_checkpoint = barrier.kind.is_checkpoint();
640                    self.upstream_pending_barriers.add(barrier);
641                    if is_checkpoint {
642                        self.is_polling_epoch_data = false;
643                        break;
644                    } else {
645                        self.is_polling_epoch_data = true;
646                    }
647                }
648                DispatcherMessage::Watermark(_) => {
649                    self.is_polling_epoch_data = true;
650                }
651            }
652        }
653        Ok(())
654    }
655}
656
657impl UpstreamBuffer<'_, ConsumingLogStore> {
658    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
659        assert!(!self.is_finished());
660        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
661            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
662            // we must have returned true to indicate finish, and should not be called again.
663            assert!(self.is_polling_epoch_data);
664            self.consume_until_next_checkpoint_barrier().await?;
665            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
666        }
667        self.upstream_pending_barriers.consume_epoch(epoch);
668
669        {
670            {
671                let prev_epoch = epoch.prev;
672                assert!(self.consumed_epoch < prev_epoch);
673                let elapsed_epoch = prev_epoch - self.consumed_epoch;
674                self.consumed_epoch = prev_epoch;
675                if self.upstream_pending_barriers.has_checkpoint_epoch() {
676                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
677                    while self.can_consume_upstream()
678                        && let Some(result) =
679                            self.consume_until_next_checkpoint_barrier().now_or_never()
680                    {
681                        result?;
682                    }
683                }
684                // sub to ensure that the lag is monotonically decreasing.
685                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
686                // the upstream can at least progress for one epoch.
687                self.max_pending_epoch_lag = min(
688                    self.pending_epoch_lag(),
689                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
690                );
691            }
692        }
693        Ok(self.is_finished())
694    }
695
696    fn is_finished(&self) -> bool {
697        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
698            assert!(
699                self.upstream_pending_barriers
700                    .pending_non_checkpoint_barriers
701                    .is_empty()
702            )
703        }
704        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
705    }
706}
707
708impl<S> UpstreamBuffer<'_, S> {
709    /// Run a future while concurrently polling the upstream so that the upstream
710    /// won't be back-pressured.
711    async fn run_future<T, E: Into<StreamExecutorError>>(
712        &mut self,
713        future: impl Future<Output = Result<T, E>>,
714    ) -> StreamExecutorResult<T> {
715        select! {
716            biased;
717            e = self.concurrently_consume_upstream() => {
718                Err(e)
719            }
720            // this arm won't be starved, because the first arm is always pending unless returning with error
721            result = future => {
722                result.map_err(Into::into)
723            }
724        }
725    }
726
727    fn pending_epoch_lag(&self) -> u64 {
728        self.upstream_pending_barriers
729            .latest_epoch()
730            .map(|epoch| {
731                epoch
732                    .prev
733                    .checked_sub(self.consumed_epoch)
734                    .expect("pending epoch must be later than consumed_epoch")
735            })
736            .unwrap_or(0)
737    }
738}
739
740async fn make_log_stream(
741    upstream_table: &BatchTable<impl StateStore>,
742    prev_epoch: u64,
743    start_pk: Option<OwnedRow>,
744    chunk_size: usize,
745) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
746    let data_types = upstream_table.schema().data_types();
747    let start_pk = start_pk.as_ref();
748    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
749    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
750        upstream_table
751            .batch_iter_vnode_log(
752                prev_epoch,
753                HummockReadEpoch::Committed(prev_epoch),
754                start_pk,
755                vnode,
756            )
757            .map_ok(move |stream| {
758                let stream = stream.map_err(Into::into);
759                (vnode, stream, 0)
760            })
761    }))
762    .await?;
763    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
764    Ok(VnodeStream::new(
765        vnode_streams,
766        upstream_table.pk_in_output_indices().expect("should exist"),
767        builder,
768    ))
769}
770
771async fn make_snapshot_stream(
772    upstream_table: &BatchTable<impl StateStore>,
773    snapshot_epoch: u64,
774    backfill_state: &BackfillState<impl StateStore>,
775    rate_limit: RateLimit,
776    chunk_size: usize,
777) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
778    let data_types = upstream_table.schema().data_types();
779    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
780        move |(vnode, progress)| {
781            let start_pk = match progress {
782                None => Some((None, 0)),
783                Some(VnodeBackfillProgress {
784                    row_count,
785                    progress: EpochBackfillProgress::Consuming { latest_pk },
786                    ..
787                }) => Some((Some(latest_pk), *row_count)),
788                Some(VnodeBackfillProgress {
789                    progress: EpochBackfillProgress::Consumed,
790                    ..
791                }) => None,
792            };
793            start_pk.map(|(start_pk, row_count)| {
794                upstream_table
795                    .batch_iter_vnode(
796                        HummockReadEpoch::Committed(snapshot_epoch),
797                        start_pk,
798                        vnode,
799                        PrefetchOptions::prefetch_for_large_range_scan(),
800                    )
801                    .map_ok(move |stream| {
802                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
803                        (vnode, stream, row_count)
804                    })
805            })
806        },
807    ))
808    .await?;
809    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
810    Ok(VnodeStream::new(
811        vnode_streams,
812        upstream_table.pk_in_output_indices().expect("should exist"),
813        builder,
814    ))
815}
816
817#[expect(clippy::too_many_arguments)]
818#[try_stream(ok = Message, error = StreamExecutorError)]
819async fn make_consume_snapshot_stream<'a, S: StateStore>(
820    upstream_table: &'a BatchTable<S>,
821    snapshot_epoch: u64,
822    chunk_size: usize,
823    rate_limit: RateLimit,
824    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
825    progress: &'a mut CreateMviewProgressReporter,
826    backfill_state: &'a mut BackfillState<S>,
827    first_recv_barrier_epoch: EpochPair,
828) {
829    let mut barrier_epoch = first_recv_barrier_epoch;
830
831    // start consume upstream snapshot
832    let mut snapshot_stream = make_snapshot_stream(
833        upstream_table,
834        snapshot_epoch,
835        &*backfill_state,
836        rate_limit,
837        chunk_size,
838    )
839    .await?;
840
841    async fn select_barrier_and_snapshot_stream(
842        barrier_rx: &mut UnboundedReceiver<Barrier>,
843        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
844        throttle_snapshot_stream: bool,
845    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
846        select!(
847            result = receive_next_barrier(barrier_rx) => {
848                Ok(Either::Left(result?))
849            },
850            result = snapshot_stream.try_next(), if !throttle_snapshot_stream => {
851                Ok(Either::Right(result?))
852            }
853        )
854    }
855
856    let mut count = 0;
857    let mut epoch_row_count = 0;
858    loop {
859        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
860        match select_barrier_and_snapshot_stream(
861            barrier_rx,
862            &mut snapshot_stream,
863            throttle_snapshot_stream,
864        )
865        .await?
866        {
867            Either::Left(barrier) => {
868                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
869                barrier_epoch = barrier.epoch;
870                if barrier_epoch.curr >= snapshot_epoch {
871                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
872                }
873                if let Some(chunk) = snapshot_stream.consume_builder() {
874                    count += chunk.cardinality();
875                    epoch_row_count += chunk.cardinality();
876                    yield Message::Chunk(chunk);
877                }
878                snapshot_stream
879                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
880                        if let Some(pk) = pk_progress {
881                            backfill_state.update_epoch_progress(
882                                vnode,
883                                snapshot_epoch,
884                                row_count,
885                                pk,
886                            );
887                        } else {
888                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
889                        }
890                    })
891                    .await?;
892                let post_commit = backfill_state.commit(barrier.epoch).await?;
893                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
894                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
895                epoch_row_count = 0;
896
897                yield Message::Barrier(barrier);
898                post_commit.post_yield_barrier(None).await?;
899            }
900            Either::Right(Some(chunk)) => {
901                count += chunk.cardinality();
902                epoch_row_count += chunk.cardinality();
903                yield Message::Chunk(chunk);
904            }
905            Either::Right(None) => {
906                break;
907            }
908        }
909    }
910
911    // finish consuming upstream snapshot, report finish
912    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
913    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
914    barrier_epoch = barrier_to_report_finish.epoch;
915    info!(?barrier_epoch, count, "report finish");
916    snapshot_stream
917        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
918            assert_eq!(pk_progress, None);
919            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
920        })
921        .await?;
922    let post_commit = backfill_state.commit(barrier_epoch).await?;
923    progress.finish(barrier_epoch, count as _);
924    yield Message::Barrier(barrier_to_report_finish);
925    post_commit.post_yield_barrier(None).await?;
926
927    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
928    loop {
929        let barrier = receive_next_barrier(barrier_rx).await?;
930        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
931        barrier_epoch = barrier.epoch;
932        let post_commit = backfill_state.commit(barrier.epoch).await?;
933        yield Message::Barrier(barrier);
934        post_commit.post_yield_barrier(None).await?;
935        if barrier_epoch.curr == snapshot_epoch {
936            break;
937        }
938    }
939    info!(?barrier_epoch, "finish consuming snapshot");
940}