Skip to main content

risingwave_stream/executor/backfill/snapshot_backfill/
executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::batch_plan::ScanRange;
33use risingwave_pb::common::PbThrottleType;
34use risingwave_storage::StateStore;
35use risingwave_storage::store::PrefetchOptions;
36use risingwave_storage::table::ChangeLogRow;
37use risingwave_storage::table::batch_table::{BatchTable, PkScanRange};
38use tokio::select;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::time::sleep;
41
42use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
43use crate::executor::backfill::snapshot_backfill::state::{
44    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
45};
46use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
47use crate::executor::backfill::utils::{
48    UpstreamStreamKeyUpdateNormalizer, create_builder, mapping_message,
49};
50use crate::executor::monitor::StreamingMetrics;
51use crate::executor::prelude::{StateTable, StreamExt, try_stream};
52use crate::executor::{
53    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
54    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
55    expect_first_barrier,
56};
57use crate::task::CreateMviewProgressReporter;
58
/// Executor that backfills a newly created streaming job from an upstream table.
///
/// Depending on the barriers observed at startup it reads the upstream table snapshot,
/// then the upstream change log, and finally forwards the live upstream input
/// (see `execute_inner`).
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,

    /// Backfill progress table
    progress_state_table: StateTable<S>,

    /// Upstream input with the same schema as the upstream table (asserted in `new`).
    /// `None` when the executor has no upstream to consume.
    upstream: Option<MergeExecutorInput>,

    /// The column indices that need to be forwarded downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Current executor stream-key indices in the output schema.
    stream_key: Vec<usize>,

    /// Reporter used to publish log-store consumption progress and completion.
    progress: CreateMviewProgressReporter,

    /// Chunk size passed to the snapshot stream and the change-log stream.
    chunk_size: usize,
    /// Rate limiter built in `new` from the configured `RateLimit`; handed to the snapshot stream.
    rate_limiter: MonitoredRateLimiter,

    /// Channel from which this executor receives its barriers.
    barrier_rx: UnboundedReceiver<Barrier>,

    /// Actor context; supplies the actor id and fragment id used by this executor.
    actor_ctx: ActorContextRef,
    /// Streaming metrics; source of the `snapshot_backfill_consume_row_count` counters.
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill from. When `None`, the executor expects that the
    /// backfill has already finished and skips it.
    snapshot_epoch: Option<u64>,
    /// (`eq_prefix`, `range_bounds`) for pk scan range pushdown.
    pk_scan_range: PkScanRange,
}
89
90impl<S: StateStore> SnapshotBackfillExecutor<S> {
91    fn build_pk_scan_range(
92        pb_scan_range: Option<&ScanRange>,
93        upstream_table: &BatchTable<S>,
94    ) -> StreamExecutorResult<PkScanRange> {
95        match pb_scan_range {
96            Some(scan_range) => Ok(PkScanRange::new(
97                scan_range.clone(),
98                upstream_table.pk_serializer().get_data_types().to_vec(),
99            )?),
100            None => Ok(PkScanRange::full()),
101        }
102    }
103
104    #[expect(clippy::too_many_arguments)]
105    pub(crate) fn new(
106        upstream_table: BatchTable<S>,
107        progress_state_table: StateTable<S>,
108        upstream: Option<MergeExecutorInput>,
109        pb_pk_scan_range: Option<&ScanRange>,
110        output_indices: Vec<usize>,
111        stream_key: Vec<usize>,
112        actor_ctx: ActorContextRef,
113        progress: CreateMviewProgressReporter,
114        chunk_size: usize,
115        rate_limit: RateLimit,
116        barrier_rx: UnboundedReceiver<Barrier>,
117        metrics: Arc<StreamingMetrics>,
118        snapshot_epoch: Option<u64>,
119    ) -> StreamExecutorResult<Self> {
120        if let Some(upstream) = &upstream {
121            assert_eq!(&upstream.info.schema, upstream_table.schema());
122        }
123        if upstream_table.pk_in_output_indices().is_none() {
124            panic!(
125                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
126                upstream_table.pk_indices(),
127                upstream_table.output_indices(),
128                upstream_table.schema()
129            )
130        };
131        assert!(
132            stream_key.iter().all(|idx| *idx < output_indices.len()),
133            "stream key indices should refer to output schema: stream_key: {:?}, output_indices: {:?}",
134            stream_key,
135            output_indices
136        );
137        let pk_scan_range = Self::build_pk_scan_range(pb_pk_scan_range, &upstream_table)?;
138        if !matches!(rate_limit, RateLimit::Disabled) {
139            trace!(
140                ?rate_limit,
141                "create snapshot backfill executor with rate limit"
142            );
143        }
144        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
145        Ok(Self {
146            upstream_table,
147            progress_state_table,
148            upstream,
149            output_indices,
150            stream_key,
151            progress,
152            chunk_size,
153            rate_limiter,
154            barrier_rx,
155            actor_ctx,
156            metrics,
157            snapshot_epoch,
158            pk_scan_range,
159        })
160    }
161
162    #[try_stream(ok = Message, error = StreamExecutorError)]
163    async fn execute_inner(mut self) {
164        trace!("snapshot backfill executor start");
165        let upstream = if let Some(mut upstream) = self.upstream {
166            let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
167            trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
168            Some((first_upstream_barrier, upstream))
169        } else {
170            None
171        };
172        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
173        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
174        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
175            self.snapshot_epoch
176        {
177            if let Some((first_upstream_barrier, _)) = &upstream {
178                if first_upstream_barrier.epoch != first_recv_barrier.epoch {
179                    assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
180                    Some(snapshot_epoch)
181                } else {
182                    None
183                }
184            } else {
185                // must go through snapshot backfill when having no upstream
186                Some(snapshot_epoch)
187            }
188        } else {
189            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
190            if cfg!(debug_assertions) {
191                panic!(
192                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
193                    upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
194                    first_recv_barrier.epoch
195                );
196            } else {
197                let (first_upstream_barrier, _) = upstream
198                    .as_ref()
199                    .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
200                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
201                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
202                None
203            }
204        };
205        let first_recv_barrier_epoch = first_recv_barrier.epoch;
206        let initial_backfill_paused =
207            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
208        yield Message::Barrier(first_recv_barrier);
209        let mut backfill_state = BackfillState::new(
210            self.progress_state_table,
211            first_recv_barrier_epoch,
212            self.upstream_table.pk_serializer().clone(),
213        )
214        .await?;
215
216        let (mut barrier_epoch, mut need_report_finish, upstream) = {
217            if let Some(snapshot_epoch) = should_snapshot_backfill {
218                let table_id_str = format!("{}", self.upstream_table.table_id());
219                let actor_id_str = format!("{}", self.actor_ctx.id);
220
221                let consume_upstream_row_count = self
222                    .metrics
223                    .snapshot_backfill_consume_row_count
224                    .with_guarded_label_values(&[
225                        table_id_str.as_str(),
226                        actor_id_str.as_str(),
227                        "consume_upstream",
228                    ]);
229
230                let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
231                {
232                    SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
233                        upstream,
234                        first_upstream_barrier,
235                        consume_upstream_row_count,
236                    ))
237                } else {
238                    SnapshotBackfillUpstream::Empty
239                };
240
241                // Phase 1: consume upstream snapshot
242                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
243                    < snapshot_epoch
244                {
245                    trace!(
246                        table_id = %self.upstream_table.table_id(),
247                        snapshot_epoch,
248                        barrier_epoch = ?first_recv_barrier_epoch,
249                        "start consuming snapshot"
250                    );
251                    {
252                        let consuming_snapshot_row_count = self
253                            .metrics
254                            .snapshot_backfill_consume_row_count
255                            .with_guarded_label_values(&[
256                                table_id_str.as_str(),
257                                actor_id_str.as_str(),
258                                "consuming_snapshot",
259                            ]);
260                        let snapshot_stream = make_consume_snapshot_stream(
261                            &self.upstream_table,
262                            snapshot_epoch,
263                            self.chunk_size,
264                            &self.rate_limiter,
265                            &mut self.barrier_rx,
266                            &mut self.progress,
267                            &mut backfill_state,
268                            first_recv_barrier_epoch,
269                            initial_backfill_paused,
270                            &self.actor_ctx,
271                            &self.pk_scan_range,
272                        );
273
274                        pin_mut!(snapshot_stream);
275
276                        while let Some(message) = upstream_buffer
277                            .run_future(snapshot_stream.try_next())
278                            .await?
279                        {
280                            if let Message::Chunk(chunk) = &message {
281                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
282                            }
283                            yield message;
284                        }
285                    }
286
287                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
288                    let recv_barrier_epoch = recv_barrier.epoch;
289                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
290                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
291                    yield Message::Barrier(recv_barrier);
292                    post_commit.post_yield_barrier(None).await?;
293                    (
294                        recv_barrier_epoch,
295                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
296                    )
297                } else {
298                    trace!(
299                        table_id = %self.upstream_table.table_id(),
300                        snapshot_epoch,
301                        barrier_epoch = ?first_recv_barrier_epoch,
302                        "skip consuming snapshot"
303                    );
304                    (
305                        first_recv_barrier_epoch,
306                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
307                    )
308                };
309
310                // Phase 2: consume upstream log store
311                match upstream_buffer {
312                    Either::Left(mut upstream_buffer) => {
313                        let initial_pending_lag =
314                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
315                                &upstream_buffer
316                            {
317                                Some(Duration::from_millis(
318                                    Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
319                                ))
320                            } else {
321                                None
322                            };
323                        trace!(
324                            ?barrier_epoch,
325                            table_id = %self.upstream_table.table_id(),
326                            ?initial_pending_lag,
327                            "start consuming log store"
328                        );
329
330                        let consuming_log_store_row_count = self
331                            .metrics
332                            .snapshot_backfill_consume_row_count
333                            .with_guarded_label_values(&[
334                                table_id_str.as_str(),
335                                actor_id_str.as_str(),
336                                "consuming_log_store",
337                            ]);
338                        let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
339                        loop {
340                            let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
341                            assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
342                            let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
343                            // Disable calling next_epoch, because, if barrier_epoch.prev is a checkpoint epoch,
344                            // next_epoch(barrier_epoch.prev) is actually waiting for the committed epoch.
345                            // However, upstream_buffer's is_polling_epoch_data can be false, since just received
346                            // the checkpoint barrier_epoch.prev. And then the upstream_buffer may stop polling upstream
347                            // when the max_pending_epoch_lag is small. When upstream is not polled, the barrier of the next
348                            // committed epoch cannot be collected.
349                            // {
350                            //     // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
351                            //     // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
352                            //     let next_prev_epoch = upstream_buffer
353                            //         .run_future(self.upstream_table.next_epoch(barrier_epoch.prev))
354                            //         .await?;
355                            //     assert_eq!(next_prev_epoch, barrier.epoch.prev);
356                            // }
357                            barrier_epoch = barrier.epoch;
358                            if barrier.kind.is_checkpoint() {
359                                let pending_non_checkpoint_barrier =
360                                    take(&mut pending_non_checkpoint_barrier);
361                                let end_epoch = barrier_epoch.prev;
362                                let start_epoch = pending_non_checkpoint_barrier
363                                    .first()
364                                    .map(|epoch| epoch.prev)
365                                    .unwrap_or(end_epoch);
366                                trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
367                                // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
368                                // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
369                                // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
370                                let mut stream = upstream_buffer
371                                    .run_future(make_log_stream(
372                                        &self.upstream_table,
373                                        start_epoch,
374                                        end_epoch,
375                                        None,
376                                        self.chunk_size,
377                                    ))
378                                    .await?;
379                                while let Some(chunk) =
380                                    upstream_buffer.run_future(stream.try_next()).await?
381                                {
382                                    trace!(
383                                        ?barrier_epoch,
384                                        size = chunk.cardinality(),
385                                        "consume change log yield chunk",
386                                    );
387                                    consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
388                                    yield Message::Chunk(chunk);
389                                }
390
391                                trace!(?barrier_epoch, "after consume change log");
392
393                                stream
394                                    .for_vnode_pk_progress(|vnode, row_count, progress| {
395                                        assert_eq!(progress, None);
396                                        backfill_state.finish_epoch(
397                                            vnode,
398                                            barrier.epoch.prev,
399                                            row_count,
400                                        );
401                                    })
402                                    .await?;
403                            } else {
404                                pending_non_checkpoint_barrier.push(barrier.epoch);
405                            }
406
407                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
408                                &upstream_buffer
409                            {
410                                if is_finished {
411                                    assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
412                                    assert!(pending_non_checkpoint_barrier.is_empty());
413                                    self.progress.finish_consuming_log_store(barrier.epoch);
414                                } else {
415                                    self.progress.update_create_mview_log_store_progress(
416                                        barrier.epoch,
417                                        upstream_buffer.pending_epoch_lag(),
418                                    );
419                                }
420                            }
421
422                            let post_commit = backfill_state.commit(barrier.epoch).await?;
423                            let update_vnode_bitmap =
424                                barrier.as_update_vnode_bitmap(self.actor_ctx.id);
425                            yield Message::Barrier(barrier);
426                            post_commit.post_yield_barrier(None).await?;
427                            if update_vnode_bitmap.is_some() {
428                                return Err(anyhow!(
429                                    "should not update vnode bitmap during consuming log store"
430                                )
431                                .into());
432                            }
433
434                            if is_finished {
435                                assert!(
436                                    pending_non_checkpoint_barrier.is_empty(),
437                                    "{pending_non_checkpoint_barrier:?}"
438                                );
439                                break;
440                            }
441                        }
442                        trace!(
443                            ?barrier_epoch,
444                            table_id = %self.upstream_table.table_id(),
445                            "finish consuming log store"
446                        );
447
448                        (
449                            barrier_epoch,
450                            false,
451                            upstream_buffer.start_consuming_upstream(),
452                        )
453                    }
454                    Either::Right(upstream) => {
455                        trace!(
456                            ?barrier_epoch,
457                            table_id = %self.upstream_table.table_id(),
458                            "skip consuming log store and start consuming upstream directly"
459                        );
460
461                        (barrier_epoch, true, upstream)
462                    }
463                }
464            } else {
465                let (first_upstream_barrier, _) = upstream
466                    .as_ref()
467                    .expect("should have upstream when skipping snapshot backfill");
468                backfill_state
469                    .latest_progress()
470                    .for_each(|(vnode, progress)| {
471                        let progress = progress.expect("should not be empty");
472                        assert_eq!(
473                            progress.epoch, first_upstream_barrier.epoch.prev,
474                            "vnode: {:?}",
475                            vnode
476                        );
477                        assert_eq!(
478                            progress.progress,
479                            EpochBackfillProgress::Consumed,
480                            "vnode: {:?}",
481                            vnode
482                        );
483                    });
484                trace!(
485                    table_id = %self.upstream_table.table_id(),
486                    "skip backfill"
487                );
488                let (first_upstream_barrier, upstream) =
489                    upstream.expect("should have upstream when skipping snapshot backfill");
490                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
491                (first_upstream_barrier.epoch, true, upstream)
492            }
493        };
494        let current_stream_key_indices = self
495            .stream_key
496            .iter()
497            .map(|idx| self.output_indices[*idx])
498            .collect();
499        let update_normalizer = UpstreamStreamKeyUpdateNormalizer::new(
500            &upstream.info.stream_key,
501            current_stream_key_indices,
502        );
503        let mut upstream = upstream.into_executor(self.barrier_rx).execute();
504        let mut epoch_row_count = 0;
505        // Phase 3: consume upstream
506        while let Some(msg) = upstream.try_next().await? {
507            let Some(msg) = update_normalizer.normalize_message(msg) else {
508                continue;
509            };
510            match msg {
511                Message::Barrier(barrier) => {
512                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
513                    self.upstream_table
514                        .vnodes()
515                        .iter_vnodes()
516                        .for_each(|vnode| {
517                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
518                            // executor.
519                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
520                        });
521                    epoch_row_count = 0;
522                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
523                    barrier_epoch = barrier.epoch;
524                    if need_report_finish {
525                        need_report_finish = false;
526                        self.progress.finish_consuming_log_store(barrier_epoch);
527                    }
528                    let post_commit = backfill_state.commit(barrier.epoch).await?;
529                    yield Message::Barrier(barrier);
530                    if let Some(new_vnode_bitmap) =
531                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
532                    {
533                        let _prev_vnode_bitmap =
534                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
535                        backfill_state
536                            .latest_progress()
537                            .for_each(|(vnode, progress)| {
538                                let progress = progress.expect("should not be empty");
539                                assert_eq!(
540                                    progress.epoch, barrier_epoch.prev,
541                                    "vnode {:?} has unexpected progress epoch",
542                                    vnode
543                                );
544                                assert_eq!(
545                                    progress.progress,
546                                    EpochBackfillProgress::Consumed,
547                                    "vnode {:?} has unexpected progress",
548                                    vnode
549                                );
550                            });
551                    }
552                }
553                msg => {
554                    if let Message::Chunk(chunk) = &msg {
555                        epoch_row_count += chunk.cardinality();
556                    }
557                    yield msg;
558                }
559            }
560        }
561    }
562}
563
564impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
565    fn execute(self: Box<Self>) -> BoxedMessageStream {
566        let output_indices = self.output_indices.clone();
567        self.execute_inner()
568            .filter_map(move |result| {
569                ready({
570                    match result {
571                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
572                        Err(e) => Some(Err(e)),
573                    }
574                })
575            })
576            .boxed()
577    }
578}
579
/// Type-state marker for `SnapshotBackfillUpstream` while the snapshot is being consumed.
struct ConsumingSnapshot;
/// Type-state marker for `SnapshotBackfillUpstream` while the log store is being consumed.
struct ConsumingLogStore;
582
583#[derive(Debug)]
584struct PendingBarriers {
585    first_upstream_barrier_epoch: EpochPair,
586
587    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier
588    /// Newer barrier at the front
589    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,
590
591    /// In the outer `VecDeque`, newer barriers at the front.
592    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
593    /// and others as non-checkpoint barrier
594    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
595}
596
597impl PendingBarriers {
598    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
599        Self {
600            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
601            pending_non_checkpoint_barriers: Default::default(),
602            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
603                first_upstream_barrier,
604            ])]),
605        }
606    }
607
608    fn add(&mut self, barrier: DispatcherBarrier) {
609        let is_checkpoint = barrier.kind.is_checkpoint();
610        self.pending_non_checkpoint_barriers.push_front(barrier);
611        if is_checkpoint {
612            self.checkpoint_barrier_groups
613                .push_front(take(&mut self.pending_non_checkpoint_barriers));
614        }
615    }
616
617    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
618        self.checkpoint_barrier_groups.pop_back()
619    }
620
621    fn consume_epoch(&mut self, epoch: EpochPair) {
622        let barriers = self
623            .checkpoint_barrier_groups
624            .back_mut()
625            .expect("non-empty");
626        let oldest_upstream_barrier = barriers.back().expect("non-empty");
627        assert!(
628            oldest_upstream_barrier.epoch.prev >= epoch.prev,
629            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
630            oldest_upstream_barrier.epoch,
631            epoch
632        );
633        if oldest_upstream_barrier.epoch.prev == epoch.prev {
634            assert_eq!(oldest_upstream_barrier.epoch, epoch);
635            barriers.pop_back();
636            if barriers.is_empty() {
637                self.checkpoint_barrier_groups.pop_back();
638            }
639        }
640    }
641
642    fn latest_epoch(&self) -> Option<EpochPair> {
643        self.pending_non_checkpoint_barriers
644            .front()
645            .or_else(|| {
646                self.checkpoint_barrier_groups
647                    .front()
648                    .and_then(|barriers| barriers.front())
649            })
650            .map(|barrier| barrier.epoch)
651    }
652
653    fn checkpoint_epoch_count(&self) -> usize {
654        self.checkpoint_barrier_groups.len()
655    }
656
657    fn has_checkpoint_epoch(&self) -> bool {
658        !self.checkpoint_barrier_groups.is_empty()
659    }
660}
661
662enum SnapshotBackfillUpstream<S> {
663    Empty,
664    Buffer(UpstreamBuffer<S>),
665}
666
667impl<S> SnapshotBackfillUpstream<S> {
668    async fn run_future<T, E: Into<StreamExecutorError>>(
669        &mut self,
670        future: impl Future<Output = Result<T, E>>,
671    ) -> StreamExecutorResult<T> {
672        match self {
673            SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
674            SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
675        }
676    }
677}
678
679impl SnapshotBackfillUpstream<ConsumingSnapshot> {
680    fn start_consuming_log_store(
681        self,
682        consumed_epoch: u64,
683    ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
684        match self {
685            SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
686            SnapshotBackfillUpstream::Buffer(buffer) => {
687                match buffer.start_consuming_log_store(consumed_epoch) {
688                    Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
689                    Either::Right(input) => Either::Right(input),
690                }
691            }
692        }
693    }
694}
695
696impl SnapshotBackfillUpstream<ConsumingLogStore> {
697    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
698        match self {
699            SnapshotBackfillUpstream::Empty => Ok(false),
700            SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
701        }
702    }
703
704    fn start_consuming_upstream(self) -> MergeExecutorInput {
705        match self {
706            SnapshotBackfillUpstream::Empty => {
707                unreachable!("unlike to start consuming upstream when having no upstream")
708            }
709            SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
710        }
711    }
712}
713
714struct UpstreamBuffer<S> {
715    upstream: MergeExecutorInput,
716    max_pending_epoch_lag: u64,
717    consumed_epoch: u64,
718    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
719    upstream_pending_barriers: PendingBarriers,
720    /// Whether we have started polling any upstream data before the next checkpoint barrier.
721    /// When `true`, we should continue polling until the next checkpoint barrier, because
722    /// some data in this epoch have been discarded and data in this epoch
723    /// must be read from log store
724    is_polling_epoch_data: bool,
725    consume_upstream_row_count: LabelGuardedIntCounter,
726    _phase: S,
727}
728
729impl UpstreamBuffer<ConsumingSnapshot> {
730    fn new(
731        upstream: MergeExecutorInput,
732        first_upstream_barrier: DispatcherBarrier,
733        consume_upstream_row_count: LabelGuardedIntCounter,
734    ) -> Self {
735        Self {
736            upstream,
737            is_polling_epoch_data: false,
738            consume_upstream_row_count,
739            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
740            // no limit on the number of pending barrier in the beginning
741            max_pending_epoch_lag: u64::MAX,
742            consumed_epoch: 0,
743            _phase: ConsumingSnapshot {},
744        }
745    }
746
747    fn start_consuming_log_store(
748        mut self,
749        consumed_epoch: u64,
750    ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
751        if self
752            .upstream_pending_barriers
753            .first_upstream_barrier_epoch
754            .prev
755            == consumed_epoch
756        {
757            assert_eq!(
758                1,
759                self.upstream_pending_barriers
760                    .pop()
761                    .expect("non-empty")
762                    .len()
763            );
764        }
765        let max_pending_epoch_lag = self.pending_epoch_lag();
766        let buffer = UpstreamBuffer {
767            upstream: self.upstream,
768            upstream_pending_barriers: self.upstream_pending_barriers,
769            max_pending_epoch_lag,
770            is_polling_epoch_data: self.is_polling_epoch_data,
771            consume_upstream_row_count: self.consume_upstream_row_count,
772            consumed_epoch,
773            _phase: ConsumingLogStore {},
774        };
775        if buffer.is_finished() {
776            Either::Right(buffer.upstream)
777        } else {
778            Either::Left(buffer)
779        }
780    }
781}
782
783impl<S> UpstreamBuffer<S> {
784    fn can_consume_upstream(&self) -> bool {
785        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
786    }
787
788    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
789        {
790            loop {
791                if let Err(e) = try {
792                    if !self.can_consume_upstream() {
793                        // pause the future to block consuming upstream
794                        sleep(Duration::from_secs(30)).await;
795                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
796                        return pending().await;
797                    }
798                    self.consume_until_next_checkpoint_barrier().await?;
799                } {
800                    break e;
801                }
802            }
803        }
804    }
805
806    /// Consume the upstream until seeing the next barrier.
807    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
808        loop {
809            let msg: DispatcherMessage = self
810                .upstream
811                .try_next()
812                .await?
813                .ok_or_else(|| anyhow!("end of upstream"))?;
814            match msg {
815                DispatcherMessage::Chunk(chunk) => {
816                    self.is_polling_epoch_data = true;
817                    self.consume_upstream_row_count
818                        .inc_by(chunk.cardinality() as _);
819                }
820                DispatcherMessage::Barrier(barrier) => {
821                    let is_checkpoint = barrier.kind.is_checkpoint();
822                    self.upstream_pending_barriers.add(barrier);
823                    if is_checkpoint {
824                        self.is_polling_epoch_data = false;
825                        break;
826                    } else {
827                        self.is_polling_epoch_data = true;
828                    }
829                }
830                DispatcherMessage::Watermark(_) => {
831                    self.is_polling_epoch_data = true;
832                }
833            }
834        }
835        Ok(())
836    }
837}
838
839impl UpstreamBuffer<ConsumingLogStore> {
840    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
841    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
842        assert!(!self.is_finished());
843        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
844            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
845            // we must have returned true to indicate finish, and should not be called again.
846            assert!(self.is_polling_epoch_data);
847            self.consume_until_next_checkpoint_barrier().await?;
848            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
849        }
850        self.upstream_pending_barriers.consume_epoch(epoch);
851
852        {
853            {
854                let prev_epoch = epoch.prev;
855                assert!(self.consumed_epoch < prev_epoch);
856                let elapsed_epoch = prev_epoch - self.consumed_epoch;
857                self.consumed_epoch = prev_epoch;
858                if self.upstream_pending_barriers.has_checkpoint_epoch() {
859                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
860                    while self.can_consume_upstream()
861                        && let Some(result) =
862                            self.consume_until_next_checkpoint_barrier().now_or_never()
863                    {
864                        result?;
865                    }
866                }
867                // sub to ensure that the lag is monotonically decreasing.
868                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
869                // the upstream can at least progress for one epoch.
870                self.max_pending_epoch_lag = min(
871                    self.pending_epoch_lag(),
872                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
873                );
874            }
875        }
876        Ok(self.is_finished())
877    }
878
879    fn is_finished(&self) -> bool {
880        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
881            assert!(
882                self.upstream_pending_barriers
883                    .pending_non_checkpoint_barriers
884                    .is_empty()
885            )
886        }
887        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
888    }
889
890    fn start_consuming_upstream(self) -> MergeExecutorInput {
891        assert!(self.is_finished());
892        assert_eq!(self.pending_epoch_lag(), 0);
893        self.upstream
894    }
895}
896
897impl<S> UpstreamBuffer<S> {
898    /// Run a future while concurrently polling the upstream so that the upstream
899    /// won't be back-pressured.
900    async fn run_future<T, E: Into<StreamExecutorError>>(
901        &mut self,
902        future: impl Future<Output = Result<T, E>>,
903    ) -> StreamExecutorResult<T> {
904        select! {
905            biased;
906            e = self.concurrently_consume_upstream() => {
907                Err(e)
908            }
909            // this arm won't be starved, because the first arm is always pending unless returning with error
910            result = future => {
911                result.map_err(Into::into)
912            }
913        }
914    }
915
916    fn pending_epoch_lag(&self) -> u64 {
917        self.upstream_pending_barriers
918            .latest_epoch()
919            .map(|epoch| {
920                epoch
921                    .prev
922                    .checked_sub(self.consumed_epoch)
923                    .expect("pending epoch must be later than consumed_epoch")
924            })
925            .unwrap_or(0)
926    }
927}
928
929#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
930async fn make_log_stream(
931    upstream_table: &BatchTable<impl StateStore>,
932    start_epoch: u64,
933    end_epoch: u64,
934    start_pk: Option<OwnedRow>,
935    chunk_size: usize,
936) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
937    let data_types = upstream_table.schema().data_types();
938    let start_pk = start_pk.as_ref();
939    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
940    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
941        upstream_table
942            .batch_iter_vnode_log(
943                start_epoch,
944                HummockReadEpoch::Committed(end_epoch),
945                start_pk,
946                vnode,
947            )
948            .map_ok(move |stream| {
949                let stream = stream.map_err(Into::into);
950                (vnode, stream, 0)
951            })
952    }))
953    .await?;
954    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
955    Ok(VnodeStream::new(
956        vnode_streams,
957        upstream_table.pk_in_output_indices().expect("should exist"),
958        builder,
959    ))
960}
961
962async fn make_snapshot_stream(
963    upstream_table: &BatchTable<impl StateStore>,
964    snapshot_epoch: u64,
965    backfill_state: &BackfillState<impl StateStore>,
966    rate_limit: RateLimit,
967    chunk_size: usize,
968    snapshot_rebuild_interval: Duration,
969    pk_scan_range: &PkScanRange,
970) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
971    let data_types = upstream_table.schema().data_types();
972    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
973        move |(vnode, progress)| {
974            let start_pk = match progress {
975                None => Some((None, 0)),
976                Some(VnodeBackfillProgress {
977                    row_count,
978                    progress: EpochBackfillProgress::Consuming { latest_pk },
979                    ..
980                }) => Some((Some(latest_pk), *row_count)),
981                Some(VnodeBackfillProgress {
982                    progress: EpochBackfillProgress::Consumed,
983                    ..
984                }) => None,
985            };
986            start_pk.map(|(start_pk, row_count)| {
987                upstream_table
988                    .batch_iter_vnode_with_pk_range(
989                        HummockReadEpoch::Committed(snapshot_epoch),
990                        start_pk,
991                        &pk_scan_range.pk_prefix,
992                        &pk_scan_range.range_bounds,
993                        vnode,
994                        PrefetchOptions::prefetch_for_large_range_scan(),
995                        snapshot_rebuild_interval,
996                    )
997                    .map_ok(move |stream| {
998                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
999                        (vnode, stream, row_count)
1000                    })
1001            })
1002        },
1003    ))
1004    .await?;
1005    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
1006    Ok(VnodeStream::new(
1007        vnode_streams,
1008        upstream_table.pk_in_output_indices().expect("should exist"),
1009        builder,
1010    ))
1011}
1012
1013#[expect(clippy::too_many_arguments)]
1014#[try_stream(ok = Message, error = StreamExecutorError)]
1015async fn make_consume_snapshot_stream<'a, S: StateStore>(
1016    upstream_table: &'a BatchTable<S>,
1017    snapshot_epoch: u64,
1018    chunk_size: usize,
1019    rate_limiter: &'a MonitoredRateLimiter,
1020    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
1021    progress: &'a mut CreateMviewProgressReporter,
1022    backfill_state: &'a mut BackfillState<S>,
1023    first_recv_barrier_epoch: EpochPair,
1024    initial_backfill_paused: bool,
1025    actor_ctx: &'a ActorContextRef,
1026    pk_scan_range: &'a PkScanRange,
1027) {
1028    let mut barrier_epoch = first_recv_barrier_epoch;
1029
1030    // start consume upstream snapshot
1031    let mut snapshot_stream = make_snapshot_stream(
1032        upstream_table,
1033        snapshot_epoch,
1034        &*backfill_state,
1035        rate_limiter.rate_limit(),
1036        chunk_size,
1037        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
1038        pk_scan_range,
1039    )
1040    .await?;
1041
1042    async fn select_barrier_and_snapshot_stream(
1043        barrier_rx: &mut UnboundedReceiver<Barrier>,
1044        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
1045        throttle_snapshot_stream: bool,
1046        backfill_paused: bool,
1047    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
1048        select!(
1049            result = receive_next_barrier(barrier_rx) => {
1050                Ok(Either::Left(result?))
1051            },
1052            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
1053                Ok(Either::Right(result?))
1054            }
1055        )
1056    }
1057
1058    let mut backfill_paused = initial_backfill_paused;
1059    loop {
1060        let throttle_snapshot_stream = matches!(rate_limiter.rate_limit(), RateLimit::Pause);
1061        match select_barrier_and_snapshot_stream(
1062            barrier_rx,
1063            &mut snapshot_stream,
1064            throttle_snapshot_stream,
1065            backfill_paused,
1066        )
1067        .await?
1068        {
1069            Either::Left(barrier) => {
1070                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1071                barrier_epoch = barrier.epoch;
1072
1073                if barrier_epoch.curr >= snapshot_epoch {
1074                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
1075                }
1076                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
1077                    backfill_paused = false;
1078                }
1079                if let Some(chunk) = snapshot_stream.consume_builder() {
1080                    rate_limiter.wait(chunk.cardinality() as _).await;
1081                    yield Message::Chunk(chunk);
1082                }
1083                snapshot_stream
1084                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1085                        if let Some(pk) = pk_progress {
1086                            backfill_state.update_epoch_progress(
1087                                vnode,
1088                                snapshot_epoch,
1089                                row_count,
1090                                pk,
1091                            );
1092                        } else {
1093                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1094                        }
1095                    })
1096                    .await?;
1097                let count = backfill_state.total_row_count();
1098                let post_commit = backfill_state.commit(barrier.epoch).await?;
1099                trace!(?barrier_epoch, count, "update progress");
1100                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
1101
1102                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
1103                    if let Mutation::Throttle(config) = &**m
1104                        && let Some(config) = config.get(&actor_ctx.fragment_id)
1105                        && config.throttle_type() == PbThrottleType::Backfill
1106                    {
1107                        Some(config.rate_limit)
1108                    } else {
1109                        None
1110                    }
1111                });
1112                yield Message::Barrier(barrier);
1113                post_commit.post_yield_barrier(None).await?;
1114
1115                if let Some(new_rate_limit) = new_rate_limit {
1116                    let new_rate_limit = new_rate_limit.into();
1117                    rate_limiter.update(new_rate_limit);
1118                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
1119                }
1120            }
1121            Either::Right(Some(chunk)) => {
1122                if backfill_paused {
1123                    return Err(
1124                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
1125                    );
1126                }
1127                rate_limiter.wait(chunk.cardinality() as _).await;
1128                yield Message::Chunk(chunk);
1129            }
1130            Either::Right(None) => {
1131                break;
1132            }
1133        }
1134    }
1135
1136    // finish consuming upstream snapshot, report finish
1137    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
1138    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
1139    barrier_epoch = barrier_to_report_finish.epoch;
1140    snapshot_stream
1141        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1142            assert_eq!(pk_progress, None);
1143            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1144        })
1145        .await?;
1146    let count = backfill_state.total_row_count();
1147    trace!(?barrier_epoch, count, "report finish");
1148    let post_commit = backfill_state.commit(barrier_epoch).await?;
1149    progress.finish(barrier_epoch, count as _);
1150    yield Message::Barrier(barrier_to_report_finish);
1151    post_commit.post_yield_barrier(None).await?;
1152
1153    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
1154    loop {
1155        let barrier = receive_next_barrier(barrier_rx).await?;
1156        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1157        barrier_epoch = barrier.epoch;
1158        let post_commit = backfill_state.commit(barrier.epoch).await?;
1159        yield Message::Barrier(barrier);
1160        post_commit.post_yield_barrier(None).await?;
1161        if barrier_epoch.curr == snapshot_epoch {
1162            break;
1163        }
1164    }
1165    trace!(?barrier_epoch, "finish consuming snapshot");
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170    use std::collections::HashSet;
1171    use std::sync::Arc;
1172
1173    use risingwave_common::array::StreamChunk;
1174    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1175    use risingwave_common::row::OwnedRow;
1176    use risingwave_common::test_prelude::StreamChunkTestExt;
1177    use risingwave_common::types::DataType;
1178    use risingwave_common::util::epoch::{EpochPair, test_epoch};
1179    use risingwave_common::util::sort_util::OrderType;
1180    use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
1181    use risingwave_rpc_client::HummockMetaClient;
1182    use risingwave_storage::hummock::HummockStorage;
1183    use risingwave_storage::table::batch_table::BatchTable;
1184    use tokio::sync::mpsc::unbounded_channel;
1185    use tokio::time::{Duration, timeout};
1186
1187    use super::*;
1188    use crate::common::table::state_table::{
1189        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
1190    };
1191    use crate::common::table::test_utils::gen_pbtable_with_value_indices;
1192    use crate::executor::exchange::input::{Input, LocalInput};
1193    use crate::executor::exchange::permit::channel_for_test;
1194    use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
1195    use crate::task::LocalBarrierManager;
1196
1197    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
1198    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);
1199
1200    fn source_table_pb() -> risingwave_pb::catalog::PbTable {
1201        gen_pbtable_with_value_indices(
1202            SOURCE_TABLE_ID,
1203            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1204            vec![OrderType::ascending()],
1205            vec![0],
1206            0,
1207            vec![0],
1208        )
1209    }
1210
1211    fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
1212        gen_pbtable_with_value_indices(
1213            PROGRESS_TABLE_ID,
1214            vec![
1215                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
1216                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1217                ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1218                ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
1219                ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
1220            ],
1221            vec![OrderType::ascending()],
1222            vec![0],
1223            1,
1224            vec![1, 2, 3, 4],
1225        )
1226    }
1227
1228    fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
1229        BatchTable::for_test(
1230            store,
1231            SOURCE_TABLE_ID,
1232            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1233            vec![OrderType::ascending()],
1234            vec![0],
1235            vec![0],
1236        )
1237    }
1238
1239    async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1240        StateTableBuilder::new(&source_table_pb(), store, None)
1241            .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
1242            .forbid_preload_all_rows()
1243            .build()
1244            .await
1245    }
1246
1247    async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1248        StateTable::from_table_catalog(&progress_table_pb(), store, None).await
1249    }
1250
1251    async fn commit_insert_epoch(
1252        test_env: &HummockTestEnv,
1253        source_state_table: &mut StateTable<HummockStorage>,
1254        epoch: &mut EpochPair,
1255        table_ids: HashSet<TableId>,
1256        values: &[i64],
1257    ) {
1258        for value in values {
1259            source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
1260        }
1261        epoch.inc_for_test();
1262        test_env.storage.start_epoch(epoch.curr, table_ids);
1263        source_state_table.commit_for_test(*epoch).await.unwrap();
1264        let res = test_env
1265            .storage
1266            .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
1267            .await
1268            .unwrap();
1269        test_env
1270            .meta_client
1271            .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
1272            .await
1273            .unwrap();
1274        test_env
1275            .storage
1276            .wait_version(test_env.manager.get_current_version().await)
1277            .await;
1278    }
1279
1280    fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
1281        for epoch in 1..=max_epoch {
1282            test_env
1283                .storage
1284                .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
1285        }
1286    }
1287
1288    fn make_upstream_input(
1289        barrier_manager: LocalBarrierManager,
1290        actor_ctx: ActorContextRef,
1291        rx: crate::executor::exchange::permit::Receiver,
1292    ) -> MergeExecutorInput {
1293        MergeExecutorInput::new(
1294            MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
1295            actor_ctx,
1296            1919.into(),
1297            barrier_manager,
1298            Arc::new(StreamingMetrics::unused()),
1299            ExecutorInfo::for_test(
1300                Schema::new(vec![Field::unnamed(DataType::Int64)]),
1301                vec![0],
1302                "SnapshotBackfillUpstream".to_owned(),
1303                0,
1304            ),
1305        )
1306    }
1307
1308    async fn expect_barrier_with_timeout(
1309        executor: &mut BoxedMessageStream,
1310        reason: &str,
1311    ) -> Barrier {
1312        let message = timeout(Duration::from_secs(10), executor.next())
1313            .await
1314            .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
1315            .unwrap()
1316            .unwrap();
1317        match message {
1318            Message::Barrier(barrier) => barrier,
1319            other => panic!("expected barrier for {reason}, got {other:?}"),
1320        }
1321    }
1322
1323    async fn expect_chunk_with_timeout(
1324        executor: &mut BoxedMessageStream,
1325        reason: &str,
1326    ) -> StreamChunk {
1327        let message = timeout(Duration::from_secs(10), executor.next())
1328            .await
1329            .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
1330            .unwrap()
1331            .unwrap();
1332        match message {
1333            Message::Chunk(chunk) => chunk,
1334            other => panic!("expected chunk for {reason}, got {other:?}"),
1335        }
1336    }
1337
1338    async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
1339        assert!(
1340            timeout(Duration::from_millis(200), executor.next())
1341                .await
1342                .is_err(),
1343            "executor unexpectedly produced a message while waiting for {reason}"
1344        );
1345    }
1346
1347    #[tokio::test]
1348    async fn test_snapshot_backfill_without_upstream_on_hummock() {
1349        let source_env = prepare_hummock_test_env().await;
1350        source_env.register_table(source_table_pb()).await;
1351        let progress_env = prepare_hummock_test_env().await;
1352        progress_env.register_table(progress_table_pb()).await;
1353
1354        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1355        let source_table = source_batch_table(source_env.storage.clone());
1356        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1357
1358        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1359        source_env
1360            .storage
1361            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1362        source_state_table.init_epoch(epoch).await.unwrap();
1363
1364        commit_insert_epoch(
1365            &source_env,
1366            &mut source_state_table,
1367            &mut epoch,
1368            HashSet::from_iter([SOURCE_TABLE_ID]),
1369            &[1],
1370        )
1371        .await;
1372        commit_insert_epoch(
1373            &source_env,
1374            &mut source_state_table,
1375            &mut epoch,
1376            HashSet::from_iter([SOURCE_TABLE_ID]),
1377            &[2],
1378        )
1379        .await;
1380        commit_insert_epoch(
1381            &source_env,
1382            &mut source_state_table,
1383            &mut epoch,
1384            HashSet::from_iter([SOURCE_TABLE_ID]),
1385            &[3],
1386        )
1387        .await;
1388        commit_insert_epoch(
1389            &source_env,
1390            &mut source_state_table,
1391            &mut epoch,
1392            HashSet::from_iter([SOURCE_TABLE_ID]),
1393            &[],
1394        )
1395        .await;
1396        start_progress_epochs(&progress_env, 5);
1397
1398        let barrier_manager = LocalBarrierManager::for_test();
1399        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
1400        let actor_ctx = ActorContext::for_test(1234);
1401        let (barrier_tx, barrier_rx) = unbounded_channel();
1402        barrier_tx
1403            .send(Barrier::new_test_barrier(test_epoch(1)))
1404            .unwrap();
1405
1406        let mut executor = SnapshotBackfillExecutor::new(
1407            source_table,
1408            progress_state_table,
1409            None,
1410            None,
1411            vec![0],
1412            vec![0],
1413            actor_ctx,
1414            progress,
1415            1024,
1416            RateLimit::Disabled,
1417            barrier_rx,
1418            Arc::new(StreamingMetrics::unused()),
1419            Some(test_epoch(3)),
1420        )
1421        .expect("snapshot backfill executor should be created")
1422        .boxed()
1423        .execute();
1424
1425        assert_eq!(
1426            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1427                .await
1428                .epoch,
1429            Barrier::new_test_barrier(test_epoch(1)).epoch
1430        );
1431        assert_eq!(
1432            expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
1433            StreamChunk::from_pretty(
1434                " I
1435                + 1
1436                + 2
1437                + 3"
1438            )
1439        );
1440        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1441
1442        barrier_tx
1443            .send(Barrier::new_test_barrier(test_epoch(2)))
1444            .unwrap();
1445        assert_eq!(
1446            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1447                .await
1448                .epoch,
1449            Barrier::new_test_barrier(test_epoch(2)).epoch
1450        );
1451
1452        barrier_tx
1453            .send(Barrier::new_test_barrier(test_epoch(3)))
1454            .unwrap();
1455        assert_eq!(
1456            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1457                .await
1458                .epoch,
1459            Barrier::new_test_barrier(test_epoch(3)).epoch
1460        );
1461
1462        barrier_tx
1463            .send(Barrier::new_test_barrier(test_epoch(4)))
1464            .unwrap();
1465        assert_eq!(
1466            expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
1467                .await
1468                .epoch,
1469            Barrier::new_test_barrier(test_epoch(4)).epoch
1470        );
1471
1472        barrier_tx
1473            .send(Barrier::new_test_barrier(test_epoch(5)))
1474            .unwrap();
1475        assert_eq!(
1476            expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
1477                .await
1478                .epoch,
1479            Barrier::new_test_barrier(test_epoch(5)).epoch
1480        );
1481
1482        expect_pending_with_timeout(&mut executor, "next local barrier").await;
1483    }
1484
1485    #[tokio::test]
        // Drives `SnapshotBackfillExecutor` with an upstream input attached, using two
        // separate hummock test environments: one for the source table and one for the
        // progress table.
        //
        // Output order asserted below:
        //   1. barriers for test epochs 1, 2, 3 and 4 (each returned after the matching
        //      local barrier has been sent),
        //   2. the chunk `+ 4`, then the barrier for epoch 5,
        //   3. the chunk `+ 5` that was sent through the upstream channel,
        //   4. the stop barrier for epoch 6.
