risingwave_stream/executor/backfill/snapshot_backfill/executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::common::PbThrottleType;
33use risingwave_storage::StateStore;
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::ChangeLogRow;
36use risingwave_storage::table::batch_table::BatchTable;
37use tokio::select;
38use tokio::sync::mpsc::UnboundedReceiver;
39use tokio::time::sleep;
40
41use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
42use crate::executor::backfill::snapshot_backfill::state::{
43    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
44};
45use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
46use crate::executor::backfill::utils::{create_builder, mapping_message};
47use crate::executor::monitor::StreamingMetrics;
48use crate::executor::prelude::{StateTable, StreamExt, try_stream};
49use crate::executor::{
50    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
51    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
52    expect_first_barrier,
53};
54use crate::task::CreateMviewProgressReporter;
55
/// Executor that backfills a newly-created materialized view from an upstream
/// table in up to three phases (see `execute_inner`): consume the table
/// snapshot, then the upstream table's log store, and finally the upstream
/// input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,

    /// Backfill progress table
    progress_state_table: StateTable<S>,

    /// Upstream with the same schema with the upstream table.
    upstream: Option<MergeExecutorInput>,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Reporter for create-mview backfill progress.
    progress: CreateMviewProgressReporter,

    /// Max number of rows per chunk built during snapshot/log-store reads.
    chunk_size: usize,
    /// Rate limit applied to the snapshot read; `RateLimit::Disabled` means unlimited.
    rate_limit: RateLimit,

    /// Barriers injected by the local barrier worker.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill from. `None` means the stream node was
    /// created previously and the backfill has already finished.
    snapshot_epoch: Option<u64>,
}
81
impl<S: StateStore> SnapshotBackfillExecutor<S> {
    /// Creates a new snapshot backfill executor.
    ///
    /// # Panics
    /// Panics if the upstream's schema mismatches the upstream table's schema,
    /// or if the upstream table's output does not include all of its pk columns.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: Option<MergeExecutorInput>,
        output_indices: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> Self {
        if let Some(upstream) = &upstream {
            assert_eq!(&upstream.info.schema, upstream_table.schema());
        }
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        if !matches!(rate_limit, RateLimit::Disabled) {
            trace!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            progress,
            chunk_size,
            rate_limit,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
        }
    }

