risingwave_stream/executor/backfill/snapshot_backfill/executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::batch_plan::ScanRange;
33use risingwave_pb::common::PbThrottleType;
34use risingwave_storage::StateStore;
35use risingwave_storage::store::PrefetchOptions;
36use risingwave_storage::table::ChangeLogRow;
37use risingwave_storage::table::batch_table::{BatchTable, PkScanRange};
38use tokio::select;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::time::sleep;
41
42use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
43use crate::executor::backfill::snapshot_backfill::state::{
44    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
45};
46use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
47use crate::executor::backfill::utils::{
48    UpstreamStreamKeyUpdateNormalizer, create_builder, mapping_message,
49};
50use crate::executor::monitor::StreamingMetrics;
51use crate::executor::prelude::{StateTable, StreamExt, try_stream};
52use crate::executor::{
53    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
54    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
55    expect_first_barrier,
56};
57use crate::task::CreateMviewProgressReporter;
58
59pub struct SnapshotBackfillExecutor<S: StateStore> {
60    /// Upstream table
61    upstream_table: BatchTable<S>,
62
63    /// Backfill progress table
64    progress_state_table: StateTable<S>,
65
66    /// Upstream with the same schema with the upstream table.
67    upstream: Option<MergeExecutorInput>,
68
69    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
70    output_indices: Vec<usize>,
71
72    /// Current executor stream-key indices in the output schema.
73    stream_key: Vec<usize>,
74
75    progress: CreateMviewProgressReporter,
76
77    chunk_size: usize,
78    rate_limit: RateLimit,
79
80    barrier_rx: UnboundedReceiver<Barrier>,
81
82    actor_ctx: ActorContextRef,
83    metrics: Arc<StreamingMetrics>,
84
85    snapshot_epoch: Option<u64>,
86    /// (`eq_prefix`, `range_bounds`) for pk scan range pushdown.
87    pk_scan_range: PkScanRange,
88}
89
90impl<S: StateStore> SnapshotBackfillExecutor<S> {
91    fn build_pk_scan_range(
92        pb_scan_range: Option<&ScanRange>,
93        upstream_table: &BatchTable<S>,
94    ) -> StreamExecutorResult<PkScanRange> {
95        match pb_scan_range {
96            Some(scan_range) => Ok(PkScanRange::new(
97                scan_range.clone(),
98                upstream_table.pk_serializer().get_data_types().to_vec(),
99            )?),
100            None => Ok(PkScanRange::full()),
101        }
102    }
103
104    #[expect(clippy::too_many_arguments)]
105    pub(crate) fn new(
106        upstream_table: BatchTable<S>,
107        progress_state_table: StateTable<S>,
108        upstream: Option<MergeExecutorInput>,
109        pb_pk_scan_range: Option<&ScanRange>,
110        output_indices: Vec<usize>,
111        stream_key: Vec<usize>,
112        actor_ctx: ActorContextRef,
113        progress: CreateMviewProgressReporter,
114        chunk_size: usize,
115        rate_limit: RateLimit,
116        barrier_rx: UnboundedReceiver<Barrier>,
117        metrics: Arc<StreamingMetrics>,
118        snapshot_epoch: Option<u64>,
119    ) -> StreamExecutorResult<Self> {
120        if let Some(upstream) = &upstream {
121            assert_eq!(&upstream.info.schema, upstream_table.schema());
122        }
123        if upstream_table.pk_in_output_indices().is_none() {
124            panic!(
125                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
126                upstream_table.pk_indices(),
127                upstream_table.output_indices(),
128                upstream_table.schema()
129            )
130        };
131        assert!(
132            stream_key.iter().all(|idx| *idx < output_indices.len()),
133            "stream key indices should refer to output schema: stream_key: {:?}, output_indices: {:?}",
134            stream_key,
135            output_indices
136        );
137        let pk_scan_range = Self::build_pk_scan_range(pb_pk_scan_range, &upstream_table)?;
138        if !matches!(rate_limit, RateLimit::Disabled) {
139            trace!(
140                ?rate_limit,
141                "create snapshot backfill executor with rate limit"
142            );
143        }
144        Ok(Self {
145            upstream_table,
146            progress_state_table,
147            upstream,
148            output_indices,
149            stream_key,
150            progress,
151            chunk_size,
152            rate_limit,
153            barrier_rx,
154            actor_ctx,
155            metrics,
156            snapshot_epoch,
157            pk_scan_range,
158        })
159    }
160
161    #[try_stream(ok = Message, error = StreamExecutorError)]
162    async fn execute_inner(mut self) {
163        trace!("snapshot backfill executor start");
164        let upstream = if let Some(mut upstream) = self.upstream {
165            let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
166            trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
167            Some((first_upstream_barrier, upstream))
168        } else {
169            None
170        };
171        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
172        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
173        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
174            self.snapshot_epoch
175        {
176            if let Some((first_upstream_barrier, _)) = &upstream {
177                if first_upstream_barrier.epoch != first_recv_barrier.epoch {
178                    assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
179                    Some(snapshot_epoch)
180                } else {
181                    None
182                }
183            } else {
184                // must go through snapshot backfill when having no upstream
185                Some(snapshot_epoch)
186            }
187        } else {
188            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
189            if cfg!(debug_assertions) {
190                panic!(
191                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
192                    upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
193                    first_recv_barrier.epoch
194                );
195            } else {
196                let (first_upstream_barrier, _) = upstream
197                    .as_ref()
198                    .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
199                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
200                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
201                None
202            }
203        };
204        let first_recv_barrier_epoch = first_recv_barrier.epoch;
205        let initial_backfill_paused =
206            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
207        yield Message::Barrier(first_recv_barrier);
208        let mut backfill_state = BackfillState::new(
209            self.progress_state_table,
210            first_recv_barrier_epoch,
211            self.upstream_table.pk_serializer().clone(),
212        )
213        .await?;
214
215        let (mut barrier_epoch, mut need_report_finish, upstream) = {
216            if let Some(snapshot_epoch) = should_snapshot_backfill {
217                let table_id_str = format!("{}", self.upstream_table.table_id());
218                let actor_id_str = format!("{}", self.actor_ctx.id);
219
220                let consume_upstream_row_count = self
221                    .metrics
222                    .snapshot_backfill_consume_row_count
223                    .with_guarded_label_values(&[
224                        table_id_str.as_str(),
225                        actor_id_str.as_str(),
226                        "consume_upstream",
227                    ]);
228
229                let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
230                {
231                    SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
232                        upstream,
233                        first_upstream_barrier,
234                        consume_upstream_row_count,
235                    ))
236                } else {
237                    SnapshotBackfillUpstream::Empty
238                };
239
240                // Phase 1: consume upstream snapshot
241                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
242                    < snapshot_epoch
243                {
244                    trace!(
245                        table_id = %self.upstream_table.table_id(),
246                        snapshot_epoch,
247                        barrier_epoch = ?first_recv_barrier_epoch,
248                        "start consuming snapshot"
249                    );
250                    {
251                        let consuming_snapshot_row_count = self
252                            .metrics
253                            .snapshot_backfill_consume_row_count
254                            .with_guarded_label_values(&[
255                                table_id_str.as_str(),
256                                actor_id_str.as_str(),
257                                "consuming_snapshot",
258                            ]);
259                        let snapshot_stream = make_consume_snapshot_stream(
260                            &self.upstream_table,
261                            snapshot_epoch,
262                            self.chunk_size,
263                            &mut self.rate_limit,
264                            &mut self.barrier_rx,
265                            &mut self.progress,
266                            &mut backfill_state,
267                            first_recv_barrier_epoch,
268                            initial_backfill_paused,
269                            &self.actor_ctx,
270                            &self.pk_scan_range,
271                        );
272
273                        pin_mut!(snapshot_stream);
274
275                        while let Some(message) = upstream_buffer
276                            .run_future(snapshot_stream.try_next())
277                            .await?
278                        {
279                            if let Message::Chunk(chunk) = &message {
280                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
281                            }
282                            yield message;
283                        }
284                    }
285
286                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
287                    let recv_barrier_epoch = recv_barrier.epoch;
288                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
289                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
290                    yield Message::Barrier(recv_barrier);
291                    post_commit.post_yield_barrier(None).await?;
292                    (
293                        recv_barrier_epoch,
294                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
295                    )
296                } else {
297                    trace!(
298                        table_id = %self.upstream_table.table_id(),
299                        snapshot_epoch,
300                        barrier_epoch = ?first_recv_barrier_epoch,
301                        "skip consuming snapshot"
302                    );
303                    (
304                        first_recv_barrier_epoch,
305                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
306                    )
307                };
308
309                // Phase 2: consume upstream log store
310                match upstream_buffer {
311                    Either::Left(mut upstream_buffer) => {
312                        let initial_pending_lag =
313                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
314                                &upstream_buffer
315                            {
316                                Some(Duration::from_millis(
317                                    Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
318                                ))
319                            } else {
320                                None
321                            };
322                        trace!(
323                            ?barrier_epoch,
324                            table_id = %self.upstream_table.table_id(),
325                            ?initial_pending_lag,
326                            "start consuming log store"
327                        );
328
329                        let consuming_log_store_row_count = self
330                            .metrics
331                            .snapshot_backfill_consume_row_count
332                            .with_guarded_label_values(&[
333                                table_id_str.as_str(),
334                                actor_id_str.as_str(),
335                                "consuming_log_store",
336                            ]);
337                        let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
338                        loop {
339                            let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
340                            assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
341                            let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
342                            // Disable calling next_epoch, because, if barrier_epoch.prev is a checkpoint epoch,
343                            // next_epoch(barrier_epoch.prev) is actually waiting for the committed epoch.
344                            // However, upstream_buffer's is_polling_epoch_data can be false, since just received
345                            // the checkpoint barrier_epoch.prev. And then the upstream_buffer may stop polling upstream
346                            // when the max_pending_epoch_lag is small. When upstream is not polled, the barrier of the next
347                            // committed epoch cannot be collected.
348                            // {
349                            //     // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
350                            //     // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
351                            //     let next_prev_epoch = upstream_buffer
352                            //         .run_future(self.upstream_table.next_epoch(barrier_epoch.prev))
353                            //         .await?;
354                            //     assert_eq!(next_prev_epoch, barrier.epoch.prev);
355                            // }
356                            barrier_epoch = barrier.epoch;
357                            if barrier.kind.is_checkpoint() {
358                                let pending_non_checkpoint_barrier =
359                                    take(&mut pending_non_checkpoint_barrier);
360                                let end_epoch = barrier_epoch.prev;
361                                let start_epoch = pending_non_checkpoint_barrier
362                                    .first()
363                                    .map(|epoch| epoch.prev)
364                                    .unwrap_or(end_epoch);
365                                trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
366                                // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
367                                // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
368                                // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
369                                let mut stream = upstream_buffer
370                                    .run_future(make_log_stream(
371                                        &self.upstream_table,
372                                        start_epoch,
373                                        end_epoch,
374                                        None,
375                                        self.chunk_size,
376                                    ))
377                                    .await?;
378                                while let Some(chunk) =
379                                    upstream_buffer.run_future(stream.try_next()).await?
380                                {
381                                    trace!(
382                                        ?barrier_epoch,
383                                        size = chunk.cardinality(),
384                                        "consume change log yield chunk",
385                                    );
386                                    consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
387                                    yield Message::Chunk(chunk);
388                                }
389
390                                trace!(?barrier_epoch, "after consume change log");
391
392                                stream
393                                    .for_vnode_pk_progress(|vnode, row_count, progress| {
394                                        assert_eq!(progress, None);
395                                        backfill_state.finish_epoch(
396                                            vnode,
397                                            barrier.epoch.prev,
398                                            row_count,
399                                        );
400                                    })
401                                    .await?;
402                            } else {
403                                pending_non_checkpoint_barrier.push(barrier.epoch);
404                            }
405
406                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
407                                &upstream_buffer
408                            {
409                                if is_finished {
410                                    assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
411                                    assert!(pending_non_checkpoint_barrier.is_empty());
412                                    self.progress.finish_consuming_log_store(barrier.epoch);
413                                } else {
414                                    self.progress.update_create_mview_log_store_progress(
415                                        barrier.epoch,
416                                        upstream_buffer.pending_epoch_lag(),
417                                    );
418                                }
419                            }
420
421                            let post_commit = backfill_state.commit(barrier.epoch).await?;
422                            let update_vnode_bitmap =
423                                barrier.as_update_vnode_bitmap(self.actor_ctx.id);
424                            yield Message::Barrier(barrier);
425                            post_commit.post_yield_barrier(None).await?;
426                            if update_vnode_bitmap.is_some() {
427                                return Err(anyhow!(
428                                    "should not update vnode bitmap during consuming log store"
429                                )
430                                .into());
431                            }
432
433                            if is_finished {
434                                assert!(
435                                    pending_non_checkpoint_barrier.is_empty(),
436                                    "{pending_non_checkpoint_barrier:?}"
437                                );
438                                break;
439                            }
440                        }
441                        trace!(
442                            ?barrier_epoch,
443                            table_id = %self.upstream_table.table_id(),
444                            "finish consuming log store"
445                        );
446
447                        (
448                            barrier_epoch,
449                            false,
450                            upstream_buffer.start_consuming_upstream(),
451                        )
452                    }
453                    Either::Right(upstream) => {
454                        trace!(
455                            ?barrier_epoch,
456                            table_id = %self.upstream_table.table_id(),
457                            "skip consuming log store and start consuming upstream directly"
458                        );
459
460                        (barrier_epoch, true, upstream)
461                    }
462                }
463            } else {
464                let (first_upstream_barrier, _) = upstream
465                    .as_ref()
466                    .expect("should have upstream when skipping snapshot backfill");
467                backfill_state
468                    .latest_progress()
469                    .for_each(|(vnode, progress)| {
470                        let progress = progress.expect("should not be empty");
471                        assert_eq!(
472                            progress.epoch, first_upstream_barrier.epoch.prev,
473                            "vnode: {:?}",
474                            vnode
475                        );
476                        assert_eq!(
477                            progress.progress,
478                            EpochBackfillProgress::Consumed,
479                            "vnode: {:?}",
480                            vnode
481                        );
482                    });
483                trace!(
484                    table_id = %self.upstream_table.table_id(),
485                    "skip backfill"
486                );
487                let (first_upstream_barrier, upstream) =
488                    upstream.expect("should have upstream when skipping snapshot backfill");
489                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
490                (first_upstream_barrier.epoch, true, upstream)
491            }
492        };
493        let current_stream_key_indices = self
494            .stream_key
495            .iter()
496            .map(|idx| self.output_indices[*idx])
497            .collect();
498        let update_normalizer = UpstreamStreamKeyUpdateNormalizer::new(
499            &upstream.info.stream_key,
500            current_stream_key_indices,
501        );
502        let mut upstream = upstream.into_executor(self.barrier_rx).execute();
503        let mut epoch_row_count = 0;
504        // Phase 3: consume upstream
505        while let Some(msg) = upstream.try_next().await? {
506            let Some(msg) = update_normalizer.normalize_message(msg) else {
507                continue;
508            };
509            match msg {
510                Message::Barrier(barrier) => {
511                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
512                    self.upstream_table
513                        .vnodes()
514                        .iter_vnodes()
515                        .for_each(|vnode| {
516                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
517                            // executor.
518                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
519                        });
520                    epoch_row_count = 0;
521                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
522                    barrier_epoch = barrier.epoch;
523                    if need_report_finish {
524                        need_report_finish = false;
525                        self.progress.finish_consuming_log_store(barrier_epoch);
526                    }
527                    let post_commit = backfill_state.commit(barrier.epoch).await?;
528                    yield Message::Barrier(barrier);
529                    if let Some(new_vnode_bitmap) =
530                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
531                    {
532                        let _prev_vnode_bitmap =
533                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
534                        backfill_state
535                            .latest_progress()
536                            .for_each(|(vnode, progress)| {
537                                let progress = progress.expect("should not be empty");
538                                assert_eq!(
539                                    progress.epoch, barrier_epoch.prev,
540                                    "vnode {:?} has unexpected progress epoch",
541                                    vnode
542                                );
543                                assert_eq!(
544                                    progress.progress,
545                                    EpochBackfillProgress::Consumed,
546                                    "vnode {:?} has unexpected progress",
547                                    vnode
548                                );
549                            });
550                    }
551                }
552                msg => {
553                    if let Message::Chunk(chunk) = &msg {
554                        epoch_row_count += chunk.cardinality();
555                    }
556                    yield msg;
557                }
558            }
559        }
560    }
561}
562
563impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
564    fn execute(self: Box<Self>) -> BoxedMessageStream {
565        let output_indices = self.output_indices.clone();
566        self.execute_inner()
567            .filter_map(move |result| {
568                ready({
569                    match result {
570                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
571                        Err(e) => Some(Err(e)),
572                    }
573                })
574            })
575            .boxed()
576    }
577}
578
/// Type-state marker: the upstream is being buffered while the table snapshot is read.
struct ConsumingSnapshot;

