risingwave_stream/executor/backfill/snapshot_backfill/
executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::batch_plan::ScanRange;
33use risingwave_pb::common::PbThrottleType;
34use risingwave_storage::StateStore;
35use risingwave_storage::store::PrefetchOptions;
36use risingwave_storage::table::ChangeLogRow;
37use risingwave_storage::table::batch_table::{BatchTable, PkScanRange};
38use tokio::select;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::time::sleep;
41
42use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
43use crate::executor::backfill::snapshot_backfill::state::{
44    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
45};
46use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
47use crate::executor::backfill::utils::{
48    UpstreamStreamKeyUpdateNormalizer, create_builder, mapping_message,
49};
50use crate::executor::monitor::StreamingMetrics;
51use crate::executor::prelude::{StateTable, StreamExt, try_stream};
52use crate::executor::{
53    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
54    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
55    expect_first_barrier,
56};
57use crate::task::CreateMviewProgressReporter;
58
/// Executor that backfills a downstream job from an upstream table.
///
/// As driven by `execute_inner`, it may first read the upstream table snapshot, then the
/// upstream change log, and finally forward messages of the upstream input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,

    /// Backfill progress table
    progress_state_table: StateTable<S>,

    /// Upstream with the same schema with the upstream table.
    /// `None` when the executor has no upstream input.
    upstream: Option<MergeExecutorInput>,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Current executor stream-key indices in the output schema.
    stream_key: Vec<usize>,

    /// Reporter used to report the log store consumption progress and its completion.
    progress: CreateMviewProgressReporter,

    /// Chunk size passed to the snapshot stream and the change log stream.
    chunk_size: usize,
    /// Rate limit passed to the snapshot stream.
    rate_limit: RateLimit,

    /// Receiver of the barriers that drive this executor while it is not yet following
    /// the upstream directly.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill. When not set, the backfill is expected to have
    /// finished previously.
    snapshot_epoch: Option<u64>,
    /// (`eq_prefix`, `range_bounds`) for pk scan range pushdown.
    pk_scan_range: PkScanRange,
}
89
90impl<S: StateStore> SnapshotBackfillExecutor<S> {
91    fn build_pk_scan_range(
92        pb_scan_range: Option<&ScanRange>,
93        upstream_table: &BatchTable<S>,
94    ) -> StreamExecutorResult<PkScanRange> {
95        match pb_scan_range {
96            Some(scan_range) => Ok(PkScanRange::new(
97                scan_range.clone(),
98                upstream_table.pk_serializer().get_data_types().to_vec(),
99            )?),
100            None => Ok(PkScanRange::full()),
101        }
102    }
103
104    #[expect(clippy::too_many_arguments)]
105    pub(crate) fn new(
106        upstream_table: BatchTable<S>,
107        progress_state_table: StateTable<S>,
108        upstream: Option<MergeExecutorInput>,
109        pb_pk_scan_range: Option<&ScanRange>,
110        output_indices: Vec<usize>,
111        stream_key: Vec<usize>,
112        actor_ctx: ActorContextRef,
113        progress: CreateMviewProgressReporter,
114        chunk_size: usize,
115        rate_limit: RateLimit,
116        barrier_rx: UnboundedReceiver<Barrier>,
117        metrics: Arc<StreamingMetrics>,
118        snapshot_epoch: Option<u64>,
119    ) -> StreamExecutorResult<Self> {
120        if let Some(upstream) = &upstream {
121            assert_eq!(&upstream.info.schema, upstream_table.schema());
122        }
123        if upstream_table.pk_in_output_indices().is_none() {
124            panic!(
125                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
126                upstream_table.pk_indices(),
127                upstream_table.output_indices(),
128                upstream_table.schema()
129            )
130        };
131        assert!(
132            stream_key.iter().all(|idx| *idx < output_indices.len()),
133            "stream key indices should refer to output schema: stream_key: {:?}, output_indices: {:?}",
134            stream_key,
135            output_indices
136        );
137        let pk_scan_range = Self::build_pk_scan_range(pb_pk_scan_range, &upstream_table)?;
138        if !matches!(rate_limit, RateLimit::Disabled) {
139            trace!(
140                ?rate_limit,
141                "create snapshot backfill executor with rate limit"
142            );
143        }
144        Ok(Self {
145            upstream_table,
146            progress_state_table,
147            upstream,
148            output_indices,
149            stream_key,
150            progress,
151            chunk_size,
152            rate_limit,
153            barrier_rx,
154            actor_ctx,
155            metrics,
156            snapshot_epoch,
157            pk_scan_range,
158        })
159    }
160
161    #[try_stream(ok = Message, error = StreamExecutorError)]
162    async fn execute_inner(mut self) {
163        trace!("snapshot backfill executor start");
164        let upstream = if let Some(mut upstream) = self.upstream {
165            let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
166            trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
167            Some((first_upstream_barrier, upstream))
168        } else {
169            None
170        };
171        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
172        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
173        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
174            self.snapshot_epoch
175        {
176            if let Some((first_upstream_barrier, _)) = &upstream {
177                if first_upstream_barrier.epoch != first_recv_barrier.epoch {
178                    assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
179                    Some(snapshot_epoch)
180                } else {
181                    None
182                }
183            } else {
184                // must go through snapshot backfill when having no upstream
185                Some(snapshot_epoch)
186            }
187        } else {
188            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
189            if cfg!(debug_assertions) {
190                panic!(
191                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
192                    upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
193                    first_recv_barrier.epoch
194                );
195            } else {
196                let (first_upstream_barrier, _) = upstream
197                    .as_ref()
198                    .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
199                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
200                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
201                None
202            }
203        };
204        let first_recv_barrier_epoch = first_recv_barrier.epoch;
205        let initial_backfill_paused =
206            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
207        yield Message::Barrier(first_recv_barrier);
208        let mut backfill_state = BackfillState::new(
209            self.progress_state_table,
210            first_recv_barrier_epoch,
211            self.upstream_table.pk_serializer().clone(),
212        )
213        .await?;
214
215        let (mut barrier_epoch, mut need_report_finish, upstream) = {
216            if let Some(snapshot_epoch) = should_snapshot_backfill {
217                let table_id_str = format!("{}", self.upstream_table.table_id());
218                let actor_id_str = format!("{}", self.actor_ctx.id);
219
220                let consume_upstream_row_count = self
221                    .metrics
222                    .snapshot_backfill_consume_row_count
223                    .with_guarded_label_values(&[
224                        table_id_str.as_str(),
225                        actor_id_str.as_str(),
226                        "consume_upstream",
227                    ]);
228
229                let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
230                {
231                    SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
232                        upstream,
233                        first_upstream_barrier,
234                        consume_upstream_row_count,
235                    ))
236                } else {
237                    SnapshotBackfillUpstream::Empty
238                };
239
240                // Phase 1: consume upstream snapshot
241                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
242                    < snapshot_epoch
243                {
244                    trace!(
245                        table_id = %self.upstream_table.table_id(),
246                        snapshot_epoch,
247                        barrier_epoch = ?first_recv_barrier_epoch,
248                        "start consuming snapshot"
249                    );
250                    {
251                        let consuming_snapshot_row_count = self
252                            .metrics
253                            .snapshot_backfill_consume_row_count
254                            .with_guarded_label_values(&[
255                                table_id_str.as_str(),
256                                actor_id_str.as_str(),
257                                "consuming_snapshot",
258                            ]);
259                        let snapshot_stream = make_consume_snapshot_stream(
260                            &self.upstream_table,
261                            snapshot_epoch,
262                            self.chunk_size,
263                            &mut self.rate_limit,
264                            &mut self.barrier_rx,
265                            &mut self.progress,
266                            &mut backfill_state,
267                            first_recv_barrier_epoch,
268                            initial_backfill_paused,
269                            &self.actor_ctx,
270                            &self.pk_scan_range,
271                        );
272
273                        pin_mut!(snapshot_stream);
274
275                        while let Some(message) = upstream_buffer
276                            .run_future(snapshot_stream.try_next())
277                            .await?
278                        {
279                            if let Message::Chunk(chunk) = &message {
280                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
281                            }
282                            yield message;
283                        }
284                    }
285
286                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
287                    let recv_barrier_epoch = recv_barrier.epoch;
288                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
289                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
290                    yield Message::Barrier(recv_barrier);
291                    post_commit.post_yield_barrier(None).await?;
292                    (
293                        recv_barrier_epoch,
294                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
295                    )
296                } else {
297                    trace!(
298                        table_id = %self.upstream_table.table_id(),
299                        snapshot_epoch,
300                        barrier_epoch = ?first_recv_barrier_epoch,
301                        "skip consuming snapshot"
302                    );
303                    (
304                        first_recv_barrier_epoch,
305                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
306                    )
307                };
308
309                // Phase 2: consume upstream log store
310                match upstream_buffer {
311                    Either::Left(mut upstream_buffer) => {
312                        let initial_pending_lag =
313                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
314                                &upstream_buffer
315                            {
316                                Some(Duration::from_millis(
317                                    Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
318                                ))
319                            } else {
320                                None
321                            };
322                        trace!(
323                            ?barrier_epoch,
324                            table_id = %self.upstream_table.table_id(),
325                            ?initial_pending_lag,
326                            "start consuming log store"
327                        );
328
329                        let consuming_log_store_row_count = self
330                            .metrics
331                            .snapshot_backfill_consume_row_count
332                            .with_guarded_label_values(&[
333                                table_id_str.as_str(),
334                                actor_id_str.as_str(),
335                                "consuming_log_store",
336                            ]);
337                        let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
338                        loop {
339                            let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
340                            assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
341                            let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
342                            // Disable calling next_epoch, because, if barrier_epoch.prev is a checkpoint epoch,
343                            // next_epoch(barrier_epoch.prev) is actually waiting for the committed epoch.
344                            // However, upstream_buffer's is_polling_epoch_data can be false, since just received
345                            // the checkpoint barrier_epoch.prev. And then the upstream_buffer may stop polling upstream
346                            // when the max_pending_epoch_lag is small. When upstream is not polled, the barrier of the next
347                            // committed epoch cannot be collected.
348                            // {
349                            //     // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
350                            //     // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
351                            //     let next_prev_epoch = upstream_buffer
352                            //         .run_future(self.upstream_table.next_epoch(barrier_epoch.prev))
353                            //         .await?;
354                            //     assert_eq!(next_prev_epoch, barrier.epoch.prev);
355                            // }
356                            barrier_epoch = barrier.epoch;
357                            if barrier.kind.is_checkpoint() {
358                                let pending_non_checkpoint_barrier =
359                                    take(&mut pending_non_checkpoint_barrier);
360                                let end_epoch = barrier_epoch.prev;
361                                let start_epoch = pending_non_checkpoint_barrier
362                                    .first()
363                                    .map(|epoch| epoch.prev)
364                                    .unwrap_or(end_epoch);
365                                trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
366                                // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
367                                // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
368                                // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
369                                let mut stream = upstream_buffer
370                                    .run_future(make_log_stream(
371                                        &self.upstream_table,
372                                        start_epoch,
373                                        end_epoch,
374                                        None,
375                                        self.chunk_size,
376                                    ))
377                                    .await?;
378                                while let Some(chunk) =
379                                    upstream_buffer.run_future(stream.try_next()).await?
380                                {
381                                    trace!(
382                                        ?barrier_epoch,
383                                        size = chunk.cardinality(),
384                                        "consume change log yield chunk",
385                                    );
386                                    consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
387                                    yield Message::Chunk(chunk);
388                                }
389
390                                trace!(?barrier_epoch, "after consume change log");
391
392                                stream
393                                    .for_vnode_pk_progress(|vnode, row_count, progress| {
394                                        assert_eq!(progress, None);
395                                        backfill_state.finish_epoch(
396                                            vnode,
397                                            barrier.epoch.prev,
398                                            row_count,
399                                        );
400                                    })
401                                    .await?;
402                            } else {
403                                pending_non_checkpoint_barrier.push(barrier.epoch);
404                            }
405
406                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
407                                &upstream_buffer
408                            {
409                                if is_finished {
410                                    assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
411                                    assert!(pending_non_checkpoint_barrier.is_empty());
412                                    self.progress.finish_consuming_log_store(barrier.epoch);
413                                } else {
414                                    self.progress.update_create_mview_log_store_progress(
415                                        barrier.epoch,
416                                        upstream_buffer.pending_epoch_lag(),
417                                    );
418                                }
419                            }
420
421                            let post_commit = backfill_state.commit(barrier.epoch).await?;
422                            let update_vnode_bitmap =
423                                barrier.as_update_vnode_bitmap(self.actor_ctx.id);
424                            yield Message::Barrier(barrier);
425                            post_commit.post_yield_barrier(None).await?;
426                            if update_vnode_bitmap.is_some() {
427                                return Err(anyhow!(
428                                    "should not update vnode bitmap during consuming log store"
429                                )
430                                .into());
431                            }
432
433                            if is_finished {
434                                assert!(
435                                    pending_non_checkpoint_barrier.is_empty(),
436                                    "{pending_non_checkpoint_barrier:?}"
437                                );
438                                break;
439                            }
440                        }
441                        trace!(
442                            ?barrier_epoch,
443                            table_id = %self.upstream_table.table_id(),
444                            "finish consuming log store"
445                        );
446
447                        (
448                            barrier_epoch,
449                            false,
450                            upstream_buffer.start_consuming_upstream(),
451                        )
452                    }
453                    Either::Right(upstream) => {
454                        trace!(
455                            ?barrier_epoch,
456                            table_id = %self.upstream_table.table_id(),
457                            "skip consuming log store and start consuming upstream directly"
458                        );
459
460                        (barrier_epoch, true, upstream)
461                    }
462                }
463            } else {
464                let (first_upstream_barrier, _) = upstream
465                    .as_ref()
466                    .expect("should have upstream when skipping snapshot backfill");
467                backfill_state
468                    .latest_progress()
469                    .for_each(|(vnode, progress)| {
470                        let progress = progress.expect("should not be empty");
471                        assert_eq!(
472                            progress.epoch, first_upstream_barrier.epoch.prev,
473                            "vnode: {:?}",
474                            vnode
475                        );
476                        assert_eq!(
477                            progress.progress,
478                            EpochBackfillProgress::Consumed,
479                            "vnode: {:?}",
480                            vnode
481                        );
482                    });
483                trace!(
484                    table_id = %self.upstream_table.table_id(),
485                    "skip backfill"
486                );
487                let (first_upstream_barrier, upstream) =
488                    upstream.expect("should have upstream when skipping snapshot backfill");
489                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
490                (first_upstream_barrier.epoch, true, upstream)
491            }
492        };
493        let current_stream_key_indices = self
494            .stream_key
495            .iter()
496            .map(|idx| self.output_indices[*idx])
497            .collect();
498        let update_normalizer = UpstreamStreamKeyUpdateNormalizer::new(
499            &upstream.info.stream_key,
500            current_stream_key_indices,
501        );
502        let mut upstream = upstream.into_executor(self.barrier_rx).execute();
503        let mut epoch_row_count = 0;
504        // Phase 3: consume upstream
505        while let Some(msg) = upstream.try_next().await? {
506            let Some(msg) = update_normalizer.normalize_message(msg) else {
507                continue;
508            };
509            match msg {
510                Message::Barrier(barrier) => {
511                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
512                    self.upstream_table
513                        .vnodes()
514                        .iter_vnodes()
515                        .for_each(|vnode| {
516                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
517                            // executor.
518                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
519                        });
520                    epoch_row_count = 0;
521                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
522                    barrier_epoch = barrier.epoch;
523                    if need_report_finish {
524                        need_report_finish = false;
525                        self.progress.finish_consuming_log_store(barrier_epoch);
526                    }
527                    let post_commit = backfill_state.commit(barrier.epoch).await?;
528                    yield Message::Barrier(barrier);
529                    if let Some(new_vnode_bitmap) =
530                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
531                    {
532                        let _prev_vnode_bitmap =
533                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
534                        backfill_state
535                            .latest_progress()
536                            .for_each(|(vnode, progress)| {
537                                let progress = progress.expect("should not be empty");
538                                assert_eq!(
539                                    progress.epoch, barrier_epoch.prev,
540                                    "vnode {:?} has unexpected progress epoch",
541                                    vnode
542                                );
543                                assert_eq!(
544                                    progress.progress,
545                                    EpochBackfillProgress::Consumed,
546                                    "vnode {:?} has unexpected progress",
547                                    vnode
548                                );
549                            });
550                    }
551                }
552                msg => {
553                    if let Message::Chunk(chunk) = &msg {
554                        epoch_row_count += chunk.cardinality();
555                    }
556                    yield msg;
557                }
558            }
559        }
560    }
561}
562
563impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
564    fn execute(self: Box<Self>) -> BoxedMessageStream {
565        let output_indices = self.output_indices.clone();
566        self.execute_inner()
567            .filter_map(move |result| {
568                ready({
569                    match result {
570                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
571                        Err(e) => Some(Err(e)),
572                    }
573                })
574            })
575            .boxed()
576    }
577}
578
/// Marker type used as the state parameter of [`SnapshotBackfillUpstream`] while the snapshot
/// is being consumed; in this state the upstream can move on to consuming the log store.
struct ConsumingSnapshot;
/// Marker type used as the state parameter of [`SnapshotBackfillUpstream`] while the log store
/// is being consumed; in this state the upstream can move on to being consumed directly.
struct ConsumingLogStore;
581
/// Barriers that have been buffered and not yet consumed, grouped by checkpoint.
///
/// Non-checkpoint barriers are collected until a checkpoint barrier arrives, and are then
/// moved together with that checkpoint barrier into a new group.
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the barrier this structure was created with.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier
    /// Newer barrier at the front
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer barriers at the front.
    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
    /// and others as non-checkpoint barrier
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
595
596impl PendingBarriers {
597    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
598        Self {
599            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
600            pending_non_checkpoint_barriers: Default::default(),
601            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
602                first_upstream_barrier,
603            ])]),
604        }
605    }
606
607    fn add(&mut self, barrier: DispatcherBarrier) {
608        let is_checkpoint = barrier.kind.is_checkpoint();
609        self.pending_non_checkpoint_barriers.push_front(barrier);
610        if is_checkpoint {
611            self.checkpoint_barrier_groups
612                .push_front(take(&mut self.pending_non_checkpoint_barriers));
613        }
614    }
615
616    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
617        self.checkpoint_barrier_groups.pop_back()
618    }
619
620    fn consume_epoch(&mut self, epoch: EpochPair) {
621        let barriers = self
622            .checkpoint_barrier_groups
623            .back_mut()
624            .expect("non-empty");
625        let oldest_upstream_barrier = barriers.back().expect("non-empty");
626        assert!(
627            oldest_upstream_barrier.epoch.prev >= epoch.prev,
628            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
629            oldest_upstream_barrier.epoch,
630            epoch
631        );
632        if oldest_upstream_barrier.epoch.prev == epoch.prev {
633            assert_eq!(oldest_upstream_barrier.epoch, epoch);
634            barriers.pop_back();
635            if barriers.is_empty() {
636                self.checkpoint_barrier_groups.pop_back();
637            }
638        }
639    }
640
641    fn latest_epoch(&self) -> Option<EpochPair> {
642        self.pending_non_checkpoint_barriers
643            .front()
644            .or_else(|| {
645                self.checkpoint_barrier_groups
646                    .front()
647                    .and_then(|barriers| barriers.front())
648            })
649            .map(|barrier| barrier.epoch)
650    }
651
652    fn checkpoint_epoch_count(&self) -> usize {
653        self.checkpoint_barrier_groups.len()
654    }
655
656    fn has_checkpoint_epoch(&self) -> bool {
657        !self.checkpoint_barrier_groups.is_empty()
658    }
659}
660
/// The upstream side of the snapshot backfill, parameterized by the current state `S`
/// (`ConsumingSnapshot` or `ConsumingLogStore`).
enum SnapshotBackfillUpstream<S> {
    /// There is no upstream input.
    Empty,
    /// The upstream input, wrapped in a buffer.
    Buffer(UpstreamBuffer<S>),
}
665
666impl<S> SnapshotBackfillUpstream<S> {
667    async fn run_future<T, E: Into<StreamExecutorError>>(
668        &mut self,
669        future: impl Future<Output = Result<T, E>>,
670    ) -> StreamExecutorResult<T> {
671        match self {
672            SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
673            SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
674        }
675    }
676}
677
678impl SnapshotBackfillUpstream<ConsumingSnapshot> {
679    fn start_consuming_log_store(
680        self,
681        consumed_epoch: u64,
682    ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
683        match self {
684            SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
685            SnapshotBackfillUpstream::Buffer(buffer) => {
686                match buffer.start_consuming_log_store(consumed_epoch) {
687                    Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
688                    Either::Right(input) => Either::Right(input),
689                }
690            }
691        }
692    }
693}
694
695impl SnapshotBackfillUpstream<ConsumingLogStore> {
696    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
697        match self {
698            SnapshotBackfillUpstream::Empty => Ok(false),
699            SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
700        }
701    }
702
703    fn start_consuming_upstream(self) -> MergeExecutorInput {
704        match self {
705            SnapshotBackfillUpstream::Empty => {
706                unreachable!("unlike to start consuming upstream when having no upstream")
707            }
708            SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
709        }
710    }
711}
712
/// Buffers the barriers received from the upstream input while the executor is in phase `S`.
/// Upstream chunks are only counted (see `consume_upstream_row_count`) and then dropped.
struct UpstreamBuffer<S> {
    /// Upstream input that is polled for chunks, barriers and watermarks.
    upstream: MergeExecutorInput,
    /// Upper bound on `pending_epoch_lag()`: the upstream is polled only while the lag is
    /// strictly below this value, unless `is_polling_epoch_data` is set.
    max_pending_epoch_lag: u64,
    /// Epoch used as the baseline when computing `pending_epoch_lag()`. It starts at 0 and is
    /// advanced to `epoch.prev` each time an epoch is reported as consumed.
    consumed_epoch: u64,
    /// Barriers received from upstream for which the corresponding barrier from the local
    /// barrier worker has not been received yet.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next checkpoint barrier.
    /// When `true`, we should continue polling until the next checkpoint barrier, because
    /// some data in this epoch has already been discarded, so the data of this epoch
    /// must be read from the log store.
    is_polling_epoch_data: bool,
    /// Counter incremented by the cardinality of every upstream chunk that is polled.
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Phase marker value (`ConsumingSnapshot` or `ConsumingLogStore`).
    _phase: S,
}
727
728impl UpstreamBuffer<ConsumingSnapshot> {
729    fn new(
730        upstream: MergeExecutorInput,
731        first_upstream_barrier: DispatcherBarrier,
732        consume_upstream_row_count: LabelGuardedIntCounter,
733    ) -> Self {
734        Self {
735            upstream,
736            is_polling_epoch_data: false,
737            consume_upstream_row_count,
738            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
739            // no limit on the number of pending barrier in the beginning
740            max_pending_epoch_lag: u64::MAX,
741            consumed_epoch: 0,
742            _phase: ConsumingSnapshot {},
743        }
744    }
745
746    fn start_consuming_log_store(
747        mut self,
748        consumed_epoch: u64,
749    ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
750        if self
751            .upstream_pending_barriers
752            .first_upstream_barrier_epoch
753            .prev
754            == consumed_epoch
755        {
756            assert_eq!(
757                1,
758                self.upstream_pending_barriers
759                    .pop()
760                    .expect("non-empty")
761                    .len()
762            );
763        }
764        let max_pending_epoch_lag = self.pending_epoch_lag();
765        let buffer = UpstreamBuffer {
766            upstream: self.upstream,
767            upstream_pending_barriers: self.upstream_pending_barriers,
768            max_pending_epoch_lag,
769            is_polling_epoch_data: self.is_polling_epoch_data,
770            consume_upstream_row_count: self.consume_upstream_row_count,
771            consumed_epoch,
772            _phase: ConsumingLogStore {},
773        };
774        if buffer.is_finished() {
775            Either::Right(buffer.upstream)
776        } else {
777            Either::Left(buffer)
778        }
779    }
780}
781
782impl<S> UpstreamBuffer<S> {
783    fn can_consume_upstream(&self) -> bool {
784        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
785    }
786
787    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
788        {
789            loop {
790                if let Err(e) = try {
791                    if !self.can_consume_upstream() {
792                        // pause the future to block consuming upstream
793                        sleep(Duration::from_secs(30)).await;
794                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
795                        return pending().await;
796                    }
797                    self.consume_until_next_checkpoint_barrier().await?;
798                } {
799                    break e;
800                }
801            }
802        }
803    }
804
805    /// Consume the upstream until seeing the next barrier.
806    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
807        loop {
808            let msg: DispatcherMessage = self
809                .upstream
810                .try_next()
811                .await?
812                .ok_or_else(|| anyhow!("end of upstream"))?;
813            match msg {
814                DispatcherMessage::Chunk(chunk) => {
815                    self.is_polling_epoch_data = true;
816                    self.consume_upstream_row_count
817                        .inc_by(chunk.cardinality() as _);
818                }
819                DispatcherMessage::Barrier(barrier) => {
820                    let is_checkpoint = barrier.kind.is_checkpoint();
821                    self.upstream_pending_barriers.add(barrier);
822                    if is_checkpoint {
823                        self.is_polling_epoch_data = false;
824                        break;
825                    } else {
826                        self.is_polling_epoch_data = true;
827                    }
828                }
829                DispatcherMessage::Watermark(_) => {
830                    self.is_polling_epoch_data = true;
831                }
832            }
833        }
834        Ok(())
835    }
836}
837
838impl UpstreamBuffer<ConsumingLogStore> {
839    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
840    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
841        assert!(!self.is_finished());
842        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
843            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
844            // we must have returned true to indicate finish, and should not be called again.
845            assert!(self.is_polling_epoch_data);
846            self.consume_until_next_checkpoint_barrier().await?;
847            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
848        }
849        self.upstream_pending_barriers.consume_epoch(epoch);
850
851        {
852            {
853                let prev_epoch = epoch.prev;
854                assert!(self.consumed_epoch < prev_epoch);
855                let elapsed_epoch = prev_epoch - self.consumed_epoch;
856                self.consumed_epoch = prev_epoch;
857                if self.upstream_pending_barriers.has_checkpoint_epoch() {
858                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
859                    while self.can_consume_upstream()
860                        && let Some(result) =
861                            self.consume_until_next_checkpoint_barrier().now_or_never()
862                    {
863                        result?;
864                    }
865                }
866                // sub to ensure that the lag is monotonically decreasing.
867                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
868                // the upstream can at least progress for one epoch.
869                self.max_pending_epoch_lag = min(
870                    self.pending_epoch_lag(),
871                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
872                );
873            }
874        }
875        Ok(self.is_finished())
876    }
877
878    fn is_finished(&self) -> bool {
879        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
880            assert!(
881                self.upstream_pending_barriers
882                    .pending_non_checkpoint_barriers
883                    .is_empty()
884            )
885        }
886        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
887    }
888
889    fn start_consuming_upstream(self) -> MergeExecutorInput {
890        assert!(self.is_finished());
891        assert_eq!(self.pending_epoch_lag(), 0);
892        self.upstream
893    }
894}
895
896impl<S> UpstreamBuffer<S> {
897    /// Run a future while concurrently polling the upstream so that the upstream
898    /// won't be back-pressured.
899    async fn run_future<T, E: Into<StreamExecutorError>>(
900        &mut self,
901        future: impl Future<Output = Result<T, E>>,
902    ) -> StreamExecutorResult<T> {
903        select! {
904            biased;
905            e = self.concurrently_consume_upstream() => {
906                Err(e)
907            }
908            // this arm won't be starved, because the first arm is always pending unless returning with error
909            result = future => {
910                result.map_err(Into::into)
911            }
912        }
913    }
914
915    fn pending_epoch_lag(&self) -> u64 {
916        self.upstream_pending_barriers
917            .latest_epoch()
918            .map(|epoch| {
919                epoch
920                    .prev
921                    .checked_sub(self.consumed_epoch)
922                    .expect("pending epoch must be later than consumed_epoch")
923            })
924            .unwrap_or(0)
925    }
926}
927
928#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
929async fn make_log_stream(
930    upstream_table: &BatchTable<impl StateStore>,
931    start_epoch: u64,
932    end_epoch: u64,
933    start_pk: Option<OwnedRow>,
934    chunk_size: usize,
935) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
936    let data_types = upstream_table.schema().data_types();
937    let start_pk = start_pk.as_ref();
938    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
939    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
940        upstream_table
941            .batch_iter_vnode_log(
942                start_epoch,
943                HummockReadEpoch::Committed(end_epoch),
944                start_pk,
945                vnode,
946            )
947            .map_ok(move |stream| {
948                let stream = stream.map_err(Into::into);
949                (vnode, stream, 0)
950            })
951    }))
952    .await?;
953    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
954    Ok(VnodeStream::new(
955        vnode_streams,
956        upstream_table.pk_in_output_indices().expect("should exist"),
957        builder,
958    ))
959}
960
961async fn make_snapshot_stream(
962    upstream_table: &BatchTable<impl StateStore>,
963    snapshot_epoch: u64,
964    backfill_state: &BackfillState<impl StateStore>,
965    rate_limit: RateLimit,
966    chunk_size: usize,
967    snapshot_rebuild_interval: Duration,
968    pk_scan_range: &PkScanRange,
969) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
970    let data_types = upstream_table.schema().data_types();
971    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
972        move |(vnode, progress)| {
973            let start_pk = match progress {
974                None => Some((None, 0)),
975                Some(VnodeBackfillProgress {
976                    row_count,
977                    progress: EpochBackfillProgress::Consuming { latest_pk },
978                    ..
979                }) => Some((Some(latest_pk), *row_count)),
980                Some(VnodeBackfillProgress {
981                    progress: EpochBackfillProgress::Consumed,
982                    ..
983                }) => None,
984            };
985            start_pk.map(|(start_pk, row_count)| {
986                upstream_table
987                    .batch_iter_vnode_with_pk_range(
988                        HummockReadEpoch::Committed(snapshot_epoch),
989                        start_pk,
990                        &pk_scan_range.pk_prefix,
991                        &pk_scan_range.range_bounds,
992                        vnode,
993                        PrefetchOptions::prefetch_for_large_range_scan(),
994                        snapshot_rebuild_interval,
995                    )
996                    .map_ok(move |stream| {
997                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
998                        (vnode, stream, row_count)
999                    })
1000            })
1001        },
1002    ))
1003    .await?;
1004    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
1005    Ok(VnodeStream::new(
1006        vnode_streams,
1007        upstream_table.pk_in_output_indices().expect("should exist"),
1008        builder,
1009    ))
1010}
1011
/// Stream that reads the snapshot of `upstream_table` at `snapshot_epoch` and yields it as
/// chunks, interleaved with the barriers received from `barrier_rx`.
///
/// For every barrier received while the snapshot is being read, the pending chunk builder is
/// flushed, the per-vnode progress is written to `backfill_state` and committed, the progress
/// is reported through `progress`, and a backfill throttle mutation targeting this fragment
/// (if any) is applied to `rate_limit` and the snapshot stream.
///
/// After the snapshot stream ends, the next barrier is used to report finish, and barriers
/// keep being forwarded until one whose `epoch.curr` equals `snapshot_epoch` has been yielded.
///
/// # Errors
///
/// Besides propagated errors, fails when a barrier with `epoch.curr >= snapshot_epoch` arrives
/// while the snapshot is still being read, or when a snapshot chunk is received while the
/// backfill is paused.
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: &'a mut RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
    pk_scan_range: &'a PkScanRange,
) {
    // Epoch of the most recently received barrier; used to check barrier continuity.
    let mut barrier_epoch = first_recv_barrier_epoch;

    // start consume upstream snapshot
    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        *rate_limit,
        chunk_size,
        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
        pk_scan_range,
    )
    .await?;

    // Waits for either the next barrier (`Left`) or the next snapshot chunk (`Right`).
    // The snapshot arm is disabled while throttled or paused, so only barriers are received then.
    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    // Number of rows yielded since the last barrier; reset on every barrier.
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        // Stop pulling snapshot chunks once the rows yielded in this epoch reach the rate limit.
        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                // Barriers must arrive with contiguous epochs.
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;

                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    backfill_paused = false;
                }
                // Flush rows still held by the chunk builder before yielding the barrier.
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                // Record per-vnode progress: a pk means still consuming, `None` means finished.
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let count = backfill_state.total_row_count();
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                // Extract a backfill throttle change for this fragment before the barrier is moved.
                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
                    if let Mutation::Throttle(config) = &**m
                        && let Some(config) = config.get(&actor_ctx.fragment_id)
                        && config.throttle_type() == PbThrottleType::Backfill
                    {
                        Some(config.rate_limit)
                    } else {
                        None
                    }
                });
                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;

                // The new rate limit takes effect only after the barrier has been yielded.
                if let Some(new_rate_limit) = new_rate_limit {
                    let new_rate_limit = new_rate_limit.into();
                    *rate_limit = new_rate_limit;
                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
                }
            }
            Either::Right(Some(chunk)) => {
                // Defensive check: the snapshot arm of the select is disabled while paused.
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            Either::Right(None) => {
                // The snapshot stream is exhausted.
                break;
            }
        }
    }

    // finish consuming upstream snapshot, report finish
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    // Every vnode must have been fully consumed by now, so no pk progress may remain.
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let count = backfill_state.total_row_count();
    trace!(?barrier_epoch, count, "report finish");
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    trace!(?barrier_epoch, "finish consuming snapshot");
}
1168
1169#[cfg(test)]
1170mod tests {
1171    use std::collections::HashSet;
1172    use std::sync::Arc;
1173
1174    use risingwave_common::array::StreamChunk;
1175    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1176    use risingwave_common::row::OwnedRow;
1177    use risingwave_common::test_prelude::StreamChunkTestExt;
1178    use risingwave_common::types::DataType;
1179    use risingwave_common::util::epoch::{EpochPair, test_epoch};
1180    use risingwave_common::util::sort_util::OrderType;
1181    use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
1182    use risingwave_rpc_client::HummockMetaClient;
1183    use risingwave_storage::hummock::HummockStorage;
1184    use risingwave_storage::table::batch_table::BatchTable;
1185    use tokio::sync::mpsc::unbounded_channel;
1186    use tokio::time::{Duration, timeout};
1187
1188    use super::*;
1189    use crate::common::table::state_table::{
1190        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
1191    };
1192    use crate::common::table::test_utils::gen_pbtable_with_value_indices;
1193    use crate::executor::exchange::input::{Input, LocalInput};
1194    use crate::executor::exchange::permit::channel_for_test;
1195    use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
1196    use crate::task::LocalBarrierManager;
1197
    /// Table id of the source table that the tests backfill from.
    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
    /// Table id of the table used to persist backfill progress in the tests.
    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);