    /// Core event loop. Drives backfill through up to three phases:
    /// 1. consume the upstream table snapshot at `snapshot_epoch`;
    /// 2. consume the upstream table's change-log store, checkpoint by checkpoint;
    /// 3. consume the upstream input directly.
    /// Phase 1 (or both 1 and 2) is skipped when the recovered barrier epochs show
    /// the corresponding work was already done.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        trace!("snapshot backfill executor start");
        let upstream = if let Some(mut upstream) = self.upstream {
            let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
            trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
            Some((first_upstream_barrier, upstream))
        } else {
            None
        };
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        // `Some(epoch)`: snapshot backfill should run. `None`: already caught up
        // with the upstream (first upstream barrier equals first injected barrier).
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if let Some((first_upstream_barrier, _)) = &upstream {
                if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                    assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                    Some(snapshot_epoch)
                } else {
                    None
                }
            } else {
                // must go through snapshot backfill when having no upstream
                Some(snapshot_epoch)
            }
        } else {
            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
                    first_recv_barrier.epoch
                );
            } else {
                // In release builds, tolerate the missing snapshot epoch as long as the
                // upstream exists and is already aligned with the injected barrier.
                let (first_upstream_barrier, _) = upstream
                    .as_ref()
                    .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        // Recover (or initialize) per-vnode backfill progress from the progress state table.
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        let (mut barrier_epoch, mut need_report_finish, upstream) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id());
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
                {
                    SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
                        upstream,
                        first_upstream_barrier,
                        consume_upstream_row_count,
                    ))
                } else {
                    SnapshotBackfillUpstream::Empty
                };

                // Phase 1: consume upstream snapshot
                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            &mut self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                        );

                        pin_mut!(snapshot_stream);

                        // Poll the snapshot stream via `run_future` so the upstream keeps
                        // being polled (buffered) concurrently and is not back-pressured.
                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    // Persist progress before yielding the barrier; finish the commit after.
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // Phase 2: consume upstream log store
                match upstream_buffer {
                    Either::Left(mut upstream_buffer) => {
                        let initial_pending_lag =
                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
                                &upstream_buffer
                            {
                                Some(Duration::from_millis(
                                    Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                                ))
                            } else {
                                None
                            };
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            ?initial_pending_lag,
                            "start consuming log store"
                        );

                        let consuming_log_store_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_log_store",
                            ]);
                        // Non-checkpoint barrier epochs accumulated since the last checkpoint;
                        // their change logs are consumed together at the next checkpoint barrier.
                        let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
                        loop {
                            let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                            assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                            let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                            // Disable calling next_epoch, because, if barrier_epoch.prev is a checkpoint epoch,
                            // next_epoch(barrier_epoch.prev) is actually waiting for the committed epoch.
                            // However, upstream_buffer's is_polling_epoch_data can be false, since just received
                            // the checkpoint barrier_epoch.prev. And then the upstream_buffer may stop polling upstream
                            // when the max_pending_epoch_lag is small. When upstream is not polled, the barrier of the next
                            // committed epoch cannot be collected.
                            // {
                            //     // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
                            //     // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
                            //     let next_prev_epoch = upstream_buffer
                            //         .run_future(self.upstream_table.next_epoch(barrier_epoch.prev))
                            //         .await?;
                            //     assert_eq!(next_prev_epoch, barrier.epoch.prev);
                            // }
                            barrier_epoch = barrier.epoch;
                            if barrier.kind.is_checkpoint() {
                                let pending_non_checkpoint_barrier =
                                    take(&mut pending_non_checkpoint_barrier);
                                let end_epoch = barrier_epoch.prev;
                                let start_epoch = pending_non_checkpoint_barrier
                                    .first()
                                    .map(|epoch| epoch.prev)
                                    .unwrap_or(end_epoch);
                                trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
                                // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
                                // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
                                // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
                                let mut stream = upstream_buffer
                                    .run_future(make_log_stream(
                                        &self.upstream_table,
                                        start_epoch,
                                        end_epoch,
                                        None,
                                        self.chunk_size,
                                    ))
                                    .await?;
                                while let Some(chunk) =
                                    upstream_buffer.run_future(stream.try_next()).await?
                                {
                                    trace!(
                                        ?barrier_epoch,
                                        size = chunk.cardinality(),
                                        "consume change log yield chunk",
                                    );
                                    consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                                    yield Message::Chunk(chunk);
                                }

                                trace!(?barrier_epoch, "after consume change log");

                                // Record per-vnode progress: the whole change log of this
                                // epoch range has been consumed for every vnode.
                                stream
                                    .for_vnode_pk_progress(|vnode, row_count, progress| {
                                        assert_eq!(progress, None);
                                        backfill_state.finish_epoch(
                                            vnode,
                                            barrier.epoch.prev,
                                            row_count,
                                        );
                                    })
                                    .await?;
                            } else {
                                pending_non_checkpoint_barrier.push(barrier.epoch);
                            }

                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
                                &upstream_buffer
                            {
                                if is_finished {
                                    assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                                    assert!(pending_non_checkpoint_barrier.is_empty());
                                    self.progress.finish_consuming_log_store(barrier.epoch);
                                } else {
                                    self.progress.update_create_mview_log_store_progress(
                                        barrier.epoch,
                                        upstream_buffer.pending_epoch_lag(),
                                    );
                                }
                            }

                            let post_commit = backfill_state.commit(barrier.epoch).await?;
                            let update_vnode_bitmap =
                                barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                            yield Message::Barrier(barrier);
                            post_commit.post_yield_barrier(None).await?;
                            // Scaling (vnode bitmap change) is not supported in this phase.
                            if update_vnode_bitmap.is_some() {
                                return Err(anyhow!(
                                    "should not update vnode bitmap during consuming log store"
                                )
                                .into());
                            }

                            if is_finished {
                                assert!(
                                    pending_non_checkpoint_barrier.is_empty(),
                                    "{pending_non_checkpoint_barrier:?}"
                                );
                                break;
                            }
                        }
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            "finish consuming log store"
                        );

                        (
                            barrier_epoch,
                            false,
                            upstream_buffer.start_consuming_upstream(),
                        )
                    }
                    Either::Right(upstream) => {
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            "skip consuming log store and start consuming upstream directly"
                        );

                        (barrier_epoch, true, upstream)
                    }
                }
            } else {
                // Backfill already finished: verify recovered progress agrees, then
                // pass the upstream through directly.
                let (first_upstream_barrier, _) = upstream
                    .as_ref()
                    .expect("should have upstream when skipping snapshot backfill");
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                trace!(
                    table_id = %self.upstream_table.table_id(),
                    "skip backfill"
                );
                let (first_upstream_barrier, upstream) =
                    upstream.expect("should have upstream when skipping snapshot backfill");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true, upstream)
            }
        };
        let mut upstream = upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        // Phase 3: consume upstream
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
                            // executor.
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    // Report finish once, on the first barrier after log-store
                    // consumption was skipped.
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        // Scaling happened: adopt the new vnode set and sanity-check
                        // that every (possibly newly-assigned) vnode is fully consumed.
                        let _prev_vnode_bitmap =
                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}
