risingwave_stream/executor/backfill/snapshot_backfill/executor.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::common::PbThrottleType;
33use risingwave_storage::StateStore;
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::ChangeLogRow;
36use risingwave_storage::table::batch_table::BatchTable;
37use tokio::select;
38use tokio::sync::mpsc::UnboundedReceiver;
39
40use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
41use crate::executor::backfill::snapshot_backfill::state::{
42    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
43};
44use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
45use crate::executor::backfill::utils::{create_builder, mapping_message};
46use crate::executor::monitor::StreamingMetrics;
47use crate::executor::prelude::{StateTable, StreamExt, try_stream};
48use crate::executor::{
49    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
50    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
51    expect_first_barrier,
52};
53use crate::task::CreateMviewProgressReporter;
54
/// Executor that backfills from a snapshot of `upstream_table`, then replays the table's change
/// log, and finally switches to forwarding the `upstream` input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,

    /// Backfill progress table
    progress_state_table: StateTable<S>,

    /// Upstream with the same schema as the upstream table.
    upstream: MergeExecutorInput,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Reporter for snapshot / log store consumption progress.
    progress: CreateMviewProgressReporter,

    /// Chunk size used when reading the snapshot and the change log.
    chunk_size: usize,
    /// Rate limit passed to the snapshot-consuming stream (the change log is read without it).
    rate_limit: RateLimit,

    /// Barriers injected by the local barrier worker; they pace the backfill.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the upstream snapshot to backfill from. `None` means the job was created
    /// previously and has already finished backfill.
    snapshot_epoch: Option<u64>,
}
80
81impl<S: StateStore> SnapshotBackfillExecutor<S> {
82    #[expect(clippy::too_many_arguments)]
83    pub(crate) fn new(
84        upstream_table: BatchTable<S>,
85        progress_state_table: StateTable<S>,
86        upstream: MergeExecutorInput,
87        output_indices: Vec<usize>,
88        actor_ctx: ActorContextRef,
89        progress: CreateMviewProgressReporter,
90        chunk_size: usize,
91        rate_limit: RateLimit,
92        barrier_rx: UnboundedReceiver<Barrier>,
93        metrics: Arc<StreamingMetrics>,
94        snapshot_epoch: Option<u64>,
95    ) -> Self {
96        assert_eq!(&upstream.info.schema, upstream_table.schema());
97        if upstream_table.pk_in_output_indices().is_none() {
98            panic!(
99                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
100                upstream_table.pk_indices(),
101                upstream_table.output_indices(),
102                upstream_table.schema()
103            )
104        };
105        if !matches!(rate_limit, RateLimit::Disabled) {
106            trace!(
107                ?rate_limit,
108                "create snapshot backfill executor with rate limit"
109            );
110        }
111        Self {
112            upstream_table,
113            progress_state_table,
114            upstream,
115            output_indices,
116            progress,
117            chunk_size,
118            rate_limit,
119            barrier_rx,
120            actor_ctx,
121            metrics,
122            snapshot_epoch,
123        }
124    }
125
126    #[try_stream(ok = Message, error = StreamExecutorError)]
127    async fn execute_inner(mut self) {
128        trace!("snapshot backfill executor start");
129        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
130        trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
131        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
132        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
133        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
134            self.snapshot_epoch
135        {
136            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
137                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
138                Some(snapshot_epoch)
139            } else {
140                None
141            }
142        } else {
143            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
144            if cfg!(debug_assertions) {
145                panic!(
146                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
147                    first_upstream_barrier.epoch, first_recv_barrier.epoch
148                );
149            } else {
150                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
151                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
152                None
153            }
154        };
155        let first_recv_barrier_epoch = first_recv_barrier.epoch;
156        let initial_backfill_paused =
157            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
158        yield Message::Barrier(first_recv_barrier);
159        let mut backfill_state = BackfillState::new(
160            self.progress_state_table,
161            first_recv_barrier_epoch,
162            self.upstream_table.pk_serializer().clone(),
163        )
164        .await?;
165
166        let (mut barrier_epoch, mut need_report_finish) = {
167            if let Some(snapshot_epoch) = should_snapshot_backfill {
168                let table_id_str = format!("{}", self.upstream_table.table_id());
169                let actor_id_str = format!("{}", self.actor_ctx.id);
170
171                let consume_upstream_row_count = self
172                    .metrics
173                    .snapshot_backfill_consume_row_count
174                    .with_guarded_label_values(&[
175                        table_id_str.as_str(),
176                        actor_id_str.as_str(),
177                        "consume_upstream",
178                    ]);
179
180                let mut upstream_buffer = UpstreamBuffer::new(
181                    &mut self.upstream,
182                    first_upstream_barrier,
183                    consume_upstream_row_count,
184                );
185
186                // Phase 1: consume upstream snapshot
187                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
188                    < snapshot_epoch
189                {
190                    trace!(
191                        table_id = %self.upstream_table.table_id(),
192                        snapshot_epoch,
193                        barrier_epoch = ?first_recv_barrier_epoch,
194                        "start consuming snapshot"
195                    );
196                    {
197                        let consuming_snapshot_row_count = self
198                            .metrics
199                            .snapshot_backfill_consume_row_count
200                            .with_guarded_label_values(&[
201                                table_id_str.as_str(),
202                                actor_id_str.as_str(),
203                                "consuming_snapshot",
204                            ]);
205                        let snapshot_stream = make_consume_snapshot_stream(
206                            &self.upstream_table,
207                            snapshot_epoch,
208                            self.chunk_size,
209                            &mut self.rate_limit,
210                            &mut self.barrier_rx,
211                            &mut self.progress,
212                            &mut backfill_state,
213                            first_recv_barrier_epoch,
214                            initial_backfill_paused,
215                            &self.actor_ctx,
216                        );
217
218                        pin_mut!(snapshot_stream);
219
220                        while let Some(message) = upstream_buffer
221                            .run_future(snapshot_stream.try_next())
222                            .await?
223                        {
224                            if let Message::Chunk(chunk) = &message {
225                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
226                            }
227                            yield message;
228                        }
229                    }
230
231                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
232                    let recv_barrier_epoch = recv_barrier.epoch;
233                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
234                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
235                    yield Message::Barrier(recv_barrier);
236                    post_commit.post_yield_barrier(None).await?;
237                    (
238                        recv_barrier_epoch,
239                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
240                    )
241                } else {
242                    trace!(
243                        table_id = %self.upstream_table.table_id(),
244                        snapshot_epoch,
245                        barrier_epoch = ?first_recv_barrier_epoch,
246                        "skip consuming snapshot"
247                    );
248                    (
249                        first_recv_barrier_epoch,
250                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
251                    )
252                };
253
254                // Phase 2: consume upstream log store
255                if let Some(mut upstream_buffer) = upstream_buffer {
256                    let initial_pending_lag = Duration::from_millis(
257                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
258                    );
259                    trace!(
260                        ?barrier_epoch,
261                        table_id = %self.upstream_table.table_id(),
262                        ?initial_pending_lag,
263                        "start consuming log store"
264                    );
265
266                    let consuming_log_store_row_count = self
267                        .metrics
268                        .snapshot_backfill_consume_row_count
269                        .with_guarded_label_values(&[
270                            table_id_str.as_str(),
271                            actor_id_str.as_str(),
272                            "consuming_log_store",
273                        ]);
274                    loop {
275                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
276                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
277                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
278                        {
279                            // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
280                            // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
281                            let next_prev_epoch =
282                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
283                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
284                        }
285                        barrier_epoch = barrier.epoch;
286                        trace!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
287                        // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
288                        // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
289                        // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
290                        let mut stream = upstream_buffer
291                            .run_future(make_log_stream(
292                                &self.upstream_table,
293                                barrier_epoch.prev,
294                                None,
295                                self.chunk_size,
296                            ))
297                            .await?;
298                        while let Some(chunk) =
299                            upstream_buffer.run_future(stream.try_next()).await?
300                        {
301                            trace!(
302                                ?barrier_epoch,
303                                size = chunk.cardinality(),
304                                "consume change log yield chunk",
305                            );
306                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
307                            yield Message::Chunk(chunk);
308                        }
309
310                        trace!(?barrier_epoch, "after consume change log");
311
312                        if is_finished {
313                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
314                            self.progress.finish_consuming_log_store(barrier.epoch);
315                        } else {
316                            self.progress.update_create_mview_log_store_progress(
317                                barrier.epoch,
318                                upstream_buffer.pending_epoch_lag(),
319                            );
320                        }
321
322                        stream
323                            .for_vnode_pk_progress(|vnode, row_count, progress| {
324                                assert_eq!(progress, None);
325                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
326                            })
327                            .await?;
328                        let post_commit = backfill_state.commit(barrier.epoch).await?;
329                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
330                        yield Message::Barrier(barrier);
331                        post_commit.post_yield_barrier(None).await?;
332                        if update_vnode_bitmap.is_some() {
333                            return Err(anyhow!(
334                                "should not update vnode bitmap during consuming log store"
335                            )
336                            .into());
337                        }
338
339                        if is_finished {
340                            break;
341                        }
342                    }
343                    trace!(
344                        ?barrier_epoch,
345                        table_id = %self.upstream_table.table_id(),
346                        "finish consuming log store"
347                    );
348
349                    (barrier_epoch, false)
350                } else {
351                    trace!(
352                        ?barrier_epoch,
353                        table_id = %self.upstream_table.table_id(),
354                        "skip consuming log store and start consuming upstream directly"
355                    );
356
357                    (barrier_epoch, true)
358                }
359            } else {
360                backfill_state
361                    .latest_progress()
362                    .for_each(|(vnode, progress)| {
363                        let progress = progress.expect("should not be empty");
364                        assert_eq!(
365                            progress.epoch, first_upstream_barrier.epoch.prev,
366                            "vnode: {:?}",
367                            vnode
368                        );
369                        assert_eq!(
370                            progress.progress,
371                            EpochBackfillProgress::Consumed,
372                            "vnode: {:?}",
373                            vnode
374                        );
375                    });
376                trace!(
377                    table_id = %self.upstream_table.table_id(),
378                    "skip backfill"
379                );
380                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
381                (first_upstream_barrier.epoch, true)
382            }
383        };
384        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
385        let mut epoch_row_count = 0;
386        // Phase 3: consume upstream
387        while let Some(msg) = upstream.try_next().await? {
388            match msg {
389                Message::Barrier(barrier) => {
390                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
391                    self.upstream_table
392                        .vnodes()
393                        .iter_vnodes()
394                        .for_each(|vnode| {
395                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
396                            // executor.
397                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
398                        });
399                    epoch_row_count = 0;
400                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
401                    barrier_epoch = barrier.epoch;
402                    if need_report_finish {
403                        need_report_finish = false;
404                        self.progress.finish_consuming_log_store(barrier_epoch);
405                    }
406                    let post_commit = backfill_state.commit(barrier.epoch).await?;
407                    yield Message::Barrier(barrier);
408                    if let Some(new_vnode_bitmap) =
409                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
410                    {
411                        let _prev_vnode_bitmap =
412                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
413                        backfill_state
414                            .latest_progress()
415                            .for_each(|(vnode, progress)| {
416                                let progress = progress.expect("should not be empty");
417                                assert_eq!(
418                                    progress.epoch, barrier_epoch.prev,
419                                    "vnode {:?} has unexpected progress epoch",
420                                    vnode
421                                );
422                                assert_eq!(
423                                    progress.progress,
424                                    EpochBackfillProgress::Consumed,
425                                    "vnode {:?} has unexpected progress",
426                                    vnode
427                                );
428                            });
429                    }
430                }
431                msg => {
432                    if let Message::Chunk(chunk) = &msg {
433                        epoch_row_count += chunk.cardinality();
434                    }
435                    yield msg;
436                }
437            }
438        }
439    }
440}
441
442impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
443    fn execute(self: Box<Self>) -> BoxedMessageStream {
444        let output_indices = self.output_indices.clone();
445        self.execute_inner()
446            .filter_map(move |result| {
447                ready({
448                    match result {
449                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
450                        Err(e) => Some(Err(e)),
451                    }
452                })
453            })
454            .boxed()
455    }
456}
457
/// Phase marker for [`UpstreamBuffer`] while the upstream table snapshot is being consumed.
struct ConsumingSnapshot;
/// Phase marker for [`UpstreamBuffer`] while the upstream change log (log store) is being consumed.
struct ConsumingLogStore;
460
/// Barriers received from upstream that have not yet been matched with a barrier injected by the
/// local barrier worker, grouped by checkpoint barrier.
struct PendingBarriers {
    /// Epoch of the first barrier received from upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier.
    /// Newer barrier at the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer barriers at the front.
    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
    /// and others as non-checkpoint barrier.
    /// The oldest group is seeded with the first upstream barrier alone, whatever its kind.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
473
474impl PendingBarriers {
475    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
476        Self {
477            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
478            pending_non_checkpoint_barriers: Default::default(),
479            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
480                first_upstream_barrier,
481            ])]),
482        }
483    }
484
485    fn add(&mut self, barrier: DispatcherBarrier) {
486        let is_checkpoint = barrier.kind.is_checkpoint();
487        self.pending_non_checkpoint_barriers.push_front(barrier);
488        if is_checkpoint {
489            self.checkpoint_barrier_groups
490                .push_front(take(&mut self.pending_non_checkpoint_barriers));
491        }
492    }
493
494    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
495        self.checkpoint_barrier_groups.pop_back()
496    }
497
498    fn consume_epoch(&mut self, epoch: EpochPair) {
499        let barriers = self
500            .checkpoint_barrier_groups
501            .back_mut()
502            .expect("non-empty");
503        let oldest_upstream_barrier = barriers.back().expect("non-empty");
504        assert!(
505            oldest_upstream_barrier.epoch.prev >= epoch.prev,
506            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
507            oldest_upstream_barrier.epoch,
508            epoch
509        );
510        if oldest_upstream_barrier.epoch.prev == epoch.prev {
511            assert_eq!(oldest_upstream_barrier.epoch, epoch);
512            barriers.pop_back();
513            if barriers.is_empty() {
514                self.checkpoint_barrier_groups.pop_back();
515            }
516        }
517    }
518
519    fn latest_epoch(&self) -> Option<EpochPair> {
520        self.pending_non_checkpoint_barriers
521            .front()
522            .or_else(|| {
523                self.checkpoint_barrier_groups
524                    .front()
525                    .and_then(|barriers| barriers.front())
526            })
527            .map(|barrier| barrier.epoch)
528    }
529
530    fn checkpoint_epoch_count(&self) -> usize {
531        self.checkpoint_barrier_groups.len()
532    }
533
534    fn has_checkpoint_epoch(&self) -> bool {
535        !self.checkpoint_barrier_groups.is_empty()
536    }
537}
538
/// Polls the upstream merge input while the executor is still consuming the snapshot or the
/// change log, buffering upstream barriers and only counting (not forwarding) upstream chunks.
///
/// `S` is a phase marker (`ConsumingSnapshot` or `ConsumingLogStore`) that selects which methods
/// are available.
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// Outside an in-progress epoch, upstream is polled only while `pending_epoch_lag()` is
    /// below this value.
    max_pending_epoch_lag: u64,
    /// The latest `prev` epoch whose data has been consumed from the snapshot / change log.
    consumed_epoch: u64,
    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next barrier.
    /// When `true`, we should continue polling until the next barrier, because
    /// some data in this epoch have been discarded and data in this epoch
    /// must be read from log store.
    is_polling_epoch_data: bool,
    /// Counter for the number of upstream rows received (and discarded) by this buffer.
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}
553
554impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
555    fn new(
556        upstream: &'a mut MergeExecutorInput,
557        first_upstream_barrier: DispatcherBarrier,
558        consume_upstream_row_count: LabelGuardedIntCounter,
559    ) -> Self {
560        Self {
561            upstream,
562            is_polling_epoch_data: false,
563            consume_upstream_row_count,
564            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
565            // no limit on the number of pending barrier in the beginning
566            max_pending_epoch_lag: u64::MAX,
567            consumed_epoch: 0,
568            _phase: ConsumingSnapshot {},
569        }
570    }
571
572    fn start_consuming_log_store(
573        mut self,
574        consumed_epoch: u64,
575    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
576        if self
577            .upstream_pending_barriers
578            .first_upstream_barrier_epoch
579            .prev
580            == consumed_epoch
581        {
582            assert_eq!(
583                1,
584                self.upstream_pending_barriers
585                    .pop()
586                    .expect("non-empty")
587                    .len()
588            );
589        }
590        let max_pending_epoch_lag = self.pending_epoch_lag();
591        let buffer = UpstreamBuffer {
592            upstream: self.upstream,
593            upstream_pending_barriers: self.upstream_pending_barriers,
594            max_pending_epoch_lag,
595            is_polling_epoch_data: self.is_polling_epoch_data,
596            consume_upstream_row_count: self.consume_upstream_row_count,
597            consumed_epoch,
598            _phase: ConsumingLogStore {},
599        };
600        if buffer.is_finished() {
601            None
602        } else {
603            Some(buffer)
604        }
605    }
606}
607
608impl<S> UpstreamBuffer<'_, S> {
609    fn can_consume_upstream(&self) -> bool {
610        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
611    }
612
613    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
614        {
615            loop {
616                if let Err(e) = try {
617                    if !self.can_consume_upstream() {
618                        // pause the future to block consuming upstream
619                        return pending().await;
620                    }
621                    self.consume_until_next_checkpoint_barrier().await?;
622                } {
623                    break e;
624                }
625            }
626        }
627    }
628
629    /// Consume the upstream until seeing the next barrier.
630    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
631        loop {
632            let msg: DispatcherMessage = self
633                .upstream
634                .try_next()
635                .await?
636                .ok_or_else(|| anyhow!("end of upstream"))?;
637            match msg {
638                DispatcherMessage::Chunk(chunk) => {
639                    self.is_polling_epoch_data = true;
640                    self.consume_upstream_row_count
641                        .inc_by(chunk.cardinality() as _);
642                }
643                DispatcherMessage::Barrier(barrier) => {
644                    let is_checkpoint = barrier.kind.is_checkpoint();
645                    self.upstream_pending_barriers.add(barrier);
646                    if is_checkpoint {
647                        self.is_polling_epoch_data = false;
648                        break;
649                    } else {
650                        self.is_polling_epoch_data = true;
651                    }
652                }
653                DispatcherMessage::Watermark(_) => {
654                    self.is_polling_epoch_data = true;
655                }
656            }
657        }
658        Ok(())
659    }
660}
661
662impl UpstreamBuffer<'_, ConsumingLogStore> {
663    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
664        assert!(!self.is_finished());
665        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
666            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
667            // we must have returned true to indicate finish, and should not be called again.
668            assert!(self.is_polling_epoch_data);
669            self.consume_until_next_checkpoint_barrier().await?;
670            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
671        }
672        self.upstream_pending_barriers.consume_epoch(epoch);
673
674        {
675            {
676                let prev_epoch = epoch.prev;
677                assert!(self.consumed_epoch < prev_epoch);
678                let elapsed_epoch = prev_epoch - self.consumed_epoch;
679                self.consumed_epoch = prev_epoch;
680                if self.upstream_pending_barriers.has_checkpoint_epoch() {
681                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
682                    while self.can_consume_upstream()
683                        && let Some(result) =
684                            self.consume_until_next_checkpoint_barrier().now_or_never()
685                    {
686                        result?;
687                    }
688                }
689                // sub to ensure that the lag is monotonically decreasing.
690                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
691                // the upstream can at least progress for one epoch.
692                self.max_pending_epoch_lag = min(
693                    self.pending_epoch_lag(),
694                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
695                );
696            }
697        }
698        Ok(self.is_finished())
699    }
700
701    fn is_finished(&self) -> bool {
702        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
703            assert!(
704                self.upstream_pending_barriers
705                    .pending_non_checkpoint_barriers
706                    .is_empty()
707            )
708        }
709        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
710    }
711}
712
713impl<S> UpstreamBuffer<'_, S> {
714    /// Run a future while concurrently polling the upstream so that the upstream
715    /// won't be back-pressured.
716    async fn run_future<T, E: Into<StreamExecutorError>>(
717        &mut self,
718        future: impl Future<Output = Result<T, E>>,
719    ) -> StreamExecutorResult<T> {
720        select! {
721            biased;
722            e = self.concurrently_consume_upstream() => {
723                Err(e)
724            }
725            // this arm won't be starved, because the first arm is always pending unless returning with error
726            result = future => {
727                result.map_err(Into::into)
728            }
729        }
730    }
731
732    fn pending_epoch_lag(&self) -> u64 {
733        self.upstream_pending_barriers
734            .latest_epoch()
735            .map(|epoch| {
736                epoch
737                    .prev
738                    .checked_sub(self.consumed_epoch)
739                    .expect("pending epoch must be later than consumed_epoch")
740            })
741            .unwrap_or(0)
742    }
743}
744
745async fn make_log_stream(
746    upstream_table: &BatchTable<impl StateStore>,
747    prev_epoch: u64,
748    start_pk: Option<OwnedRow>,
749    chunk_size: usize,
750) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
751    let data_types = upstream_table.schema().data_types();
752    let start_pk = start_pk.as_ref();
753    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
754    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
755        upstream_table
756            .batch_iter_vnode_log(
757                prev_epoch,
758                HummockReadEpoch::Committed(prev_epoch),
759                start_pk,
760                vnode,
761            )
762            .map_ok(move |stream| {
763                let stream = stream.map_err(Into::into);
764                (vnode, stream, 0)
765            })
766    }))
767    .await?;
768    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
769    Ok(VnodeStream::new(
770        vnode_streams,
771        upstream_table.pk_in_output_indices().expect("should exist"),
772        builder,
773    ))
774}
775
776async fn make_snapshot_stream(
777    upstream_table: &BatchTable<impl StateStore>,
778    snapshot_epoch: u64,
779    backfill_state: &BackfillState<impl StateStore>,
780    rate_limit: RateLimit,
781    chunk_size: usize,
782) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
783    let data_types = upstream_table.schema().data_types();
784    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
785        move |(vnode, progress)| {
786            let start_pk = match progress {
787                None => Some((None, 0)),
788                Some(VnodeBackfillProgress {
789                    row_count,
790                    progress: EpochBackfillProgress::Consuming { latest_pk },
791                    ..
792                }) => Some((Some(latest_pk), *row_count)),
793                Some(VnodeBackfillProgress {
794                    progress: EpochBackfillProgress::Consumed,
795                    ..
796                }) => None,
797            };
798            start_pk.map(|(start_pk, row_count)| {
799                upstream_table
800                    .batch_iter_vnode(
801                        HummockReadEpoch::Committed(snapshot_epoch),
802                        start_pk,
803                        vnode,
804                        PrefetchOptions::prefetch_for_large_range_scan(),
805                    )
806                    .map_ok(move |stream| {
807                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
808                        (vnode, stream, row_count)
809                    })
810            })
811        },
812    ))
813    .await?;
814    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
815    Ok(VnodeStream::new(
816        vnode_streams,
817        upstream_table.pk_in_output_indices().expect("should exist"),
818        builder,
819    ))
820}
821
822#[expect(clippy::too_many_arguments)]
823#[try_stream(ok = Message, error = StreamExecutorError)]
824async fn make_consume_snapshot_stream<'a, S: StateStore>(
825    upstream_table: &'a BatchTable<S>,
826    snapshot_epoch: u64,
827    chunk_size: usize,
828    rate_limit: &'a mut RateLimit,
829    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
830    progress: &'a mut CreateMviewProgressReporter,
831    backfill_state: &'a mut BackfillState<S>,
832    first_recv_barrier_epoch: EpochPair,
833    initial_backfill_paused: bool,
834    actor_ctx: &'a ActorContextRef,
835) {
836    let mut barrier_epoch = first_recv_barrier_epoch;
837
838    // start consume upstream snapshot
839    let mut snapshot_stream = make_snapshot_stream(
840        upstream_table,
841        snapshot_epoch,
842        &*backfill_state,
843        *rate_limit,
844        chunk_size,
845    )
846    .await?;
847
848    async fn select_barrier_and_snapshot_stream(
849        barrier_rx: &mut UnboundedReceiver<Barrier>,
850        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
851        throttle_snapshot_stream: bool,
852        backfill_paused: bool,
853    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
854        select!(
855            result = receive_next_barrier(barrier_rx) => {
856                Ok(Either::Left(result?))
857            },
858            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
859                Ok(Either::Right(result?))
860            }
861        )
862    }
863
864    let mut count = 0;
865    let mut epoch_row_count = 0;
866    let mut backfill_paused = initial_backfill_paused;
867    loop {
868        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
869        match select_barrier_and_snapshot_stream(
870            barrier_rx,
871            &mut snapshot_stream,
872            throttle_snapshot_stream,
873            backfill_paused,
874        )
875        .await?
876        {
877            Either::Left(barrier) => {
878                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
879                barrier_epoch = barrier.epoch;
880
881                if barrier_epoch.curr >= snapshot_epoch {
882                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
883                }
884                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
885                    if backfill_paused {
886                        backfill_paused = false;
887                    } else {
888                        tracing::error!(
889                            "received start fragment backfill mutation, but backfill is not paused"
890                        );
891                    }
892                }
893                if let Some(chunk) = snapshot_stream.consume_builder() {
894                    count += chunk.cardinality();
895                    epoch_row_count += chunk.cardinality();
896                    yield Message::Chunk(chunk);
897                }
898                snapshot_stream
899                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
900                        if let Some(pk) = pk_progress {
901                            backfill_state.update_epoch_progress(
902                                vnode,
903                                snapshot_epoch,
904                                row_count,
905                                pk,
906                            );
907                        } else {
908                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
909                        }
910                    })
911                    .await?;
912                let post_commit = backfill_state.commit(barrier.epoch).await?;
913                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
914                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
915                epoch_row_count = 0;
916
917                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
918                    if let Mutation::Throttle(config) = &**m
919                        && let Some(config) = config.get(&actor_ctx.fragment_id)
920                        && config.throttle_type() == PbThrottleType::Backfill
921                    {
922                        Some(config.rate_limit)
923                    } else {
924                        None
925                    }
926                });
927                yield Message::Barrier(barrier);
928                post_commit.post_yield_barrier(None).await?;
929
930                if let Some(new_rate_limit) = new_rate_limit {
931                    let new_rate_limit = new_rate_limit.into();
932                    *rate_limit = new_rate_limit;
933                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
934                }
935            }
936            Either::Right(Some(chunk)) => {
937                if backfill_paused {
938                    return Err(
939                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
940                    );
941                }
942                count += chunk.cardinality();
943                epoch_row_count += chunk.cardinality();
944                yield Message::Chunk(chunk);
945            }
946            Either::Right(None) => {
947                break;
948            }
949        }
950    }
951
952    // finish consuming upstream snapshot, report finish
953    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
954    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
955    barrier_epoch = barrier_to_report_finish.epoch;
956    trace!(?barrier_epoch, count, "report finish");
957    snapshot_stream
958        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
959            assert_eq!(pk_progress, None);
960            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
961        })
962        .await?;
963    let post_commit = backfill_state.commit(barrier_epoch).await?;
964    progress.finish(barrier_epoch, count as _);
965    yield Message::Barrier(barrier_to_report_finish);
966    post_commit.post_yield_barrier(None).await?;
967
968    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
969    loop {
970        let barrier = receive_next_barrier(barrier_rx).await?;
971        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
972        barrier_epoch = barrier.epoch;
973        let post_commit = backfill_state.commit(barrier.epoch).await?;
974        yield Message::Barrier(barrier);
975        post_commit.post_yield_barrier(None).await?;
976        if barrier_epoch.curr == snapshot_epoch {
977            break;
978        }
979    }
980    trace!(?barrier_epoch, "finish consuming snapshot");
981}