1200
1201    fn source_table_pb() -> risingwave_pb::catalog::PbTable {
1202        gen_pbtable_with_value_indices(
1203            SOURCE_TABLE_ID,
1204            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1205            vec![OrderType::ascending()],
1206            vec![0],
1207            0,
1208            vec![0],
1209        )
1210    }
1211
1212    fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
1213        gen_pbtable_with_value_indices(
1214            PROGRESS_TABLE_ID,
1215            vec![
1216                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
1217                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1218                ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1219                ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
1220                ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
1221            ],
1222            vec![OrderType::ascending()],
1223            vec![0],
1224            1,
1225            vec![1, 2, 3, 4],
1226        )
1227    }
1228
1229    fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
1230        BatchTable::for_test(
1231            store,
1232            SOURCE_TABLE_ID,
1233            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1234            vec![OrderType::ascending()],
1235            vec![0],
1236            vec![0],
1237        )
1238    }
1239
1240    async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1241        StateTableBuilder::new(&source_table_pb(), store, None)
1242            .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
1243            .forbid_preload_all_rows()
1244            .build()
1245            .await
1246    }
1247
1248    async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1249        StateTable::from_table_catalog(&progress_table_pb(), store, None).await
1250    }
1251
1252    async fn commit_insert_epoch(
1253        test_env: &HummockTestEnv,
1254        source_state_table: &mut StateTable<HummockStorage>,
1255        epoch: &mut EpochPair,
1256        table_ids: HashSet<TableId>,
1257        values: &[i64],
1258    ) {
1259        for value in values {
1260            source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
1261        }
1262        epoch.inc_for_test();
1263        test_env.storage.start_epoch(epoch.curr, table_ids);
1264        source_state_table.commit_for_test(*epoch).await.unwrap();
1265        let res = test_env
1266            .storage
1267            .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
1268            .await
1269            .unwrap();
1270        test_env
1271            .meta_client
1272            .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
1273            .await
1274            .unwrap();
1275        test_env
1276            .storage
1277            .wait_version(test_env.manager.get_current_version().await)
1278            .await;
1279    }
1280
1281    fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
1282        for epoch in 1..=max_epoch {
1283            test_env
1284                .storage
1285                .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
1286        }
1287    }
1288
1289    fn make_upstream_input(
1290        barrier_manager: LocalBarrierManager,
1291        actor_ctx: ActorContextRef,
1292        rx: crate::executor::exchange::permit::Receiver,
1293    ) -> MergeExecutorInput {
1294        MergeExecutorInput::new(
1295            MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
1296            actor_ctx,
1297            1919.into(),
1298            barrier_manager,
1299            Arc::new(StreamingMetrics::unused()),
1300            ExecutorInfo::for_test(
1301                Schema::new(vec![Field::unnamed(DataType::Int64)]),
1302                vec![0],
1303                "SnapshotBackfillUpstream".to_owned(),
1304                0,
1305            ),
1306        )
1307    }
1308
1309    async fn expect_barrier_with_timeout(
1310        executor: &mut BoxedMessageStream,
1311        reason: &str,
1312    ) -> Barrier {
1313        let message = timeout(Duration::from_secs(10), executor.next())
1314            .await
1315            .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
1316            .unwrap()
1317            .unwrap();
1318        match message {
1319            Message::Barrier(barrier) => barrier,
1320            other => panic!("expected barrier for {reason}, got {other:?}"),
1321        }
1322    }
1323
1324    async fn expect_chunk_with_timeout(
1325        executor: &mut BoxedMessageStream,
1326        reason: &str,
1327    ) -> StreamChunk {
1328        let message = timeout(Duration::from_secs(10), executor.next())
1329            .await
1330            .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
1331            .unwrap()
1332            .unwrap();
1333        match message {
1334            Message::Chunk(chunk) => chunk,
1335            other => panic!("expected chunk for {reason}, got {other:?}"),
1336        }
1337    }
1338
1339    async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
1340        assert!(
1341            timeout(Duration::from_millis(200), executor.next())
1342                .await
1343                .is_err(),
1344            "executor unexpectedly produced a message while waiting for {reason}"
1345        );
1346    }
1347
1348    #[tokio::test]
1349    async fn test_snapshot_backfill_without_upstream_on_hummock() {
1350        let source_env = prepare_hummock_test_env().await;
1351        source_env.register_table(source_table_pb()).await;
1352        let progress_env = prepare_hummock_test_env().await;
1353        progress_env.register_table(progress_table_pb()).await;
1354
1355        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1356        let source_table = source_batch_table(source_env.storage.clone());
1357        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1358
1359        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1360        source_env
1361            .storage
1362            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1363        source_state_table.init_epoch(epoch).await.unwrap();
1364
1365        commit_insert_epoch(
1366            &source_env,
1367            &mut source_state_table,
1368            &mut epoch,
1369            HashSet::from_iter([SOURCE_TABLE_ID]),
1370            &[1],
1371        )
1372        .await;
1373        commit_insert_epoch(
1374            &source_env,
1375            &mut source_state_table,
1376            &mut epoch,
1377            HashSet::from_iter([SOURCE_TABLE_ID]),
1378            &[2],
1379        )
1380        .await;
1381        commit_insert_epoch(
1382            &source_env,
1383            &mut source_state_table,
1384            &mut epoch,
1385            HashSet::from_iter([SOURCE_TABLE_ID]),
1386            &[3],
1387        )
1388        .await;
1389        commit_insert_epoch(
1390            &source_env,
1391            &mut source_state_table,
1392            &mut epoch,
1393            HashSet::from_iter([SOURCE_TABLE_ID]),
1394            &[],
1395        )
1396        .await;
1397        start_progress_epochs(&progress_env, 5);
1398
1399        let barrier_manager = LocalBarrierManager::for_test();
1400        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
1401        let actor_ctx = ActorContext::for_test(1234);
1402        let (barrier_tx, barrier_rx) = unbounded_channel();
1403        barrier_tx
1404            .send(Barrier::new_test_barrier(test_epoch(1)))
1405            .unwrap();
1406
1407        let mut executor = SnapshotBackfillExecutor::new(
1408            source_table,
1409            progress_state_table,
1410            None,
1411            None,
1412            vec![0],
1413            vec![0],
1414            actor_ctx,
1415            progress,
1416            1024,
1417            RateLimit::Disabled,
1418            barrier_rx,
1419            Arc::new(StreamingMetrics::unused()),
1420            Some(test_epoch(3)),
1421        )
1422        .expect("snapshot backfill executor should be created")
1423        .boxed()
1424        .execute();
1425
1426        assert_eq!(
1427            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1428                .await
1429                .epoch,
1430            Barrier::new_test_barrier(test_epoch(1)).epoch
1431        );
1432        assert_eq!(
1433            expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
1434            StreamChunk::from_pretty(
1435                " I
1436                + 1
1437                + 2
1438                + 3"
1439            )
1440        );
1441        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1442
1443        barrier_tx
1444            .send(Barrier::new_test_barrier(test_epoch(2)))
1445            .unwrap();
1446        assert_eq!(
1447            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1448                .await
1449                .epoch,
1450            Barrier::new_test_barrier(test_epoch(2)).epoch
1451        );
1452
1453        barrier_tx
1454            .send(Barrier::new_test_barrier(test_epoch(3)))
1455            .unwrap();
1456        assert_eq!(
1457            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1458                .await
1459                .epoch,
1460            Barrier::new_test_barrier(test_epoch(3)).epoch
1461        );
1462
1463        barrier_tx
1464            .send(Barrier::new_test_barrier(test_epoch(4)))
1465            .unwrap();
1466        assert_eq!(
1467            expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
1468                .await
1469                .epoch,
1470            Barrier::new_test_barrier(test_epoch(4)).epoch
1471        );
1472
1473        barrier_tx
1474            .send(Barrier::new_test_barrier(test_epoch(5)))
1475            .unwrap();
1476        assert_eq!(
1477            expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
1478                .await
1479                .epoch,
1480            Barrier::new_test_barrier(test_epoch(5)).epoch
1481        );
1482
1483        expect_pending_with_timeout(&mut executor, "next local barrier").await;
1484    }
1485
1486    #[tokio::test]
1487    async fn test_snapshot_backfill_with_upstream_on_hummock() {
1488        let source_env = prepare_hummock_test_env().await;
1489        source_env.register_table(source_table_pb()).await;
1490        let progress_env = prepare_hummock_test_env().await;
1491        progress_env.register_table(progress_table_pb()).await;
1492
1493        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1494        let source_table = source_batch_table(source_env.storage.clone());
1495        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1496
1497        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1498        source_env
1499            .storage
1500            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1501        source_state_table.init_epoch(epoch).await.unwrap();
1502
1503        commit_insert_epoch(
1504            &source_env,
1505            &mut source_state_table,
1506            &mut epoch,
1507            HashSet::from_iter([SOURCE_TABLE_ID]),
1508            &[],
1509        )
1510        .await;
1511        commit_insert_epoch(
1512            &source_env,
1513            &mut source_state_table,
1514            &mut epoch,
1515            HashSet::from_iter([SOURCE_TABLE_ID]),
1516            &[],
1517        )
1518        .await;
1519        commit_insert_epoch(
1520            &source_env,
1521            &mut source_state_table,
1522            &mut epoch,
1523            HashSet::from_iter([SOURCE_TABLE_ID]),
1524            &[],
1525        )
1526        .await;
1527        commit_insert_epoch(
1528            &source_env,
1529            &mut source_state_table,
1530            &mut epoch,
1531            HashSet::from_iter([SOURCE_TABLE_ID]),
1532            &[4],
1533        )
1534        .await;
1535        start_progress_epochs(&progress_env, 6);
1536
1537        let barrier_manager = LocalBarrierManager::for_test();
1538        let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
1539        let actor_ctx = ActorContext::for_test(1235);
1540        let (barrier_tx, barrier_rx) = unbounded_channel();
1541        let (upstream_tx, upstream_rx) = channel_for_test();
1542
1543        upstream_tx
1544            .send(
1545                DispatcherMessage::Barrier(
1546                    Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
1547                )
1548                .into(),
1549            )
1550            .await
1551            .unwrap();
1552        barrier_tx
1553            .send(Barrier::new_test_barrier(test_epoch(1)))
1554            .unwrap();
1555
1556        let mut executor = SnapshotBackfillExecutor::new(
1557            source_table,
1558            progress_state_table,
1559            Some(make_upstream_input(
1560                barrier_manager,
1561                actor_ctx.clone(),
1562                upstream_rx,
1563            )),
1564            None,
1565            vec![0],
1566            vec![0],
1567            actor_ctx,
1568            progress,
1569            1024,
1570            RateLimit::Disabled,
1571            barrier_rx,
1572            Arc::new(StreamingMetrics::unused()),
1573            Some(test_epoch(3)),
1574        )
1575        .expect("snapshot backfill executor should be created")
1576        .boxed()
1577        .execute();
1578
1579        assert_eq!(
1580            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1581                .await
1582                .epoch,
1583            Barrier::new_test_barrier(test_epoch(1)).epoch
1584        );
1585        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1586        barrier_tx
1587            .send(Barrier::new_test_barrier(test_epoch(2)))
1588            .unwrap();
1589        assert_eq!(
1590            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1591                .await
1592                .epoch,
1593            Barrier::new_test_barrier(test_epoch(2)).epoch
1594        );
1595
1596        barrier_tx
1597            .send(Barrier::new_test_barrier(test_epoch(3)))
1598            .unwrap();
1599        assert_eq!(
1600            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1601                .await
1602                .epoch,
1603            Barrier::new_test_barrier(test_epoch(3)).epoch
1604        );
1605
1606        barrier_tx
1607            .send(Barrier::new_test_barrier(test_epoch(4)))
1608            .unwrap();
1609        assert_eq!(
1610            expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
1611                .await
1612                .epoch,
1613            Barrier::new_test_barrier(test_epoch(4)).epoch
1614        );
1615
1616        barrier_tx
1617            .send(Barrier::new_test_barrier(test_epoch(5)))
1618            .unwrap();
1619        assert_eq!(
1620            expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
1621            StreamChunk::from_pretty(
1622                " I
1623                + 4"
1624            )
1625        );
1626        assert_eq!(
1627            expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
1628                .await
1629                .epoch,
1630            Barrier::new_test_barrier(test_epoch(5)).epoch
1631        );
1632
1633        upstream_tx
1634            .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
1635            .await
1636            .unwrap();
1637        let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
1638        upstream_tx
1639            .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
1640            .await
1641            .unwrap();
1642        barrier_tx.send(stop_barrier.clone()).unwrap();
1643
1644        assert_eq!(
1645            expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
1646            StreamChunk::from_pretty(" I\n + 5")
1647        );
1648        assert_eq!(
1649            expect_barrier_with_timeout(&mut executor, "final stop barrier")
1650                .await
1651                .epoch,
1652            stop_barrier.epoch
1653        );
1654    }
1655}