517
518impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
519    fn execute(self: Box<Self>) -> BoxedMessageStream {
520        let output_indices = self.output_indices.clone();
521        self.execute_inner()
522            .filter_map(move |result| {
523                ready({
524                    match result {
525                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
526                        Err(e) => Some(Err(e)),
527                    }
528                })
529            })
530            .boxed()
531    }
532}
533
/// Type-state marker for [`UpstreamBuffer`]: currently consuming the upstream table snapshot.
struct ConsumingSnapshot;
/// Type-state marker for [`UpstreamBuffer`]: currently consuming the upstream log store.
struct ConsumingLogStore;
536
/// Upstream barriers buffered while the executor lags behind the upstream,
/// grouped by checkpoint. Both queues keep newer barriers at the front.
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the very first barrier received from upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier
    /// Newer barrier at the front
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer barriers at the front.
    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
    /// and others as non-checkpoint barrier
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
550
impl PendingBarriers {
    /// Creates the pending set, seeding the first upstream barrier as the sole
    /// member of the first checkpoint barrier group.
    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
        Self {
            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
            pending_non_checkpoint_barriers: Default::default(),
            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
                first_upstream_barrier,
            ])]),
        }
    }

    /// Adds a newly received upstream barrier. A checkpoint barrier seals the
    /// accumulated non-checkpoint barriers into a new group (checkpoint barrier
    /// at the front of the group, as documented on the struct).
    fn add(&mut self, barrier: DispatcherBarrier) {
        let is_checkpoint = barrier.kind.is_checkpoint();
        self.pending_non_checkpoint_barriers.push_front(barrier);
        if is_checkpoint {
            self.checkpoint_barrier_groups
                .push_front(take(&mut self.pending_non_checkpoint_barriers));
        }
    }

    /// Removes and returns the oldest checkpoint barrier group, if any.
    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
        self.checkpoint_barrier_groups.pop_back()
    }

    /// Marks `epoch` as consumed by the executor. When it matches the oldest
    /// pending upstream barrier, that barrier is dropped (and its group too,
    /// once emptied); an older `epoch` is a no-op. Panics if `epoch` is newer
    /// than the oldest pending barrier.
    fn consume_epoch(&mut self, epoch: EpochPair) {
        let barriers = self
            .checkpoint_barrier_groups
            .back_mut()
            .expect("non-empty");
        let oldest_upstream_barrier = barriers.back().expect("non-empty");
        assert!(
            oldest_upstream_barrier.epoch.prev >= epoch.prev,
            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
            oldest_upstream_barrier.epoch,
            epoch
        );
        if oldest_upstream_barrier.epoch.prev == epoch.prev {
            assert_eq!(oldest_upstream_barrier.epoch, epoch);
            barriers.pop_back();
            if barriers.is_empty() {
                self.checkpoint_barrier_groups.pop_back();
            }
        }
    }

    /// Epoch of the newest pending barrier, checking the not-yet-sealed
    /// non-checkpoint barriers first; `None` when nothing is pending.
    fn latest_epoch(&self) -> Option<EpochPair> {
        self.pending_non_checkpoint_barriers
            .front()
            .or_else(|| {
                self.checkpoint_barrier_groups
                    .front()
                    .and_then(|barriers| barriers.front())
            })
            .map(|barrier| barrier.epoch)
    }

    /// Number of pending checkpoint barrier groups.
    fn checkpoint_epoch_count(&self) -> usize {
        self.checkpoint_barrier_groups.len()
    }

    /// Whether at least one checkpoint barrier group is still pending.
    fn has_checkpoint_epoch(&self) -> bool {
        !self.checkpoint_barrier_groups.is_empty()
    }
}
615
/// The executor's upstream while backfilling. `S` is the type-state marker of
/// the contained buffer ([`ConsumingSnapshot`] or [`ConsumingLogStore`]).
enum SnapshotBackfillUpstream<S> {
    /// No upstream input; the executor must go through snapshot backfill.
    Empty,
    /// Upstream input wrapped in a barrier-buffering layer.
    Buffer(UpstreamBuffer<S>),
}
620
621impl<S> SnapshotBackfillUpstream<S> {
622    async fn run_future<T, E: Into<StreamExecutorError>>(
623        &mut self,
624        future: impl Future<Output = Result<T, E>>,
625    ) -> StreamExecutorResult<T> {
626        match self {
627            SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
628            SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
629        }
630    }
631}
632
633impl SnapshotBackfillUpstream<ConsumingSnapshot> {
634    fn start_consuming_log_store(
635        self,
636        consumed_epoch: u64,
637    ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
638        match self {
639            SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
640            SnapshotBackfillUpstream::Buffer(buffer) => {
641                match buffer.start_consuming_log_store(consumed_epoch) {
642                    Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
643                    Either::Right(input) => Either::Right(input),
644                }
645            }
646        }
647    }
648}
649
650impl SnapshotBackfillUpstream<ConsumingLogStore> {
651    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
652        match self {
653            SnapshotBackfillUpstream::Empty => Ok(false),
654            SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
655        }
656    }
657
658    fn start_consuming_upstream(self) -> MergeExecutorInput {
659        match self {
660            SnapshotBackfillUpstream::Empty => {
661                unreachable!("unlike to start consuming upstream when having no upstream")
662            }
663            SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
664        }
665    }
666}
667
/// Wraps the upstream input and buffers its barriers while the executor is
/// still consuming the snapshot or the log store. `S` is the type-state marker
/// ([`ConsumingSnapshot`] or [`ConsumingLogStore`]).
struct UpstreamBuffer<S> {
    /// The upstream input whose barriers are being buffered.
    upstream: MergeExecutorInput,
    /// Bound on the pending epoch lag; presumably polling of the upstream stops
    /// once exceeded (see the comment in `execute_inner`'s log-store loop —
    /// enforcement is outside this view). `u64::MAX` (no limit) during snapshot
    /// consumption.
    max_pending_epoch_lag: u64,
    /// Latest epoch fully consumed by the executor; `0` until snapshot
    /// consumption finishes.
    consumed_epoch: u64,
    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next checkpoint barrier.
    /// When `true`, we should continue polling until the next checkpoint barrier, because
    /// some data in this epoch have been discarded and data in this epoch
    /// must be read from log store
    is_polling_epoch_data: bool,
    /// Metric counting rows consumed from upstream (label `consume_upstream`).
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Type-state marker (zero-sized).
    _phase: S,
}
682
683impl UpstreamBuffer<ConsumingSnapshot> {
684    fn new(
685        upstream: MergeExecutorInput,
686        first_upstream_barrier: DispatcherBarrier,
687        consume_upstream_row_count: LabelGuardedIntCounter,
688    ) -> Self {
689        Self {
690            upstream,
691            is_polling_epoch_data: false,
692            consume_upstream_row_count,
693            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
694            // no limit on the number of pending barrier in the beginning
695            max_pending_epoch_lag: u64::MAX,
696            consumed_epoch: 0,
697            _phase: ConsumingSnapshot {},
698        }
699    }
700
701    fn start_consuming_log_store(
702        mut self,
703        consumed_epoch: u64,
704    ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
705        if self
706            .upstream_pending_barriers
707            .first_upstream_barrier_epoch
708            .prev
709            == consumed_epoch
710        {
711            assert_eq!(
712                1,
713                self.upstream_pending_barriers
714                    .pop()
715                    .expect("non-empty")
716                    .len()
717            );
718        }
719        let max_pending_epoch_lag = self.pending_epoch_lag();
720        let buffer = UpstreamBuffer {
721            upstream: self.upstream,
722            upstream_pending_barriers: self.upstream_pending_barriers,
723            max_pending_epoch_lag,
724            is_polling_epoch_data: self.is_polling_epoch_data,
725            consume_upstream_row_count: self.consume_upstream_row_count,
726            consumed_epoch,
727            _phase: ConsumingLogStore {},
728        };
729        if buffer.is_finished() {
730            Either::Right(buffer.upstream)
731        } else {
732            Either::Left(buffer)
733        }
734    }
735}
736
737impl<S> UpstreamBuffer<S> {
738    fn can_consume_upstream(&self) -> bool {
739        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
740    }
741
742    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
743        {
744            loop {
745                if let Err(e) = try {
746                    if !self.can_consume_upstream() {
747                        // pause the future to block consuming upstream
748                        sleep(Duration::from_secs(30)).await;
749                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
750                        return pending().await;
751                    }
752                    self.consume_until_next_checkpoint_barrier().await?;
753                } {
754                    break e;
755                }
756            }
757        }
758    }
759
760    /// Consume the upstream until seeing the next barrier.
761    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
762        loop {
763            let msg: DispatcherMessage = self
764                .upstream
765                .try_next()
766                .await?
767                .ok_or_else(|| anyhow!("end of upstream"))?;
768            match msg {
769                DispatcherMessage::Chunk(chunk) => {
770                    self.is_polling_epoch_data = true;
771                    self.consume_upstream_row_count
772                        .inc_by(chunk.cardinality() as _);
773                }
774                DispatcherMessage::Barrier(barrier) => {
775                    let is_checkpoint = barrier.kind.is_checkpoint();
776                    self.upstream_pending_barriers.add(barrier);
777                    if is_checkpoint {
778                        self.is_polling_epoch_data = false;
779                        break;
780                    } else {
781                        self.is_polling_epoch_data = true;
782                    }
783                }
784                DispatcherMessage::Watermark(_) => {
785                    self.is_polling_epoch_data = true;
786                }
787            }
788        }
789        Ok(())
790    }
791}
792
impl UpstreamBuffer<ConsumingLogStore> {
    /// Called when the downstream has finished consuming `epoch` from the log store.
    ///
    /// Pops the matching pending barrier, opportunistically polls more ready upstream
    /// data, and shrinks `max_pending_epoch_lag` so the lag bound decreases
    /// monotonically. Returns `true` when log-store consumption is finished.
    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
            // we must have returned true to indicate finish, and should not be called again.
            assert!(self.is_polling_epoch_data);
            // Block until the partially-polled epoch is closed by a checkpoint barrier.
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
                    // `now_or_never` only takes work that is immediately ready, so this never blocks.
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                // sub to ensure that the lag is monotonically decreasing.
                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
                // the upstream can at least progress for one epoch.
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }

    /// Finished iff there is no pending checkpoint epoch and no partially-polled
    /// epoch data left.
    fn is_finished(&self) -> bool {
        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
            // Invariant check (debug only): when no epoch data is in flight there
            // must be no dangling non-checkpoint barriers either.
            assert!(
                self.upstream_pending_barriers
                    .pending_non_checkpoint_barriers
                    .is_empty()
            )
        }
        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
    }

    /// Hand back the raw upstream input once the buffer is fully drained.
    fn start_consuming_upstream(self) -> MergeExecutorInput {
        assert!(self.is_finished());
        assert_eq!(self.pending_epoch_lag(), 0);
        self.upstream
    }
}
850
851impl<S> UpstreamBuffer<S> {
852    /// Run a future while concurrently polling the upstream so that the upstream
853    /// won't be back-pressured.
854    async fn run_future<T, E: Into<StreamExecutorError>>(
855        &mut self,
856        future: impl Future<Output = Result<T, E>>,
857    ) -> StreamExecutorResult<T> {
858        select! {
859            biased;
860            e = self.concurrently_consume_upstream() => {
861                Err(e)
862            }
863            // this arm won't be starved, because the first arm is always pending unless returning with error
864            result = future => {
865                result.map_err(Into::into)
866            }
867        }
868    }
869
870    fn pending_epoch_lag(&self) -> u64 {
871        self.upstream_pending_barriers
872            .latest_epoch()
873            .map(|epoch| {
874                epoch
875                    .prev
876                    .checked_sub(self.consumed_epoch)
877                    .expect("pending epoch must be later than consumed_epoch")
878            })
879            .unwrap_or(0)
880    }
881}
882
883#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
884async fn make_log_stream(
885    upstream_table: &BatchTable<impl StateStore>,
886    start_epoch: u64,
887    end_epoch: u64,
888    start_pk: Option<OwnedRow>,
889    chunk_size: usize,
890) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
891    let data_types = upstream_table.schema().data_types();
892    let start_pk = start_pk.as_ref();
893    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
894    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
895        upstream_table
896            .batch_iter_vnode_log(
897                start_epoch,
898                HummockReadEpoch::Committed(end_epoch),
899                start_pk,
900                vnode,
901            )
902            .map_ok(move |stream| {
903                let stream = stream.map_err(Into::into);
904                (vnode, stream, 0)
905            })
906    }))
907    .await?;
908    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
909    Ok(VnodeStream::new(
910        vnode_streams,
911        upstream_table.pk_in_output_indices().expect("should exist"),
912        builder,
913    ))
914}
915
916async fn make_snapshot_stream(
917    upstream_table: &BatchTable<impl StateStore>,
918    snapshot_epoch: u64,
919    backfill_state: &BackfillState<impl StateStore>,
920    rate_limit: RateLimit,
921    chunk_size: usize,
922    snapshot_rebuild_interval: Duration,
923) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
924    let data_types = upstream_table.schema().data_types();
925    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
926        move |(vnode, progress)| {
927            let start_pk = match progress {
928                None => Some((None, 0)),
929                Some(VnodeBackfillProgress {
930                    row_count,
931                    progress: EpochBackfillProgress::Consuming { latest_pk },
932                    ..
933                }) => Some((Some(latest_pk), *row_count)),
934                Some(VnodeBackfillProgress {
935                    progress: EpochBackfillProgress::Consumed,
936                    ..
937                }) => None,
938            };
939            start_pk.map(|(start_pk, row_count)| {
940                upstream_table
941                    .batch_iter_vnode(
942                        HummockReadEpoch::Committed(snapshot_epoch),
943                        start_pk,
944                        vnode,
945                        PrefetchOptions::prefetch_for_large_range_scan(),
946                        snapshot_rebuild_interval,
947                    )
948                    .map_ok(move |stream| {
949                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
950                        (vnode, stream, row_count)
951                    })
952            })
953        },
954    ))
955    .await?;
956    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
957    Ok(VnodeStream::new(
958        vnode_streams,
959        upstream_table.pk_in_output_indices().expect("should exist"),
960        builder,
961    ))
962}
963
/// Consume the snapshot of `upstream_table` at `snapshot_epoch`, yielding snapshot
/// rows as chunks interleaved with the barriers received on `barrier_rx`.
///
/// Per-vnode progress is persisted into `backfill_state` on every barrier, and
/// creation progress is reported via `progress`. The stream ends after yielding the
/// barrier whose `curr` epoch equals `snapshot_epoch`.
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: &'a mut RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    // start consume upstream snapshot
    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        *rate_limit,
        chunk_size,
        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
    )
    .await?;

    // Race the barrier channel against the snapshot stream. The snapshot arm is
    // disabled while throttled by the rate limit or while backfill is paused.
    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    let mut count = 0; // total rows emitted since the start of the backfill
    let mut epoch_row_count = 0; // rows emitted within the current epoch, for rate limiting
    let mut backfill_paused = initial_backfill_paused;
    loop {
        // Throttle once this epoch has already emitted at least `rate_limit` rows.
        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;

                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    backfill_paused = false;
                }
                // Flush any partially-built chunk before persisting progress.
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                // Persist per-vnode progress: latest pk while still consuming, or
                // mark the vnode finished when its stream is exhausted.
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                // Pick up a backfill rate-limit change carried by this barrier, if any.
                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
                    if let Mutation::Throttle(config) = &**m
                        && let Some(config) = config.get(&actor_ctx.fragment_id)
                        && config.throttle_type() == PbThrottleType::Backfill
                    {
                        Some(config.rate_limit)
                    } else {
                        None
                    }
                });
                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;

                // Apply the rate-limit change only after the barrier is yielded.
                if let Some(new_rate_limit) = new_rate_limit {
                    let new_rate_limit = new_rate_limit.into();
                    *rate_limit = new_rate_limit;
                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
                }
            }
            Either::Right(Some(chunk)) => {
                // The snapshot arm is disabled while paused, so a chunk here is a bug.
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            // Snapshot stream exhausted: all vnode streams fully consumed.
            Either::Right(None) => {
                break;
            }
        }
    }

    // finish consuming upstream snapshot, report finish
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    trace!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    trace!(?barrier_epoch, "finish consuming snapshot");
}
1119
1120#[cfg(test)]
1121mod tests {
1122    use std::collections::HashSet;
1123    use std::sync::Arc;
1124
1125    use risingwave_common::array::StreamChunk;
1126    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1127    use risingwave_common::row::OwnedRow;
1128    use risingwave_common::test_prelude::StreamChunkTestExt;
1129    use risingwave_common::types::DataType;
1130    use risingwave_common::util::epoch::{EpochPair, test_epoch};
1131    use risingwave_common::util::sort_util::OrderType;
1132    use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
1133    use risingwave_rpc_client::HummockMetaClient;
1134    use risingwave_storage::hummock::HummockStorage;
1135    use risingwave_storage::table::batch_table::BatchTable;
1136    use tokio::sync::mpsc::unbounded_channel;
1137    use tokio::time::{Duration, timeout};
1138
1139    use super::*;
1140    use crate::common::table::state_table::{
1141        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
1142    };
1143    use crate::common::table::test_utils::gen_pbtable_with_value_indices;
1144    use crate::executor::exchange::input::{Input, LocalInput};
1145    use crate::executor::exchange::permit::channel_for_test;
1146    use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
1147    use crate::task::LocalBarrierManager;
1148
    /// Table id of the upstream table that the tests backfill from.
    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
    /// Table id of the backfill progress state table.
    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);