1486    async fn test_snapshot_backfill_with_upstream_on_hummock() {
            // Source and progress tables are registered in independent test environments.
1487        let source_env = prepare_hummock_test_env().await;
1488        source_env.register_table(source_table_pb()).await;
1489        let progress_env = prepare_hummock_test_env().await;
1490        progress_env.register_table(progress_table_pb()).await;
1491
            // `source_state_table` is used to write into the source storage, `source_table`
            // is the batch-table handle passed to the executor; both share the same storage.
1492        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1493        let source_table = source_batch_table(source_env.storage.clone());
1494        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1495
            // Start test epoch 1 on the source storage and initialise the writer with it.
1496        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1497        source_env
1498            .storage
1499            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1500        source_state_table.init_epoch(epoch).await.unwrap();
1501
            // Four `commit_insert_epoch` calls: the first three pass an empty row slice,
            // the fourth passes `[4]`.
            // NOTE(review): presumably each call commits one epoch and advances `epoch`
            // (it is passed as `&mut`) — confirm against the helper's definition.
1502        commit_insert_epoch(
1503            &source_env,
1504            &mut source_state_table,
1505            &mut epoch,
1506            HashSet::from_iter([SOURCE_TABLE_ID]),
1507            &[],
1508        )
1509        .await;
1510        commit_insert_epoch(
1511            &source_env,
1512            &mut source_state_table,
1513            &mut epoch,
1514            HashSet::from_iter([SOURCE_TABLE_ID]),
1515            &[],
1516        )
1517        .await;
1518        commit_insert_epoch(
1519            &source_env,
1520            &mut source_state_table,
1521            &mut epoch,
1522            HashSet::from_iter([SOURCE_TABLE_ID]),
1523            &[],
1524        )
1525        .await;
1526        commit_insert_epoch(
1527            &source_env,
1528            &mut source_state_table,
1529            &mut epoch,
1530            HashSet::from_iter([SOURCE_TABLE_ID]),
1531            &[4],
1532        )
1533        .await;
            // NOTE(review): `6` equals the highest test epoch used below (the stop
            // barrier); the helper's exact semantics are not visible here — confirm.
1534        start_progress_epochs(&progress_env, 6);
1535
            // The barrier manager is cloned because it is needed twice: by the progress
            // reporter here and by `make_upstream_input` below.
1536        let barrier_manager = LocalBarrierManager::for_test();
1537        let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
1538        let actor_ctx = ActorContext::for_test(1235);
1539        let (barrier_tx, barrier_rx) = unbounded_channel();
1540        let (upstream_tx, upstream_rx) = channel_for_test();
1541
            // Queue the first upstream barrier (epoch 5) and the first local barrier
            // (epoch 1) before the executor is constructed, so both messages are already
            // sitting in their channels when execution starts.
1542        upstream_tx
1543            .send(
1544                DispatcherMessage::Barrier(
1545                    Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
1546                )
1547                .into(),
1548            )
1549            .await
1550            .unwrap();
1551        barrier_tx
1552            .send(Barrier::new_test_barrier(test_epoch(1)))
1553            .unwrap();
1554
            // Same constructor arguments as the preceding test in this module, except
            // that the third argument carries the upstream input instead of `None`.
1555        let mut executor = SnapshotBackfillExecutor::new(
1556            source_table,
1557            progress_state_table,
1558            Some(make_upstream_input(
1559                barrier_manager,
1560                actor_ctx.clone(),
1561                upstream_rx,
1562            )),
1563            None,
1564            vec![0],
1565            vec![0],
1566            actor_ctx,
1567            progress,
1568            1024,
1569            RateLimit::Disabled,
1570            barrier_rx,
1571            Arc::new(StreamingMetrics::unused()),
1572            Some(test_epoch(3)),
1573        )
1574        .expect("snapshot backfill executor should be created")
1575        .boxed()
1576        .execute();
1577
            // The pre-queued local barrier for epoch 1 is the first message expected.
1578        assert_eq!(
1579            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1580                .await
1581                .epoch,
1582            Barrier::new_test_barrier(test_epoch(1)).epoch
1583        );
            // Local barrier 2 has not been sent yet. Judging by its name,
            // `expect_pending_with_timeout` checks that the executor emits nothing in the
            // meantime. Unlike the preceding test, no snapshot chunk is asserted here —
            // presumably because the first three commits above carried no rows.
1584        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
            // Local barriers 2, 3 and 4 are sent one at a time; each must come back out
            // of the executor before the next one is sent.
1585        barrier_tx
1586            .send(Barrier::new_test_barrier(test_epoch(2)))
1587            .unwrap();
1588        assert_eq!(
1589            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1590                .await
1591                .epoch,
1592            Barrier::new_test_barrier(test_epoch(2)).epoch
1593        );
1594
1595        barrier_tx
1596            .send(Barrier::new_test_barrier(test_epoch(3)))
1597            .unwrap();
1598        assert_eq!(
1599            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1600                .await
1601                .epoch,
1602            Barrier::new_test_barrier(test_epoch(3)).epoch
1603        );
1604
1605        barrier_tx
1606            .send(Barrier::new_test_barrier(test_epoch(4)))
1607            .unwrap();
1608        assert_eq!(
1609            expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
1610                .await
1611                .epoch,
1612            Barrier::new_test_barrier(test_epoch(4)).epoch
1613        );
1614
            // After local barrier 5 is sent, the chunk `+ 4` (the row passed to the last
            // `commit_insert_epoch` call) must be emitted before barrier 5 itself.
1615        barrier_tx
1616            .send(Barrier::new_test_barrier(test_epoch(5)))
1617            .unwrap();
1618        assert_eq!(
1619            expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
1620            StreamChunk::from_pretty(
1621                " I
1622                + 4"
1623            )
1624        );
1625        assert_eq!(
1626            expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
1627                .await
1628                .epoch,
1629            Barrier::new_test_barrier(test_epoch(5)).epoch
1630        );
1631
            // Feed a chunk and a stop barrier (epoch 6) through the upstream channel, and
            // send the same stop barrier through the local barrier channel as well.
1632        upstream_tx
1633            .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
1634            .await
1635            .unwrap();
1636        let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
1637        upstream_tx
1638            .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
1639            .await
1640            .unwrap();
1641        barrier_tx.send(stop_barrier.clone()).unwrap();
1642
            // The upstream chunk must come out with the same content, followed by the
            // stop barrier as the last asserted message.
1643        assert_eq!(
1644            expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
1645            StreamChunk::from_pretty(" I\n + 5")
1646        );
1647        assert_eq!(
1648            expect_barrier_with_timeout(&mut executor, "final stop barrier")
1649                .await
1650                .epoch,
1651            stop_barrier.epoch
1652        );
1653    }
1654}