/// Type-state marker: the upstream is being buffered while the table change log is read.
struct ConsumingLogStore;
581
582#[derive(Debug)]
583struct PendingBarriers {
584    first_upstream_barrier_epoch: EpochPair,
585
586    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier
587    /// Newer barrier at the front
588    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,
589
590    /// In the outer `VecDeque`, newer barriers at the front.
591    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
592    /// and others as non-checkpoint barrier
593    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
594}
595
596impl PendingBarriers {
597    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
598        Self {
599            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
600            pending_non_checkpoint_barriers: Default::default(),
601            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
602                first_upstream_barrier,
603            ])]),
604        }
605    }
606
607    fn add(&mut self, barrier: DispatcherBarrier) {
608        let is_checkpoint = barrier.kind.is_checkpoint();
609        self.pending_non_checkpoint_barriers.push_front(barrier);
610        if is_checkpoint {
611            self.checkpoint_barrier_groups
612                .push_front(take(&mut self.pending_non_checkpoint_barriers));
613        }
614    }
615
616    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
617        self.checkpoint_barrier_groups.pop_back()
618    }
619
620    fn consume_epoch(&mut self, epoch: EpochPair) {
621        let barriers = self
622            .checkpoint_barrier_groups
623            .back_mut()
624            .expect("non-empty");
625        let oldest_upstream_barrier = barriers.back().expect("non-empty");
626        assert!(
627            oldest_upstream_barrier.epoch.prev >= epoch.prev,
628            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
629            oldest_upstream_barrier.epoch,
630            epoch
631        );
632        if oldest_upstream_barrier.epoch.prev == epoch.prev {
633            assert_eq!(oldest_upstream_barrier.epoch, epoch);
634            barriers.pop_back();
635            if barriers.is_empty() {
636                self.checkpoint_barrier_groups.pop_back();
637            }
638        }
639    }
640
641    fn latest_epoch(&self) -> Option<EpochPair> {
642        self.pending_non_checkpoint_barriers
643            .front()
644            .or_else(|| {
645                self.checkpoint_barrier_groups
646                    .front()
647                    .and_then(|barriers| barriers.front())
648            })
649            .map(|barrier| barrier.epoch)
650    }
651
652    fn checkpoint_epoch_count(&self) -> usize {
653        self.checkpoint_barrier_groups.len()
654    }
655
656    fn has_checkpoint_epoch(&self) -> bool {
657        !self.checkpoint_barrier_groups.is_empty()
658    }
659}
660
661enum SnapshotBackfillUpstream<S> {
662    Empty,
663    Buffer(UpstreamBuffer<S>),
664}
665
666impl<S> SnapshotBackfillUpstream<S> {
667    async fn run_future<T, E: Into<StreamExecutorError>>(
668        &mut self,
669        future: impl Future<Output = Result<T, E>>,
670    ) -> StreamExecutorResult<T> {
671        match self {
672            SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
673            SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
674        }
675    }
676}
677
678impl SnapshotBackfillUpstream<ConsumingSnapshot> {
679    fn start_consuming_log_store(
680        self,
681        consumed_epoch: u64,
682    ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
683        match self {
684            SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
685            SnapshotBackfillUpstream::Buffer(buffer) => {
686                match buffer.start_consuming_log_store(consumed_epoch) {
687                    Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
688                    Either::Right(input) => Either::Right(input),
689                }
690            }
691        }
692    }
693}
694
695impl SnapshotBackfillUpstream<ConsumingLogStore> {
696    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
697        match self {
698            SnapshotBackfillUpstream::Empty => Ok(false),
699            SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
700        }
701    }
702
703    fn start_consuming_upstream(self) -> MergeExecutorInput {
704        match self {
705            SnapshotBackfillUpstream::Empty => {
706                unreachable!("unlike to start consuming upstream when having no upstream")
707            }
708            SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
709        }
710    }
711}
712
/// Drains the upstream input while backfill is in progress, so that the upstream is not
/// back-pressured.
///
/// Data chunks read from upstream are only counted into `consume_upstream_row_count` and
/// then dropped. Barriers are kept in `upstream_pending_barriers`. `S` is a phase marker
/// (`ConsumingSnapshot` or `ConsumingLogStore`) selecting which methods are available.
struct UpstreamBuffer<S> {
    upstream: MergeExecutorInput,
    /// Upper bound on the pending epoch lag.
    ///
    /// Once the lag reaches this bound, the upstream is no longer polled unless we are in the
    /// middle of an epoch. It starts at `u64::MAX` and only shrinks once log-store consumption starts.
    max_pending_epoch_lag: u64,
    /// `prev` epoch of the last epoch consumed by the backfill; 0 during the snapshot phase.
    consumed_epoch: u64,
    /// Barriers received from upstream for which the matching barrier has not yet been
    /// received from the local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next checkpoint barrier.
    /// When `true`, we must keep polling until the next checkpoint barrier, because some data
    /// of this epoch have already been discarded, and the data of this epoch must therefore
    /// be read from the log store.
    is_polling_epoch_data: bool,
    /// Metric counting the rows read (and dropped) from upstream.
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Zero-sized phase marker.
    _phase: S,
}
727
728impl UpstreamBuffer<ConsumingSnapshot> {
729    fn new(
730        upstream: MergeExecutorInput,
731        first_upstream_barrier: DispatcherBarrier,
732        consume_upstream_row_count: LabelGuardedIntCounter,
733    ) -> Self {
734        Self {
735            upstream,
736            is_polling_epoch_data: false,
737            consume_upstream_row_count,
738            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
739            // no limit on the number of pending barrier in the beginning
740            max_pending_epoch_lag: u64::MAX,
741            consumed_epoch: 0,
742            _phase: ConsumingSnapshot {},
743        }
744    }
745
746    fn start_consuming_log_store(
747        mut self,
748        consumed_epoch: u64,
749    ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
750        if self
751            .upstream_pending_barriers
752            .first_upstream_barrier_epoch
753            .prev
754            == consumed_epoch
755        {
756            assert_eq!(
757                1,
758                self.upstream_pending_barriers
759                    .pop()
760                    .expect("non-empty")
761                    .len()
762            );
763        }
764        let max_pending_epoch_lag = self.pending_epoch_lag();
765        let buffer = UpstreamBuffer {
766            upstream: self.upstream,
767            upstream_pending_barriers: self.upstream_pending_barriers,
768            max_pending_epoch_lag,
769            is_polling_epoch_data: self.is_polling_epoch_data,
770            consume_upstream_row_count: self.consume_upstream_row_count,
771            consumed_epoch,
772            _phase: ConsumingLogStore {},
773        };
774        if buffer.is_finished() {
775            Either::Right(buffer.upstream)
776        } else {
777            Either::Left(buffer)
778        }
779    }
780}
781
782impl<S> UpstreamBuffer<S> {
783    fn can_consume_upstream(&self) -> bool {
784        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
785    }
786
787    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
788        {
789            loop {
790                if let Err(e) = try {
791                    if !self.can_consume_upstream() {
792                        // pause the future to block consuming upstream
793                        sleep(Duration::from_secs(30)).await;
794                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
795                        return pending().await;
796                    }
797                    self.consume_until_next_checkpoint_barrier().await?;
798                } {
799                    break e;
800                }
801            }
802        }
803    }
804
805    /// Consume the upstream until seeing the next barrier.
806    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
807        loop {
808            let msg: DispatcherMessage = self
809                .upstream
810                .try_next()
811                .await?
812                .ok_or_else(|| anyhow!("end of upstream"))?;
813            match msg {
814                DispatcherMessage::Chunk(chunk) => {
815                    self.is_polling_epoch_data = true;
816                    self.consume_upstream_row_count
817                        .inc_by(chunk.cardinality() as _);
818                }
819                DispatcherMessage::Barrier(barrier) => {
820                    let is_checkpoint = barrier.kind.is_checkpoint();
821                    self.upstream_pending_barriers.add(barrier);
822                    if is_checkpoint {
823                        self.is_polling_epoch_data = false;
824                        break;
825                    } else {
826                        self.is_polling_epoch_data = true;
827                    }
828                }
829                DispatcherMessage::Watermark(_) => {
830                    self.is_polling_epoch_data = true;
831                }
832            }
833        }
834        Ok(())
835    }
836}
837
838impl UpstreamBuffer<ConsumingLogStore> {
839    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
840    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
841        assert!(!self.is_finished());
842        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
843            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
844            // we must have returned true to indicate finish, and should not be called again.
845            assert!(self.is_polling_epoch_data);
846            self.consume_until_next_checkpoint_barrier().await?;
847            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
848        }
849        self.upstream_pending_barriers.consume_epoch(epoch);
850
851        {
852            {
853                let prev_epoch = epoch.prev;
854                assert!(self.consumed_epoch < prev_epoch);
855                let elapsed_epoch = prev_epoch - self.consumed_epoch;
856                self.consumed_epoch = prev_epoch;
857                if self.upstream_pending_barriers.has_checkpoint_epoch() {
858                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
859                    while self.can_consume_upstream()
860                        && let Some(result) =
861                            self.consume_until_next_checkpoint_barrier().now_or_never()
862                    {
863                        result?;
864                    }
865                }
866                // sub to ensure that the lag is monotonically decreasing.
867                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
868                // the upstream can at least progress for one epoch.
869                self.max_pending_epoch_lag = min(
870                    self.pending_epoch_lag(),
871                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
872                );
873            }
874        }
875        Ok(self.is_finished())
876    }
877
878    fn is_finished(&self) -> bool {
879        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
880            assert!(
881                self.upstream_pending_barriers
882                    .pending_non_checkpoint_barriers
883                    .is_empty()
884            )
885        }
886        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
887    }
888
889    fn start_consuming_upstream(self) -> MergeExecutorInput {
890        assert!(self.is_finished());
891        assert_eq!(self.pending_epoch_lag(), 0);
892        self.upstream
893    }
894}
895
896impl<S> UpstreamBuffer<S> {
897    /// Run a future while concurrently polling the upstream so that the upstream
898    /// won't be back-pressured.
899    async fn run_future<T, E: Into<StreamExecutorError>>(
900        &mut self,
901        future: impl Future<Output = Result<T, E>>,
902    ) -> StreamExecutorResult<T> {
903        select! {
904            biased;
905            e = self.concurrently_consume_upstream() => {
906                Err(e)
907            }
908            // this arm won't be starved, because the first arm is always pending unless returning with error
909            result = future => {
910                result.map_err(Into::into)
911            }
912        }
913    }
914
915    fn pending_epoch_lag(&self) -> u64 {
916        self.upstream_pending_barriers
917            .latest_epoch()
918            .map(|epoch| {
919                epoch
920                    .prev
921                    .checked_sub(self.consumed_epoch)
922                    .expect("pending epoch must be later than consumed_epoch")
923            })
924            .unwrap_or(0)
925    }
926}
927
928#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
929async fn make_log_stream(
930    upstream_table: &BatchTable<impl StateStore>,
931    start_epoch: u64,
932    end_epoch: u64,
933    start_pk: Option<OwnedRow>,
934    chunk_size: usize,
935) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
936    let data_types = upstream_table.schema().data_types();
937    let start_pk = start_pk.as_ref();
938    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
939    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
940        upstream_table
941            .batch_iter_vnode_log(
942                start_epoch,
943                HummockReadEpoch::Committed(end_epoch),
944                start_pk,
945                vnode,
946            )
947            .map_ok(move |stream| {
948                let stream = stream.map_err(Into::into);
949                (vnode, stream, 0)
950            })
951    }))
952    .await?;
953    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
954    Ok(VnodeStream::new(
955        vnode_streams,
956        upstream_table.pk_in_output_indices().expect("should exist"),
957        builder,
958    ))
959}
960
961async fn make_snapshot_stream(
962    upstream_table: &BatchTable<impl StateStore>,
963    snapshot_epoch: u64,
964    backfill_state: &BackfillState<impl StateStore>,
965    rate_limit: RateLimit,
966    chunk_size: usize,
967    snapshot_rebuild_interval: Duration,
968    pk_scan_range: &PkScanRange,
969) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
970    let data_types = upstream_table.schema().data_types();
971    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
972        move |(vnode, progress)| {
973            let start_pk = match progress {
974                None => Some((None, 0)),
975                Some(VnodeBackfillProgress {
976                    row_count,
977                    progress: EpochBackfillProgress::Consuming { latest_pk },
978                    ..
979                }) => Some((Some(latest_pk), *row_count)),
980                Some(VnodeBackfillProgress {
981                    progress: EpochBackfillProgress::Consumed,
982                    ..
983                }) => None,
984            };
985            start_pk.map(|(start_pk, row_count)| {
986                upstream_table
987                    .batch_iter_vnode_with_pk_range(
988                        HummockReadEpoch::Committed(snapshot_epoch),
989                        start_pk,
990                        &pk_scan_range.pk_prefix,
991                        &pk_scan_range.range_bounds,
992                        vnode,
993                        PrefetchOptions::prefetch_for_large_range_scan(),
994                        snapshot_rebuild_interval,
995                    )
996                    .map_ok(move |stream| {
997                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
998                        (vnode, stream, row_count)
999                    })
1000            })
1001        },
1002    ))
1003    .await?;
1004    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
1005    Ok(VnodeStream::new(
1006        vnode_streams,
1007        upstream_table.pk_in_output_indices().expect("should exist"),
1008        builder,
1009    ))
1010}
1011
1012#[expect(clippy::too_many_arguments)]
1013#[try_stream(ok = Message, error = StreamExecutorError)]
1014async fn make_consume_snapshot_stream<'a, S: StateStore>(
1015    upstream_table: &'a BatchTable<S>,
1016    snapshot_epoch: u64,
1017    chunk_size: usize,
1018    rate_limit: &'a mut RateLimit,
1019    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
1020    progress: &'a mut CreateMviewProgressReporter,
1021    backfill_state: &'a mut BackfillState<S>,
1022    first_recv_barrier_epoch: EpochPair,
1023    initial_backfill_paused: bool,
1024    actor_ctx: &'a ActorContextRef,
1025    pk_scan_range: &'a PkScanRange,
1026) {
1027    let mut barrier_epoch = first_recv_barrier_epoch;
1028
1029    // start consume upstream snapshot
1030    let mut snapshot_stream = make_snapshot_stream(
1031        upstream_table,
1032        snapshot_epoch,
1033        &*backfill_state,
1034        *rate_limit,
1035        chunk_size,
1036        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
1037        pk_scan_range,
1038    )
1039    .await?;
1040
1041    async fn select_barrier_and_snapshot_stream(
1042        barrier_rx: &mut UnboundedReceiver<Barrier>,
1043        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
1044        throttle_snapshot_stream: bool,
1045        backfill_paused: bool,
1046    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
1047        select!(
1048            result = receive_next_barrier(barrier_rx) => {
1049                Ok(Either::Left(result?))
1050            },
1051            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
1052                Ok(Either::Right(result?))
1053            }
1054        )
1055    }
1056
1057    let mut count = 0;
1058    let mut epoch_row_count = 0;
1059    let mut backfill_paused = initial_backfill_paused;
1060    loop {
1061        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
1062        match select_barrier_and_snapshot_stream(
1063            barrier_rx,
1064            &mut snapshot_stream,
1065            throttle_snapshot_stream,
1066            backfill_paused,
1067        )
1068        .await?
1069        {
1070            Either::Left(barrier) => {
1071                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1072                barrier_epoch = barrier.epoch;
1073
1074                if barrier_epoch.curr >= snapshot_epoch {
1075                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
1076                }
1077                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
1078                    backfill_paused = false;
1079                }
1080                if let Some(chunk) = snapshot_stream.consume_builder() {
1081                    count += chunk.cardinality();
1082                    epoch_row_count += chunk.cardinality();
1083                    yield Message::Chunk(chunk);
1084                }
1085                snapshot_stream
1086                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1087                        if let Some(pk) = pk_progress {
1088                            backfill_state.update_epoch_progress(
1089                                vnode,
1090                                snapshot_epoch,
1091                                row_count,
1092                                pk,
1093                            );
1094                        } else {
1095                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1096                        }
1097                    })
1098                    .await?;
1099                let post_commit = backfill_state.commit(barrier.epoch).await?;
1100                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
1101                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
1102                epoch_row_count = 0;
1103
1104                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
1105                    if let Mutation::Throttle(config) = &**m
1106                        && let Some(config) = config.get(&actor_ctx.fragment_id)
1107                        && config.throttle_type() == PbThrottleType::Backfill
1108                    {
1109                        Some(config.rate_limit)
1110                    } else {
1111                        None
1112                    }
1113                });
1114                yield Message::Barrier(barrier);
1115                post_commit.post_yield_barrier(None).await?;
1116
1117                if let Some(new_rate_limit) = new_rate_limit {
1118                    let new_rate_limit = new_rate_limit.into();
1119                    *rate_limit = new_rate_limit;
1120                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
1121                }
1122            }
1123            Either::Right(Some(chunk)) => {
1124                if backfill_paused {
1125                    return Err(
1126                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
1127                    );
1128                }
1129                count += chunk.cardinality();
1130                epoch_row_count += chunk.cardinality();
1131                yield Message::Chunk(chunk);
1132            }
1133            Either::Right(None) => {
1134                break;
1135            }
1136        }
1137    }
1138
1139    // finish consuming upstream snapshot, report finish
1140    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
1141    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
1142    barrier_epoch = barrier_to_report_finish.epoch;
1143    trace!(?barrier_epoch, count, "report finish");
1144    snapshot_stream
1145        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1146            assert_eq!(pk_progress, None);
1147            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1148        })
1149        .await?;
1150    let post_commit = backfill_state.commit(barrier_epoch).await?;
1151    progress.finish(barrier_epoch, count as _);
1152    yield Message::Barrier(barrier_to_report_finish);
1153    post_commit.post_yield_barrier(None).await?;
1154
1155    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
1156    loop {
1157        let barrier = receive_next_barrier(barrier_rx).await?;
1158        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1159        barrier_epoch = barrier.epoch;
1160        let post_commit = backfill_state.commit(barrier.epoch).await?;
1161        yield Message::Barrier(barrier);
1162        post_commit.post_yield_barrier(None).await?;
1163        if barrier_epoch.curr == snapshot_epoch {
1164            break;
1165        }
1166    }
1167    trace!(?barrier_epoch, "finish consuming snapshot");
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172    use std::collections::HashSet;
1173    use std::sync::Arc;
1174
1175    use risingwave_common::array::StreamChunk;
1176    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1177    use risingwave_common::row::OwnedRow;
1178    use risingwave_common::test_prelude::StreamChunkTestExt;
1179    use risingwave_common::types::DataType;
1180    use risingwave_common::util::epoch::{EpochPair, test_epoch};
1181    use risingwave_common::util::sort_util::OrderType;
1182    use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
1183    use risingwave_rpc_client::HummockMetaClient;
1184    use risingwave_storage::hummock::HummockStorage;
1185    use risingwave_storage::table::batch_table::BatchTable;
1186    use tokio::sync::mpsc::unbounded_channel;
1187    use tokio::time::{Duration, timeout};
1188
1189    use super::*;
1190    use crate::common::table::state_table::{
1191        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
1192    };
1193    use crate::common::table::test_utils::gen_pbtable_with_value_indices;
1194    use crate::executor::exchange::input::{Input, LocalInput};
1195    use crate::executor::exchange::permit::channel_for_test;
1196    use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
1197    use crate::task::LocalBarrierManager;
1198
1199    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
1200    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);
1201
1202    fn source_table_pb() -> risingwave_pb::catalog::PbTable {
1203        gen_pbtable_with_value_indices(
1204            SOURCE_TABLE_ID,
1205            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1206            vec![OrderType::ascending()],
1207            vec![0],
1208            0,
1209            vec![0],
1210        )
1211    }
1212
1213    fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
1214        gen_pbtable_with_value_indices(
1215            PROGRESS_TABLE_ID,
1216            vec![
1217                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
1218                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1219                ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1220                ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
1221                ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
1222            ],
1223            vec![OrderType::ascending()],
1224            vec![0],
1225            1,
1226            vec![1, 2, 3, 4],
1227        )
1228    }
1229
1230    fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
1231        BatchTable::for_test(
1232            store,
1233            SOURCE_TABLE_ID,
1234            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1235            vec![OrderType::ascending()],
1236            vec![0],
1237            vec![0],
1238        )
1239    }
1240
1241    async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1242        StateTableBuilder::new(&source_table_pb(), store, None)
1243            .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
1244            .forbid_preload_all_rows()
1245            .build()
1246            .await
1247    }
1248
1249    async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1250        StateTable::from_table_catalog(&progress_table_pb(), store, None).await
1251    }
1252
1253    async fn commit_insert_epoch(
1254        test_env: &HummockTestEnv,
1255        source_state_table: &mut StateTable<HummockStorage>,
1256        epoch: &mut EpochPair,
1257        table_ids: HashSet<TableId>,
1258        values: &[i64],
1259    ) {
1260        for value in values {
1261            source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
1262        }
1263        epoch.inc_for_test();
1264        test_env.storage.start_epoch(epoch.curr, table_ids);
1265        source_state_table.commit_for_test(*epoch).await.unwrap();
1266        let res = test_env
1267            .storage
1268            .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
1269            .await
1270            .unwrap();
1271        test_env
1272            .meta_client
1273            .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
1274            .await
1275            .unwrap();
1276        test_env
1277            .storage
1278            .wait_version(test_env.manager.get_current_version().await)
1279            .await;
1280    }
1281
1282    fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
1283        for epoch in 1..=max_epoch {
1284            test_env
1285                .storage
1286                .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
1287        }
1288    }
1289
1290    fn make_upstream_input(
1291        barrier_manager: LocalBarrierManager,
1292        actor_ctx: ActorContextRef,
1293        rx: crate::executor::exchange::permit::Receiver,
1294    ) -> MergeExecutorInput {
1295        MergeExecutorInput::new(
1296            MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
1297            actor_ctx,
1298            1919.into(),
1299            barrier_manager,
1300            Arc::new(StreamingMetrics::unused()),
1301            ExecutorInfo::for_test(
1302                Schema::new(vec![Field::unnamed(DataType::Int64)]),
1303                vec![0],
1304                "SnapshotBackfillUpstream".to_owned(),
1305                0,
1306            ),
1307        )
1308    }
1309
1310    async fn expect_barrier_with_timeout(
1311        executor: &mut BoxedMessageStream,
1312        reason: &str,
1313    ) -> Barrier {
1314        let message = timeout(Duration::from_secs(10), executor.next())
1315            .await
1316            .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
1317            .unwrap()
1318            .unwrap();
1319        match message {
1320            Message::Barrier(barrier) => barrier,
1321            other => panic!("expected barrier for {reason}, got {other:?}"),
1322        }
1323    }
1324
1325    async fn expect_chunk_with_timeout(
1326        executor: &mut BoxedMessageStream,
1327        reason: &str,
1328    ) -> StreamChunk {
1329        let message = timeout(Duration::from_secs(10), executor.next())
1330            .await
1331            .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
1332            .unwrap()
1333            .unwrap();
1334        match message {
1335            Message::Chunk(chunk) => chunk,
1336            other => panic!("expected chunk for {reason}, got {other:?}"),
1337        }
1338    }
1339
1340    async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
1341        assert!(
1342            timeout(Duration::from_millis(200), executor.next())
1343                .await
1344                .is_err(),
1345            "executor unexpectedly produced a message while waiting for {reason}"
1346        );
1347    }
1348
1349    #[tokio::test]
1350    async fn test_snapshot_backfill_without_upstream_on_hummock() {
1351        let source_env = prepare_hummock_test_env().await;
1352        source_env.register_table(source_table_pb()).await;
1353        let progress_env = prepare_hummock_test_env().await;
1354        progress_env.register_table(progress_table_pb()).await;
1355
1356        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1357        let source_table = source_batch_table(source_env.storage.clone());
1358        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1359
1360        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1361        source_env
1362            .storage
1363            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1364        source_state_table.init_epoch(epoch).await.unwrap();
1365
1366        commit_insert_epoch(
1367            &source_env,
1368            &mut source_state_table,
1369            &mut epoch,
1370            HashSet::from_iter([SOURCE_TABLE_ID]),
1371            &[1],
1372        )
1373        .await;
1374        commit_insert_epoch(
1375            &source_env,
1376            &mut source_state_table,
1377            &mut epoch,
1378            HashSet::from_iter([SOURCE_TABLE_ID]),
1379            &[2],
1380        )
1381        .await;
1382        commit_insert_epoch(
1383            &source_env,
1384            &mut source_state_table,
1385            &mut epoch,
1386            HashSet::from_iter([SOURCE_TABLE_ID]),
1387            &[3],
1388        )
1389        .await;
1390        commit_insert_epoch(
1391            &source_env,
1392            &mut source_state_table,
1393            &mut epoch,
1394            HashSet::from_iter([SOURCE_TABLE_ID]),
1395            &[],
1396        )
1397        .await;
1398        start_progress_epochs(&progress_env, 5);
1399
1400        let barrier_manager = LocalBarrierManager::for_test();
1401        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
1402        let actor_ctx = ActorContext::for_test(1234);
1403        let (barrier_tx, barrier_rx) = unbounded_channel();
1404        barrier_tx
1405            .send(Barrier::new_test_barrier(test_epoch(1)))
1406            .unwrap();
1407
1408        let mut executor = SnapshotBackfillExecutor::new(
1409            source_table,
1410            progress_state_table,
1411            None,
1412            None,
1413            vec![0],
1414            vec![0],
1415            actor_ctx,
1416            progress,
1417            1024,
1418            RateLimit::Disabled,
1419            barrier_rx,
1420            Arc::new(StreamingMetrics::unused()),
1421            Some(test_epoch(3)),
1422        )
1423        .expect("snapshot backfill executor should be created")
1424        .boxed()
1425        .execute();
1426
1427        assert_eq!(
1428            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1429                .await
1430                .epoch,
1431            Barrier::new_test_barrier(test_epoch(1)).epoch
1432        );
1433        assert_eq!(
1434            expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
1435            StreamChunk::from_pretty(
1436                " I
1437                + 1
1438                + 2
1439                + 3"
1440            )
1441        );
1442        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1443
1444        barrier_tx
1445            .send(Barrier::new_test_barrier(test_epoch(2)))
1446            .unwrap();
1447        assert_eq!(
1448            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1449                .await
1450                .epoch,
1451            Barrier::new_test_barrier(test_epoch(2)).epoch
1452        );
1453
1454        barrier_tx
1455            .send(Barrier::new_test_barrier(test_epoch(3)))
1456            .unwrap();
1457        assert_eq!(
1458            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1459                .await
1460                .epoch,
1461            Barrier::new_test_barrier(test_epoch(3)).epoch
1462        );
1463
1464        barrier_tx
1465            .send(Barrier::new_test_barrier(test_epoch(4)))
1466            .unwrap();
1467        assert_eq!(
1468            expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
1469                .await
1470                .epoch,
1471            Barrier::new_test_barrier(test_epoch(4)).epoch
1472        );
1473
1474        barrier_tx
1475            .send(Barrier::new_test_barrier(test_epoch(5)))
1476            .unwrap();
1477        assert_eq!(
1478            expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
1479                .await
1480                .epoch,
1481            Barrier::new_test_barrier(test_epoch(5)).epoch
1482        );
1483
1484        expect_pending_with_timeout(&mut executor, "next local barrier").await;
1485    }
1486
1487    #[tokio::test]
1488    async fn test_snapshot_backfill_with_upstream_on_hummock() {
1489        let source_env = prepare_hummock_test_env().await;
1490        source_env.register_table(source_table_pb()).await;
1491        let progress_env = prepare_hummock_test_env().await;
1492        progress_env.register_table(progress_table_pb()).await;
1493
1494        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1495        let source_table = source_batch_table(source_env.storage.clone());
1496        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1497
1498        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1499        source_env
1500            .storage
1501            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1502        source_state_table.init_epoch(epoch).await.unwrap();
1503
1504        commit_insert_epoch(
1505            &source_env,
1506            &mut source_state_table,
1507            &mut epoch,
1508            HashSet::from_iter([SOURCE_TABLE_ID]),
1509            &[],
1510        )
1511        .await;
1512        commit_insert_epoch(
1513            &source_env,
1514            &mut source_state_table,
1515            &mut epoch,
1516            HashSet::from_iter([SOURCE_TABLE_ID]),
1517            &[],
1518        )
1519        .await;
1520        commit_insert_epoch(
1521            &source_env,
1522            &mut source_state_table,
1523            &mut epoch,
1524            HashSet::from_iter([SOURCE_TABLE_ID]),
1525            &[],
1526        )
1527        .await;
1528        commit_insert_epoch(
1529            &source_env,
1530            &mut source_state_table,
1531            &mut epoch,
1532            HashSet::from_iter([SOURCE_TABLE_ID]),
1533            &[4],
1534        )
1535        .await;
1536        start_progress_epochs(&progress_env, 6);
1537
1538        let barrier_manager = LocalBarrierManager::for_test();
1539        let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
1540        let actor_ctx = ActorContext::for_test(1235);
1541        let (barrier_tx, barrier_rx) = unbounded_channel();
1542        let (upstream_tx, upstream_rx) = channel_for_test();
1543
1544        upstream_tx
1545            .send(
1546                DispatcherMessage::Barrier(
1547                    Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
1548                )
1549                .into(),
1550            )
1551            .await
1552            .unwrap();
1553        barrier_tx
1554            .send(Barrier::new_test_barrier(test_epoch(1)))
1555            .unwrap();
1556
1557        let mut executor = SnapshotBackfillExecutor::new(
1558            source_table,
1559            progress_state_table,
1560            Some(make_upstream_input(
1561                barrier_manager,
1562                actor_ctx.clone(),
1563                upstream_rx,
1564            )),
1565            None,
1566            vec![0],
1567            vec![0],
1568            actor_ctx,
1569            progress,
1570            1024,
1571            RateLimit::Disabled,
1572            barrier_rx,
1573            Arc::new(StreamingMetrics::unused()),
1574            Some(test_epoch(3)),
1575        )
1576        .expect("snapshot backfill executor should be created")
1577        .boxed()
1578        .execute();
1579
1580        assert_eq!(
1581            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1582                .await
1583                .epoch,
1584            Barrier::new_test_barrier(test_epoch(1)).epoch
1585        );
1586        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1587        barrier_tx
1588            .send(Barrier::new_test_barrier(test_epoch(2)))
1589            .unwrap();
1590        assert_eq!(
1591            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1592                .await
1593                .epoch,
1594            Barrier::new_test_barrier(test_epoch(2)).epoch
1595        );
1596
1597        barrier_tx
1598            .send(Barrier::new_test_barrier(test_epoch(3)))
1599            .unwrap();
1600        assert_eq!(
1601            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1602                .await
1603                .epoch,
1604            Barrier::new_test_barrier(test_epoch(3)).epoch
1605        );
1606
1607        barrier_tx
1608            .send(Barrier::new_test_barrier(test_epoch(4)))
1609            .unwrap();
1610        assert_eq!(
1611            expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
1612                .await
1613                .epoch,
1614            Barrier::new_test_barrier(test_epoch(4)).epoch
1615        );
1616
1617        barrier_tx
1618            .send(Barrier::new_test_barrier(test_epoch(5)))
1619            .unwrap();
1620        assert_eq!(
1621            expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
1622            StreamChunk::from_pretty(
1623                " I
1624                + 4"
1625            )
1626        );
1627        assert_eq!(
1628            expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
1629                .await
1630                .epoch,
1631            Barrier::new_test_barrier(test_epoch(5)).epoch
1632        );
1633
1634        upstream_tx
1635            .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
1636            .await
1637            .unwrap();
1638        let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
1639        upstream_tx
1640            .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
1641            .await
1642            .unwrap();
1643        barrier_tx.send(stop_barrier.clone()).unwrap();
1644
1645        assert_eq!(
1646            expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
1647            StreamChunk::from_pretty(" I\n + 5")
1648        );
1649        assert_eq!(
1650            expect_barrier_with_timeout(&mut executor, "final stop barrier")
1651                .await
1652                .epoch,
1653            stop_barrier.epoch
1654        );
1655    }
1656}