1151
1152    fn source_table_pb() -> risingwave_pb::catalog::PbTable {
1153        gen_pbtable_with_value_indices(
1154            SOURCE_TABLE_ID,
1155            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1156            vec![OrderType::ascending()],
1157            vec![0],
1158            0,
1159            vec![0],
1160        )
1161    }
1162
1163    fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
1164        gen_pbtable_with_value_indices(
1165            PROGRESS_TABLE_ID,
1166            vec![
1167                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
1168                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1169                ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1170                ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
1171                ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
1172            ],
1173            vec![OrderType::ascending()],
1174            vec![0],
1175            1,
1176            vec![1, 2, 3, 4],
1177        )
1178    }
1179
1180    fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
1181        BatchTable::for_test(
1182            store,
1183            SOURCE_TABLE_ID,
1184            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1185            vec![OrderType::ascending()],
1186            vec![0],
1187            vec![0],
1188        )
1189    }
1190
1191    async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1192        StateTableBuilder::new(&source_table_pb(), store, None)
1193            .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
1194            .forbid_preload_all_rows()
1195            .build()
1196            .await
1197    }
1198
1199    async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1200        StateTable::from_table_catalog(&progress_table_pb(), store, None).await
1201    }
1202
    /// Insert `values` into the source table, advance `epoch`, and push the commit
    /// all the way through seal/sync and the meta commit so the data becomes
    /// readable at the committed epoch.
    async fn commit_insert_epoch(
        test_env: &HummockTestEnv,
        source_state_table: &mut StateTable<HummockStorage>,
        epoch: &mut EpochPair,
        table_ids: HashSet<TableId>,
        values: &[i64],
    ) {
        for value in values {
            source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
        }
        epoch.inc_for_test();
        // Start the next epoch on the storage before committing the state table to it.
        test_env.storage.start_epoch(epoch.curr, table_ids);
        source_state_table.commit_for_test(*epoch).await.unwrap();
        // Seal and sync the just-finished (prev) epoch for the source table.
        let res = test_env
            .storage
            .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
            .await
            .unwrap();
        // Commit to meta including a change-log entry for the epoch — presumably
        // needed for the backfill's log-store reads; verify against the executor.
        test_env
            .meta_client
            .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
            .await
            .unwrap();
        // Wait until the committed version is visible to the local storage.
        test_env
            .storage
            .wait_version(test_env.manager.get_current_version().await)
            .await;
    }
