risingwave_stream/executor/backfill/snapshot_backfill/executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_storage::StateStore;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::ChangeLogRow;
35use risingwave_storage::table::batch_table::BatchTable;
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
39use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
40use crate::executor::backfill::snapshot_backfill::state::{
41    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
42};
43use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
44use crate::executor::backfill::utils::{create_builder, mapping_message};
45use crate::executor::monitor::StreamingMetrics;
46use crate::executor::prelude::{StateTable, StreamExt, try_stream};
47use crate::executor::{
48    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
49    MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
50};
51use crate::task::CreateMviewProgressReporter;
52
/// Executor that backfills a newly created streaming job from an upstream table.
///
/// It first reads a snapshot of the upstream table and then replays the upstream table's
/// change log. After that, it forwards the upstream input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,

    /// Backfill progress table
    progress_state_table: StateTable<S>,

    /// Upstream input with the same schema as the upstream table.
    upstream: MergeExecutorInput,

    /// The column indices that need to be forwarded to the downstream from the upstream and the table scan.
    output_indices: Vec<usize>,

    /// Progress reporter. It is handed to the snapshot stream, updated with the log store lag,
    /// and notified when the log store has been fully consumed.
    progress: CreateMviewProgressReporter,

    /// Chunk size used when building the snapshot and change-log streams.
    chunk_size: usize,
    /// Rate limit passed to the snapshot-consuming stream.
    rate_limit: RateLimit,

    /// Receiver of the barriers injected by the local barrier worker.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the upstream snapshot to backfill from. `None` is expected only when the node
    /// was created previously and has already finished the backfill.
    snapshot_epoch: Option<u64>,
}
78
79impl<S: StateStore> SnapshotBackfillExecutor<S> {
80    #[expect(clippy::too_many_arguments)]
81    pub(crate) fn new(
82        upstream_table: BatchTable<S>,
83        progress_state_table: StateTable<S>,
84        upstream: MergeExecutorInput,
85        output_indices: Vec<usize>,
86        actor_ctx: ActorContextRef,
87        progress: CreateMviewProgressReporter,
88        chunk_size: usize,
89        rate_limit: RateLimit,
90        barrier_rx: UnboundedReceiver<Barrier>,
91        metrics: Arc<StreamingMetrics>,
92        snapshot_epoch: Option<u64>,
93    ) -> Self {
94        assert_eq!(&upstream.info.schema, upstream_table.schema());
95        if upstream_table.pk_in_output_indices().is_none() {
96            panic!(
97                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
98                upstream_table.pk_indices(),
99                upstream_table.output_indices(),
100                upstream_table.schema()
101            )
102        };
103        if !matches!(rate_limit, RateLimit::Disabled) {
104            debug!(
105                ?rate_limit,
106                "create snapshot backfill executor with rate limit"
107            );
108        }
109        Self {
110            upstream_table,
111            progress_state_table,
112            upstream,
113            output_indices,
114            progress,
115            chunk_size,
116            rate_limit,
117            barrier_rx,
118            actor_ctx,
119            metrics,
120            snapshot_epoch,
121        }
122    }
123
124    #[try_stream(ok = Message, error = StreamExecutorError)]
125    async fn execute_inner(mut self) {
126        debug!("snapshot backfill executor start");
127        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
128        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
129        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
130        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
131        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
132            self.snapshot_epoch
133        {
134            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
135                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
136                Some(snapshot_epoch)
137            } else {
138                None
139            }
140        } else {
141            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
142            if cfg!(debug_assertions) {
143                panic!(
144                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
145                    first_upstream_barrier.epoch, first_recv_barrier.epoch
146                );
147            } else {
148                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
149                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
150                None
151            }
152        };
153        let first_recv_barrier_epoch = first_recv_barrier.epoch;
154        yield Message::Barrier(first_recv_barrier);
155        let mut backfill_state = BackfillState::new(
156            self.progress_state_table,
157            first_recv_barrier_epoch,
158            self.upstream_table.pk_serializer().clone(),
159        )
160        .await?;
161
162        let (mut barrier_epoch, mut need_report_finish) = {
163            if let Some(snapshot_epoch) = should_snapshot_backfill {
164                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
165                let actor_id_str = format!("{}", self.actor_ctx.id);
166
167                let consume_upstream_row_count = self
168                    .metrics
169                    .snapshot_backfill_consume_row_count
170                    .with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]);
171
172                let mut upstream_buffer = UpstreamBuffer::new(
173                    &mut self.upstream,
174                    first_upstream_barrier,
175                    consume_upstream_row_count,
176                );
177
178                // Phase 1: consume upstream snapshot
179                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
180                    < snapshot_epoch
181                {
182                    info!(
183                        table_id = %self.upstream_table.table_id(),
184                        snapshot_epoch,
185                        barrier_epoch = ?first_recv_barrier_epoch,
186                        "start consuming snapshot"
187                    );
188                    {
189                        let consuming_snapshot_row_count = self
190                            .metrics
191                            .snapshot_backfill_consume_row_count
192                            .with_guarded_label_values(&[
193                                &table_id_str,
194                                &actor_id_str,
195                                "consuming_snapshot",
196                            ]);
197                        let snapshot_stream = make_consume_snapshot_stream(
198                            &self.upstream_table,
199                            snapshot_epoch,
200                            self.chunk_size,
201                            self.rate_limit,
202                            &mut self.barrier_rx,
203                            &mut self.progress,
204                            &mut backfill_state,
205                            first_recv_barrier_epoch,
206                        );
207
208                        pin_mut!(snapshot_stream);
209
210                        while let Some(message) = upstream_buffer
211                            .run_future(snapshot_stream.try_next())
212                            .await?
213                        {
214                            if let Message::Chunk(chunk) = &message {
215                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
216                            }
217                            yield message;
218                        }
219                    }
220
221                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
222                    let recv_barrier_epoch = recv_barrier.epoch;
223                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
224                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
225                    yield Message::Barrier(recv_barrier);
226                    post_commit.post_yield_barrier(None).await?;
227                    (
228                        recv_barrier_epoch,
229                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
230                    )
231                } else {
232                    info!(
233                        table_id = %self.upstream_table.table_id(),
234                        snapshot_epoch,
235                        barrier_epoch = ?first_recv_barrier_epoch,
236                        "skip consuming snapshot"
237                    );
238                    (
239                        first_recv_barrier_epoch,
240                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
241                    )
242                };
243
244                // Phase 2: consume upstream log store
245                if let Some(mut upstream_buffer) = upstream_buffer {
246                    let initial_pending_lag = Duration::from_millis(
247                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
248                    );
249                    info!(
250                        ?barrier_epoch,
251                        table_id = self.upstream_table.table_id().table_id,
252                        ?initial_pending_lag,
253                        "start consuming log store"
254                    );
255
256                    let consuming_log_store_row_count = self
257                        .metrics
258                        .snapshot_backfill_consume_row_count
259                        .with_guarded_label_values(&[
260                            &table_id_str,
261                            &actor_id_str,
262                            "consuming_log_store",
263                        ]);
264                    loop {
265                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
266                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
267                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
268                        {
269                            // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
270                            // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
271                            let next_prev_epoch =
272                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
273                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
274                        }
275                        barrier_epoch = barrier.epoch;
276                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
277                        // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
278                        // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
279                        // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
280                        let mut stream = upstream_buffer
281                            .run_future(make_log_stream(
282                                &self.upstream_table,
283                                barrier_epoch.prev,
284                                None,
285                                self.chunk_size,
286                            ))
287                            .await?;
288                        while let Some(chunk) =
289                            upstream_buffer.run_future(stream.try_next()).await?
290                        {
291                            trace!(
292                                ?barrier_epoch,
293                                size = chunk.cardinality(),
294                                "consume change log yield chunk",
295                            );
296                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
297                            yield Message::Chunk(chunk);
298                        }
299
300                        trace!(?barrier_epoch, "after consume change log");
301
302                        if is_finished {
303                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
304                            self.progress.finish_consuming_log_store(barrier.epoch);
305                        } else {
306                            self.progress.update_create_mview_log_store_progress(
307                                barrier.epoch,
308                                upstream_buffer.pending_epoch_lag(),
309                            );
310                        }
311
312                        stream
313                            .for_vnode_pk_progress(|vnode, row_count, progress| {
314                                assert_eq!(progress, None);
315                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
316                            })
317                            .await?;
318                        let post_commit = backfill_state.commit(barrier.epoch).await?;
319                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
320                        yield Message::Barrier(barrier);
321                        post_commit.post_yield_barrier(None).await?;
322                        if update_vnode_bitmap.is_some() {
323                            return Err(anyhow!(
324                                "should not update vnode bitmap during consuming log store"
325                            )
326                            .into());
327                        }
328
329                        if is_finished {
330                            break;
331                        }
332                    }
333                    info!(
334                        ?barrier_epoch,
335                        table_id = self.upstream_table.table_id().table_id,
336                        "finish consuming log store"
337                    );
338
339                    (barrier_epoch, false)
340                } else {
341                    info!(
342                        ?barrier_epoch,
343                        table_id = self.upstream_table.table_id().table_id,
344                        "skip consuming log store and start consuming upstream directly"
345                    );
346
347                    (barrier_epoch, true)
348                }
349            } else {
350                backfill_state
351                    .latest_progress()
352                    .for_each(|(vnode, progress)| {
353                        let progress = progress.expect("should not be empty");
354                        assert_eq!(
355                            progress.epoch, first_upstream_barrier.epoch.prev,
356                            "vnode: {:?}",
357                            vnode
358                        );
359                        assert_eq!(
360                            progress.progress,
361                            EpochBackfillProgress::Consumed,
362                            "vnode: {:?}",
363                            vnode
364                        );
365                    });
366                info!(
367                    table_id = self.upstream_table.table_id().table_id,
368                    "skip backfill"
369                );
370                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
371                (first_upstream_barrier.epoch, true)
372            }
373        };
374        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
375        let mut epoch_row_count = 0;
376        // Phase 3: consume upstream
377        while let Some(msg) = upstream.try_next().await? {
378            match msg {
379                Message::Barrier(barrier) => {
380                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
381                    self.upstream_table
382                        .vnodes()
383                        .iter_vnodes()
384                        .for_each(|vnode| {
385                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
386                            // executor.
387                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
388                        });
389                    epoch_row_count = 0;
390                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
391                    barrier_epoch = barrier.epoch;
392                    if need_report_finish {
393                        need_report_finish = false;
394                        self.progress.finish_consuming_log_store(barrier_epoch);
395                    }
396                    let post_commit = backfill_state.commit(barrier.epoch).await?;
397                    yield Message::Barrier(barrier);
398                    if let Some(new_vnode_bitmap) =
399                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
400                    {
401                        let _prev_vnode_bitmap = self
402                            .upstream_table
403                            .update_vnode_bitmap(new_vnode_bitmap.clone());
404                        backfill_state
405                            .latest_progress()
406                            .for_each(|(vnode, progress)| {
407                                let progress = progress.expect("should not be empty");
408                                assert_eq!(
409                                    progress.epoch, barrier_epoch.prev,
410                                    "vnode {:?} has unexpected progress epoch",
411                                    vnode
412                                );
413                                assert_eq!(
414                                    progress.progress,
415                                    EpochBackfillProgress::Consumed,
416                                    "vnode {:?} has unexpected progress",
417                                    vnode
418                                );
419                            });
420                    }
421                }
422                msg => {
423                    if let Message::Chunk(chunk) = &msg {
424                        epoch_row_count += chunk.cardinality();
425                    }
426                    yield msg;
427                }
428            }
429        }
430    }
431}
432
433impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
434    fn execute(self: Box<Self>) -> BoxedMessageStream {
435        let output_indices = self.output_indices.clone();
436        self.execute_inner()
437            .filter_map(move |result| {
438                ready({
439                    match result {
440                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
441                        Err(e) => Some(Err(e)),
442                    }
443                })
444            })
445            .boxed()
446    }
447}
448
/// Phase marker for `UpstreamBuffer` while the upstream table snapshot is being consumed.
struct ConsumingSnapshot;
/// Phase marker for `UpstreamBuffer` while the upstream log store is being consumed.
struct ConsumingLogStore;
451
/// Barriers received from the upstream whose epochs have not been consumed yet, grouped by
/// checkpoint barrier.
struct PendingBarriers {
    /// Epoch of the first barrier received from the upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers received since the last checkpoint barrier.
    /// Newer barriers are at the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer groups are at the front.
    /// In each inner `VecDeque`, newer barriers are at the front: the front barrier is the
    /// checkpoint barrier that closed the group, and the others are non-checkpoint barriers.
    /// The initial group built in `new` contains only the first upstream barrier.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
464
465impl PendingBarriers {
466    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
467        Self {
468            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
469            pending_non_checkpoint_barriers: Default::default(),
470            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
471                first_upstream_barrier,
472            ])]),
473        }
474    }
475
476    fn add(&mut self, barrier: DispatcherBarrier) {
477        let is_checkpoint = barrier.kind.is_checkpoint();
478        self.pending_non_checkpoint_barriers.push_front(barrier);
479        if is_checkpoint {
480            self.checkpoint_barrier_groups
481                .push_front(take(&mut self.pending_non_checkpoint_barriers));
482        }
483    }
484
485    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
486        self.checkpoint_barrier_groups.pop_back()
487    }
488
489    fn consume_epoch(&mut self, epoch: EpochPair) {
490        let barriers = self
491            .checkpoint_barrier_groups
492            .back_mut()
493            .expect("non-empty");
494        let oldest_upstream_barrier = barriers.back().expect("non-empty");
495        assert!(
496            oldest_upstream_barrier.epoch.prev >= epoch.prev,
497            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
498            oldest_upstream_barrier.epoch,
499            epoch
500        );
501        if oldest_upstream_barrier.epoch.prev == epoch.prev {
502            assert_eq!(oldest_upstream_barrier.epoch, epoch);
503            barriers.pop_back();
504            if barriers.is_empty() {
505                self.checkpoint_barrier_groups.pop_back();
506            }
507        }
508    }
509
510    fn latest_epoch(&self) -> Option<EpochPair> {
511        self.pending_non_checkpoint_barriers
512            .front()
513            .or_else(|| {
514                self.checkpoint_barrier_groups
515                    .front()
516                    .and_then(|barriers| barriers.front())
517            })
518            .map(|barrier| barrier.epoch)
519    }
520
521    fn checkpoint_epoch_count(&self) -> usize {
522        self.checkpoint_barrier_groups.len()
523    }
524
525    fn has_checkpoint_epoch(&self) -> bool {
526        !self.checkpoint_barrier_groups.is_empty()
527    }
528}
529
/// Polls the upstream input while the backfill reads the snapshot or the log store, so that the
/// upstream is not back-pressured. Upstream data is only counted and then discarded; upstream
/// barriers are buffered.
///
/// `S` is a phase marker: `ConsumingSnapshot` or `ConsumingLogStore`.
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// Once `pending_epoch_lag` reaches this bound, polling the upstream is paused, unless we
    /// are in the middle of an epoch. It is `u64::MAX` while consuming the snapshot.
    max_pending_epoch_lag: u64,
    /// `prev` epoch up to which data has been consumed. It is 0 while consuming the snapshot.
    consumed_epoch: u64,
    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next barrier.
    /// When `true`, we should continue polling until the next barrier, because
    /// some data in this epoch have been discarded and data in this epoch
    /// must be read from log store
    is_polling_epoch_data: bool,
    /// Counter of rows received (and discarded) from the upstream.
    consume_upstream_row_count: LabelGuardedIntCounter<3>,
    /// Zero-sized phase marker.
    _phase: S,
}
544
545impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
546    fn new(
547        upstream: &'a mut MergeExecutorInput,
548        first_upstream_barrier: DispatcherBarrier,
549        consume_upstream_row_count: LabelGuardedIntCounter<3>,
550    ) -> Self {
551        Self {
552            upstream,
553            is_polling_epoch_data: false,
554            consume_upstream_row_count,
555            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
556            // no limit on the number of pending barrier in the beginning
557            max_pending_epoch_lag: u64::MAX,
558            consumed_epoch: 0,
559            _phase: ConsumingSnapshot {},
560        }
561    }
562
563    fn start_consuming_log_store(
564        mut self,
565        consumed_epoch: u64,
566    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
567        if self
568            .upstream_pending_barriers
569            .first_upstream_barrier_epoch
570            .prev
571            == consumed_epoch
572        {
573            assert_eq!(
574                1,
575                self.upstream_pending_barriers
576                    .pop()
577                    .expect("non-empty")
578                    .len()
579            );
580        }
581        let max_pending_epoch_lag = self.pending_epoch_lag();
582        let buffer = UpstreamBuffer {
583            upstream: self.upstream,
584            upstream_pending_barriers: self.upstream_pending_barriers,
585            max_pending_epoch_lag,
586            is_polling_epoch_data: self.is_polling_epoch_data,
587            consume_upstream_row_count: self.consume_upstream_row_count,
588            consumed_epoch,
589            _phase: ConsumingLogStore {},
590        };
591        if buffer.is_finished() {
592            None
593        } else {
594            Some(buffer)
595        }
596    }
597}
598
599impl<S> UpstreamBuffer<'_, S> {
600    fn can_consume_upstream(&self) -> bool {
601        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
602    }
603
604    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
605        {
606            loop {
607                if let Err(e) = try {
608                    if !self.can_consume_upstream() {
609                        // pause the future to block consuming upstream
610                        return pending().await;
611                    }
612                    self.consume_until_next_checkpoint_barrier().await?;
613                } {
614                    break e;
615                }
616            }
617        }
618    }
619
620    /// Consume the upstream until seeing the next barrier.
621    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
622        loop {
623            let msg: DispatcherMessage = self
624                .upstream
625                .try_next()
626                .await?
627                .ok_or_else(|| anyhow!("end of upstream"))?;
628            match msg {
629                DispatcherMessage::Chunk(chunk) => {
630                    self.is_polling_epoch_data = true;
631                    self.consume_upstream_row_count
632                        .inc_by(chunk.cardinality() as _);
633                }
634                DispatcherMessage::Barrier(barrier) => {
635                    let is_checkpoint = barrier.kind.is_checkpoint();
636                    self.upstream_pending_barriers.add(barrier);
637                    if is_checkpoint {
638                        self.is_polling_epoch_data = false;
639                        break;
640                    } else {
641                        self.is_polling_epoch_data = true;
642                    }
643                }
644                DispatcherMessage::Watermark(_) => {
645                    self.is_polling_epoch_data = true;
646                }
647            }
648        }
649        Ok(())
650    }
651}
652
653impl UpstreamBuffer<'_, ConsumingLogStore> {
654    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
655        assert!(!self.is_finished());
656        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
657            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
658            // we must have returned true to indicate finish, and should not be called again.
659            assert!(self.is_polling_epoch_data);
660            self.consume_until_next_checkpoint_barrier().await?;
661            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
662        }
663        self.upstream_pending_barriers.consume_epoch(epoch);
664
665        {
666            {
667                let prev_epoch = epoch.prev;
668                assert!(self.consumed_epoch < prev_epoch);
669                let elapsed_epoch = prev_epoch - self.consumed_epoch;
670                self.consumed_epoch = prev_epoch;
671                if self.upstream_pending_barriers.has_checkpoint_epoch() {
672                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
673                    while self.can_consume_upstream()
674                        && let Some(result) =
675                            self.consume_until_next_checkpoint_barrier().now_or_never()
676                    {
677                        result?;
678                    }
679                }
680                // sub to ensure that the lag is monotonically decreasing.
681                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
682                // the upstream can at least progress for one epoch.
683                self.max_pending_epoch_lag = min(
684                    self.pending_epoch_lag(),
685                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
686                );
687            }
688        }
689        Ok(self.is_finished())
690    }
691
692    fn is_finished(&self) -> bool {
693        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
694            assert!(
695                self.upstream_pending_barriers
696                    .pending_non_checkpoint_barriers
697                    .is_empty()
698            )
699        }
700        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
701    }
702}
703
704impl<S> UpstreamBuffer<'_, S> {
705    /// Run a future while concurrently polling the upstream so that the upstream
706    /// won't be back-pressured.
707    async fn run_future<T, E: Into<StreamExecutorError>>(
708        &mut self,
709        future: impl Future<Output = Result<T, E>>,
710    ) -> StreamExecutorResult<T> {
711        select! {
712            biased;
713            e = self.concurrently_consume_upstream() => {
714                Err(e)
715            }
716            // this arm won't be starved, because the first arm is always pending unless returning with error
717            result = future => {
718                result.map_err(Into::into)
719            }
720        }
721    }
722
723    fn pending_epoch_lag(&self) -> u64 {
724        self.upstream_pending_barriers
725            .latest_epoch()
726            .map(|epoch| {
727                epoch
728                    .prev
729                    .checked_sub(self.consumed_epoch)
730                    .expect("pending epoch must be later than consumed_epoch")
731            })
732            .unwrap_or(0)
733    }
734}
735
736async fn make_log_stream(
737    upstream_table: &BatchTable<impl StateStore>,
738    prev_epoch: u64,
739    start_pk: Option<OwnedRow>,
740    chunk_size: usize,
741) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
742    let data_types = upstream_table.schema().data_types();
743    let start_pk = start_pk.as_ref();
744    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
745    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
746        upstream_table
747            .batch_iter_vnode_log(
748                prev_epoch,
749                HummockReadEpoch::Committed(prev_epoch),
750                start_pk,
751                vnode,
752            )
753            .map_ok(move |stream| {
754                let stream = stream.map_err(Into::into);
755                (vnode, stream, 0)
756            })
757    }))
758    .await?;
759    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
760    Ok(VnodeStream::new(
761        vnode_streams,
762        upstream_table.pk_in_output_indices().expect("should exist"),
763        builder,
764    ))
765}
766
767async fn make_snapshot_stream(
768    upstream_table: &BatchTable<impl StateStore>,
769    snapshot_epoch: u64,
770    backfill_state: &BackfillState<impl StateStore>,
771    rate_limit: RateLimit,
772    chunk_size: usize,
773) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
774    let data_types = upstream_table.schema().data_types();
775    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
776        move |(vnode, progress)| {
777            let start_pk = match progress {
778                None => Some((None, 0)),
779                Some(VnodeBackfillProgress {
780                    row_count,
781                    progress: EpochBackfillProgress::Consuming { latest_pk },
782                    ..
783                }) => Some((Some(latest_pk), *row_count)),
784                Some(VnodeBackfillProgress {
785                    progress: EpochBackfillProgress::Consumed,
786                    ..
787                }) => None,
788            };
789            start_pk.map(|(start_pk, row_count)| {
790                upstream_table
791                    .batch_iter_vnode(
792                        HummockReadEpoch::Committed(snapshot_epoch),
793                        start_pk,
794                        vnode,
795                        PrefetchOptions::prefetch_for_large_range_scan(),
796                    )
797                    .map_ok(move |stream| {
798                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
799                        (vnode, stream, row_count)
800                    })
801            })
802        },
803    ))
804    .await?;
805    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
806    Ok(VnodeStream::new(
807        vnode_streams,
808        upstream_table.pk_in_output_indices().expect("should exist"),
809        builder,
810    ))
811}
812
813#[expect(clippy::too_many_arguments)]
814#[try_stream(ok = Message, error = StreamExecutorError)]
815async fn make_consume_snapshot_stream<'a, S: StateStore>(
816    upstream_table: &'a BatchTable<S>,
817    snapshot_epoch: u64,
818    chunk_size: usize,
819    rate_limit: RateLimit,
820    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
821    progress: &'a mut CreateMviewProgressReporter,
822    backfill_state: &'a mut BackfillState<S>,
823    first_recv_barrier_epoch: EpochPair,
824) {
825    let mut barrier_epoch = first_recv_barrier_epoch;
826
827    // start consume upstream snapshot
828    let mut snapshot_stream = make_snapshot_stream(
829        upstream_table,
830        snapshot_epoch,
831        &*backfill_state,
832        rate_limit,
833        chunk_size,
834    )
835    .await?;
836
837    async fn select_barrier_and_snapshot_stream(
838        barrier_rx: &mut UnboundedReceiver<Barrier>,
839        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
840        throttle_snapshot_stream: bool,
841    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
842        select!(
843            result = receive_next_barrier(barrier_rx) => {
844                Ok(Either::Left(result?))
845            },
846            result = snapshot_stream.try_next(), if !throttle_snapshot_stream => {
847                Ok(Either::Right(result?))
848            }
849        )
850    }
851
852    let mut count = 0;
853    let mut epoch_row_count = 0;
854    loop {
855        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
856        match select_barrier_and_snapshot_stream(
857            barrier_rx,
858            &mut snapshot_stream,
859            throttle_snapshot_stream,
860        )
861        .await?
862        {
863            Either::Left(barrier) => {
864                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
865                barrier_epoch = barrier.epoch;
866                if barrier_epoch.curr >= snapshot_epoch {
867                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
868                }
869                if let Some(chunk) = snapshot_stream.consume_builder() {
870                    count += chunk.cardinality();
871                    epoch_row_count += chunk.cardinality();
872                    yield Message::Chunk(chunk);
873                }
874                snapshot_stream
875                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
876                        if let Some(pk) = pk_progress {
877                            backfill_state.update_epoch_progress(
878                                vnode,
879                                snapshot_epoch,
880                                row_count,
881                                pk,
882                            );
883                        } else {
884                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
885                        }
886                    })
887                    .await?;
888                let post_commit = backfill_state.commit(barrier.epoch).await?;
889                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
890                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
891                epoch_row_count = 0;
892
893                yield Message::Barrier(barrier);
894                post_commit.post_yield_barrier(None).await?;
895            }
896            Either::Right(Some(chunk)) => {
897                count += chunk.cardinality();
898                epoch_row_count += chunk.cardinality();
899                yield Message::Chunk(chunk);
900            }
901            Either::Right(None) => {
902                break;
903            }
904        }
905    }
906
907    // finish consuming upstream snapshot, report finish
908    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
909    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
910    barrier_epoch = barrier_to_report_finish.epoch;
911    info!(?barrier_epoch, count, "report finish");
912    snapshot_stream
913        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
914            assert_eq!(pk_progress, None);
915            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
916        })
917        .await?;
918    let post_commit = backfill_state.commit(barrier_epoch).await?;
919    progress.finish(barrier_epoch, count as _);
920    yield Message::Barrier(barrier_to_report_finish);
921    post_commit.post_yield_barrier(None).await?;
922
923    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
924    loop {
925        let barrier = receive_next_barrier(barrier_rx).await?;
926        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
927        barrier_epoch = barrier.epoch;
928        let post_commit = backfill_state.commit(barrier.epoch).await?;
929        yield Message::Barrier(barrier);
930        post_commit.post_yield_barrier(None).await?;
931        if barrier_epoch.curr == snapshot_epoch {
932            break;
933        }
934    }
935    info!(?barrier_epoch, "finish consuming snapshot");
936}