1231
1232    fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
1233        for epoch in 1..=max_epoch {
1234            test_env
1235                .storage
1236                .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
1237        }
1238    }
1239
1240    fn make_upstream_input(
1241        barrier_manager: LocalBarrierManager,
1242        actor_ctx: ActorContextRef,
1243        rx: crate::executor::exchange::permit::Receiver,
1244    ) -> MergeExecutorInput {
1245        MergeExecutorInput::new(
1246            MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
1247            actor_ctx,
1248            1919.into(),
1249            barrier_manager,
1250            Arc::new(StreamingMetrics::unused()),
1251            ExecutorInfo::for_test(
1252                Schema::new(vec![Field::unnamed(DataType::Int64)]),
1253                vec![0],
1254                "SnapshotBackfillUpstream".to_owned(),
1255                0,
1256            ),
1257        )
1258    }
1259
1260    async fn expect_barrier_with_timeout(
1261        executor: &mut BoxedMessageStream,
1262        reason: &str,
1263    ) -> Barrier {
1264        let message = timeout(Duration::from_secs(10), executor.next())
1265            .await
1266            .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
1267            .unwrap()
1268            .unwrap();
1269        match message {
1270            Message::Barrier(barrier) => barrier,
1271            other => panic!("expected barrier for {reason}, got {other:?}"),
1272        }
1273    }
1274
1275    async fn expect_chunk_with_timeout(
1276        executor: &mut BoxedMessageStream,
1277        reason: &str,
1278    ) -> StreamChunk {
1279        let message = timeout(Duration::from_secs(10), executor.next())
1280            .await
1281            .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
1282            .unwrap()
1283            .unwrap();
1284        match message {
1285            Message::Chunk(chunk) => chunk,
1286            other => panic!("expected chunk for {reason}, got {other:?}"),
1287        }
1288    }
1289
1290    async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
1291        assert!(
1292            timeout(Duration::from_millis(200), executor.next())
1293                .await
1294                .is_err(),
1295            "executor unexpectedly produced a message while waiting for {reason}"
1296        );
1297    }
1298
    /// End-to-end check of snapshot backfill with no upstream input: the executor
    /// should yield the whole snapshot as a chunk and then pass barriers through.
    #[tokio::test]
    async fn test_snapshot_backfill_without_upstream_on_hummock() {
        // Separate hummock environments for the source table and the progress table.
        let source_env = prepare_hummock_test_env().await;
        source_env.register_table(source_table_pb()).await;
        let progress_env = prepare_hummock_test_env().await;
        progress_env.register_table(progress_table_pb()).await;

        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
        let source_table = source_batch_table(source_env.storage.clone());
        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;

        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        source_env
            .storage
            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
        source_state_table.init_epoch(epoch).await.unwrap();

        // Commit rows 1, 2, 3 in three consecutive epochs, then one empty epoch.
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[1],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[2],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[3],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[],
        )
        .await;
        start_progress_epochs(&progress_env, 5);

        let barrier_manager = LocalBarrierManager::for_test();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
        let actor_ctx = ActorContext::for_test(1234);
        let (barrier_tx, barrier_rx) = unbounded_channel();
        // First local barrier, received by the executor on startup.
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();

        // No upstream input (`None`); snapshot epoch is test_epoch(3).
        let mut executor = SnapshotBackfillExecutor::new(
            source_table,
            progress_state_table,
            None,
            vec![0],
            actor_ctx,
            progress,
            1024,
            RateLimit::Disabled,
            barrier_rx,
            Arc::new(StreamingMetrics::unused()),
            Some(test_epoch(3)),
        )
        .boxed()
        .execute();

        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(1)).epoch
        );
        // The whole snapshot arrives as a single chunk.
        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
            StreamChunk::from_pretty(
                " I
                + 1
                + 2
                + 3"
            )
        );
        // No further output until the next local barrier is sent.
        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(2)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(2)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(3)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(3)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(4)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(4)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(5)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(5)).epoch
        );

        // Nothing more without another local barrier.
        expect_pending_with_timeout(&mut executor, "next local barrier").await;
    }
1433
1434    #[tokio::test]
1435    async fn test_snapshot_backfill_with_upstream_on_hummock() {
1436        let source_env = prepare_hummock_test_env().await;
1437        source_env.register_table(source_table_pb()).await;
1438        let progress_env = prepare_hummock_test_env().await;
1439        progress_env.register_table(progress_table_pb()).await;
1440
1441        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1442        let source_table = source_batch_table(source_env.storage.clone());
1443        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1444
1445        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1446        source_env
1447            .storage
1448            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1449        source_state_table.init_epoch(epoch).await.unwrap();
1450
1451        commit_insert_epoch(
1452            &source_env,
1453            &mut source_state_table,
1454            &mut epoch,
1455            HashSet::from_iter([SOURCE_TABLE_ID]),
1456            &[],
1457        )
1458        .await;
1459        commit_insert_epoch(
1460            &source_env,
1461            &mut source_state_table,
1462            &mut epoch,
1463            HashSet::from_iter([SOURCE_TABLE_ID]),
1464            &[],
1465        )
1466        .await;
1467        commit_insert_epoch(
1468            &source_env,
1469            &mut source_state_table,
1470            &mut epoch,
1471            HashSet::from_iter([SOURCE_TABLE_ID]),
1472            &[],
1473        )
1474        .await;
1475        commit_insert_epoch(
1476            &source_env,
1477            &mut source_state_table,
1478            &mut epoch,
1479            HashSet::from_iter([SOURCE_TABLE_ID]),
1480            &[4],
1481        )
1482        .await;
1483        start_progress_epochs(&progress_env, 6);
1484
1485        let barrier_manager = LocalBarrierManager::for_test();
1486        let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
1487        let actor_ctx = ActorContext::for_test(1235);
1488        let (barrier_tx, barrier_rx) = unbounded_channel();
1489        let (upstream_tx, upstream_rx) = channel_for_test();
1490
1491        upstream_tx
1492            .send(
1493                DispatcherMessage::Barrier(
1494                    Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
1495                )
1496                .into(),
1497            )
1498            .await
1499            .unwrap();
1500        barrier_tx
1501            .send(Barrier::new_test_barrier(test_epoch(1)))
1502            .unwrap();
1503
1504        let mut executor = SnapshotBackfillExecutor::new(
1505            source_table,
1506            progress_state_table,
1507            Some(make_upstream_input(
1508                barrier_manager,
1509                actor_ctx.clone(),
1510                upstream_rx,
1511            )),
1512            vec![0],
1513            actor_ctx,
1514            progress,
1515            1024,
1516            RateLimit::Disabled,
1517            barrier_rx,
1518            Arc::new(StreamingMetrics::unused()),
1519            Some(test_epoch(3)),
1520        )
1521        .boxed()
1522        .execute();
1523
1524        assert_eq!(
1525            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1526                .await
1527                .epoch,
1528            Barrier::new_test_barrier(test_epoch(1)).epoch
1529        );
1530        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1531        barrier_tx
1532            .send(Barrier::new_test_barrier(test_epoch(2)))
1533            .unwrap();
1534        assert_eq!(
1535            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1536                .await
1537                .epoch,
1538            Barrier::new_test_barrier(test_epoch(2)).epoch
1539        );
1540
1541        barrier_tx
1542            .send(Barrier::new_test_barrier(test_epoch(3)))
1543            .unwrap();
1544        assert_eq!(
1545            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1546                .await
1547                .epoch,
1548            Barrier::new_test_barrier(test_epoch(3)).epoch
1549        );
1550
1551        barrier_tx
1552            .send(Barrier::new_test_barrier(test_epoch(4)))
1553            .unwrap();
1554        assert_eq!(
1555            expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
1556                .await
1557                .epoch,
1558            Barrier::new_test_barrier(test_epoch(4)).epoch
1559        );
1560
1561        barrier_tx
1562            .send(Barrier::new_test_barrier(test_epoch(5)))
1563            .unwrap();
1564        assert_eq!(
1565            expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
1566            StreamChunk::from_pretty(
1567                " I
1568                + 4"
1569            )
1570        );
1571        assert_eq!(
1572            expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
1573                .await
1574                .epoch,
1575            Barrier::new_test_barrier(test_epoch(5)).epoch
1576        );
1577
1578        upstream_tx
1579            .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
1580            .await
1581            .unwrap();
1582        let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
1583        upstream_tx
1584            .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
1585            .await
1586            .unwrap();
1587        barrier_tx.send(stop_barrier.clone()).unwrap();
1588
1589        assert_eq!(
1590            expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
1591            StreamChunk::from_pretty(" I\n + 5")
1592        );
1593        assert_eq!(
1594            expect_barrier_with_timeout(&mut executor, "final stop barrier")
1595                .await
1596                .epoch,
1597            stop_barrier.epoch
1598        );
1599    }
1600}