use std::cmp::min;
use std::collections::VecDeque;
use std::future::{Future, pending, ready};
use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, try_join_all};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
use risingwave_common::array::StreamChunk;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common_rate_limit::RateLimit;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::common::PbThrottleType;
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::ChangeLogRow;
use risingwave_storage::table::batch_table::BatchTable;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::sleep;

use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
use crate::executor::backfill::snapshot_backfill::state::{
    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
};
use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
use crate::executor::backfill::utils::{create_builder, mapping_message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{StateTable, StreamExt, try_stream};
use crate::executor::{
    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
    expect_first_barrier,
};
use crate::task::CreateMviewProgressReporter;

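/// Executor that backfills a materialized view through snapshot backfill: it first scans a
/// snapshot of the upstream table at `snapshot_epoch`, then replays the upstream change log
/// until it catches up, and finally forwards the live upstream stream.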
pub struct SnapshotBackfillExecutor<S: StateStore> {
    upstream_table: BatchTable<S>,

    progress_state_table: StateTable<S>,

    upstream: Option<MergeExecutorInput>,

    output_indices: Vec<usize>,

    progress: CreateMviewProgressReporter,

    chunk_size: usize,
    rate_limit: RateLimit,

    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    snapshot_epoch: Option<u64>,
}

impl<S: StateStore> SnapshotBackfillExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: Option<MergeExecutorInput>,
        output_indices: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> Self {
        if let Some(upstream) = &upstream {
            assert_eq!(&upstream.info.schema, upstream_table.schema());
        }
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        if !matches!(rate_limit, RateLimit::Disabled) {
            trace!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            progress,
            chunk_size,
            rate_limit,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
        }
    }

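    /// Main loop of the executor. Depending on `snapshot_epoch` and on whether the first upstream
    /// barrier is ahead of the first injected barrier, it either scans the snapshot and then
    /// replays the change log, only replays the change log, or forwards the upstream directly,
    /// persisting per-vnode progress in `backfill_state` throughout.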
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        trace!("snapshot backfill executor start");
        let upstream = if let Some(mut upstream) = self.upstream {
            let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
            trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
            Some((first_upstream_barrier, upstream))
        } else {
            None
        };
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if let Some((first_upstream_barrier, _)) = &upstream {
                if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                    assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                    Some(snapshot_epoch)
                } else {
                    None
                }
            } else {
                Some(snapshot_epoch)
            }
        } else {
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
                    first_recv_barrier.epoch
                );
            } else {
                let (first_upstream_barrier, _) = upstream
                    .as_ref()
                    .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

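        // Backfill phases (snapshot scan and/or change-log replay): this block yields the
        // backfilled data and evaluates to the epoch reached, whether finishing still needs to be
        // reported, and the upstream to consume next.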
        let (mut barrier_epoch, mut need_report_finish, upstream) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id());
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
                {
                    SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
                        upstream,
                        first_upstream_barrier,
                        consume_upstream_row_count,
                    ))
                } else {
                    SnapshotBackfillUpstream::Empty
                };

                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            &mut self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                        );

                        pin_mut!(snapshot_stream);

                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                match upstream_buffer {
                    Either::Left(mut upstream_buffer) => {
                        let initial_pending_lag =
                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
                                &upstream_buffer
                            {
                                Some(Duration::from_millis(
                                    Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                                ))
                            } else {
                                None
                            };
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            ?initial_pending_lag,
                            "start consuming log store"
                        );

                        let consuming_log_store_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_log_store",
                            ]);
                        let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
                        loop {
                            let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                            assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                            let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                            barrier_epoch = barrier.epoch;
                            if barrier.kind.is_checkpoint() {
                                let pending_non_checkpoint_barrier =
                                    take(&mut pending_non_checkpoint_barrier);
                                let end_epoch = barrier_epoch.prev;
                                let start_epoch = pending_non_checkpoint_barrier
                                    .first()
                                    .map(|epoch| epoch.prev)
                                    .unwrap_or(end_epoch);
                                trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
                                let mut stream = upstream_buffer
                                    .run_future(make_log_stream(
                                        &self.upstream_table,
                                        start_epoch,
                                        end_epoch,
                                        None,
                                        self.chunk_size,
                                    ))
                                    .await?;
                                while let Some(chunk) =
                                    upstream_buffer.run_future(stream.try_next()).await?
                                {
                                    trace!(
                                        ?barrier_epoch,
                                        size = chunk.cardinality(),
                                        "consume change log yield chunk",
                                    );
                                    consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                                    yield Message::Chunk(chunk);
                                }

                                trace!(?barrier_epoch, "after consume change log");

                                stream
                                    .for_vnode_pk_progress(|vnode, row_count, progress| {
                                        assert_eq!(progress, None);
                                        backfill_state.finish_epoch(
                                            vnode,
                                            barrier.epoch.prev,
                                            row_count,
                                        );
                                    })
                                    .await?;
                            } else {
                                pending_non_checkpoint_barrier.push(barrier.epoch);
                            }

                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
                                &upstream_buffer
                            {
                                if is_finished {
                                    assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                                    assert!(pending_non_checkpoint_barrier.is_empty());
                                    self.progress.finish_consuming_log_store(barrier.epoch);
                                } else {
                                    self.progress.update_create_mview_log_store_progress(
                                        barrier.epoch,
                                        upstream_buffer.pending_epoch_lag(),
                                    );
                                }
                            }

                            let post_commit = backfill_state.commit(barrier.epoch).await?;
                            let update_vnode_bitmap =
                                barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                            yield Message::Barrier(barrier);
                            post_commit.post_yield_barrier(None).await?;
                            if update_vnode_bitmap.is_some() {
                                return Err(anyhow!(
                                    "should not update vnode bitmap during consuming log store"
                                )
                                .into());
                            }

                            if is_finished {
                                assert!(
                                    pending_non_checkpoint_barrier.is_empty(),
                                    "{pending_non_checkpoint_barrier:?}"
                                );
                                break;
                            }
                        }
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            "finish consuming log store"
                        );

                        (
                            barrier_epoch,
                            false,
                            upstream_buffer.start_consuming_upstream(),
                        )
                    }
                    Either::Right(upstream) => {
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            "skip consuming log store and start consuming upstream directly"
                        );

                        (barrier_epoch, true, upstream)
                    }
                }
            } else {
                let (first_upstream_barrier, _) = upstream
                    .as_ref()
                    .expect("should have upstream when skipping snapshot backfill");
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                trace!(
                    table_id = %self.upstream_table.table_id(),
                    "skip backfill"
                );
                let (first_upstream_barrier, upstream) =
                    upstream.expect("should have upstream when skipping snapshot backfill");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true, upstream)
            }
        };
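        // Final phase: forward the live upstream stream, marking each epoch as consumed in the
        // progress table and applying vnode bitmap updates carried by barriers.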
        let mut upstream = upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap =
                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}

impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let output_indices = self.output_indices.clone();
        self.execute_inner()
            .filter_map(move |result| {
                ready({
                    match result {
                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
                        Err(e) => Some(Err(e)),
                    }
                })
            })
            .boxed()
    }
}

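/// Type-state markers for the phase an `UpstreamBuffer` is in.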
struct ConsumingSnapshot;
struct ConsumingLogStore;

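/// Upstream barriers buffered while backfill is still in progress. Non-checkpoint barriers
/// accumulate in `pending_non_checkpoint_barriers`; when a checkpoint barrier arrives, the whole
/// run is moved into `checkpoint_barrier_groups` as one group.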
#[derive(Debug)]
struct PendingBarriers {
    first_upstream_barrier_epoch: EpochPair,

    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}

impl PendingBarriers {
    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
        Self {
            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
            pending_non_checkpoint_barriers: Default::default(),
            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
                first_upstream_barrier,
            ])]),
        }
    }

    fn add(&mut self, barrier: DispatcherBarrier) {
        let is_checkpoint = barrier.kind.is_checkpoint();
        self.pending_non_checkpoint_barriers.push_front(barrier);
        if is_checkpoint {
            self.checkpoint_barrier_groups
                .push_front(take(&mut self.pending_non_checkpoint_barriers));
        }
    }

    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
        self.checkpoint_barrier_groups.pop_back()
    }

    fn consume_epoch(&mut self, epoch: EpochPair) {
        let barriers = self
            .checkpoint_barrier_groups
            .back_mut()
            .expect("non-empty");
        let oldest_upstream_barrier = barriers.back().expect("non-empty");
        assert!(
            oldest_upstream_barrier.epoch.prev >= epoch.prev,
            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
            oldest_upstream_barrier.epoch,
            epoch
        );
        if oldest_upstream_barrier.epoch.prev == epoch.prev {
            assert_eq!(oldest_upstream_barrier.epoch, epoch);
            barriers.pop_back();
            if barriers.is_empty() {
                self.checkpoint_barrier_groups.pop_back();
            }
        }
    }

    fn latest_epoch(&self) -> Option<EpochPair> {
        self.pending_non_checkpoint_barriers
            .front()
            .or_else(|| {
                self.checkpoint_barrier_groups
                    .front()
                    .and_then(|barriers| barriers.front())
            })
            .map(|barrier| barrier.epoch)
    }

    fn checkpoint_epoch_count(&self) -> usize {
        self.checkpoint_barrier_groups.len()
    }

    fn has_checkpoint_epoch(&self) -> bool {
        !self.checkpoint_barrier_groups.is_empty()
    }
}

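/// The upstream side seen by the backfill loop: `Empty` when the executor was created without an
/// upstream input, `Buffer` when upstream messages are buffered in an `UpstreamBuffer` while
/// backfill is in progress.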
enum SnapshotBackfillUpstream<S> {
    Empty,
    Buffer(UpstreamBuffer<S>),
}

impl<S> SnapshotBackfillUpstream<S> {
    async fn run_future<T, E: Into<StreamExecutorError>>(
        &mut self,
        future: impl Future<Output = Result<T, E>>,
    ) -> StreamExecutorResult<T> {
        match self {
            SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
            SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
        }
    }
}

impl SnapshotBackfillUpstream<ConsumingSnapshot> {
    fn start_consuming_log_store(
        self,
        consumed_epoch: u64,
    ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
        match self {
            SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
            SnapshotBackfillUpstream::Buffer(buffer) => {
                match buffer.start_consuming_log_store(consumed_epoch) {
                    Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
                    Either::Right(input) => Either::Right(input),
                }
            }
        }
    }
}

impl SnapshotBackfillUpstream<ConsumingLogStore> {
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        match self {
            SnapshotBackfillUpstream::Empty => Ok(false),
            SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
        }
    }

    fn start_consuming_upstream(self) -> MergeExecutorInput {
        match self {
            SnapshotBackfillUpstream::Empty => {
                unreachable!("should not start consuming upstream when there is no upstream")
            }
            SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
        }
    }
}

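/// Buffers the upstream while the executor is still backfilling. Data chunks are only counted and
/// dropped (their rows are re-read from the change log later), while barriers are kept in
/// `upstream_pending_barriers`. Upstream consumption pauses once the pending epoch lag reaches
/// `max_pending_epoch_lag`.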
struct UpstreamBuffer<S> {
    upstream: MergeExecutorInput,
    max_pending_epoch_lag: u64,
    consumed_epoch: u64,
    upstream_pending_barriers: PendingBarriers,
    is_polling_epoch_data: bool,
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}

impl UpstreamBuffer<ConsumingSnapshot> {
    fn new(
        upstream: MergeExecutorInput,
        first_upstream_barrier: DispatcherBarrier,
        consume_upstream_row_count: LabelGuardedIntCounter,
    ) -> Self {
        Self {
            upstream,
            is_polling_epoch_data: false,
            consume_upstream_row_count,
            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
            max_pending_epoch_lag: u64::MAX,
            consumed_epoch: 0,
            _phase: ConsumingSnapshot {},
        }
    }

    fn start_consuming_log_store(
        mut self,
        consumed_epoch: u64,
    ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
        if self
            .upstream_pending_barriers
            .first_upstream_barrier_epoch
            .prev
            == consumed_epoch
        {
            assert_eq!(
                1,
                self.upstream_pending_barriers
                    .pop()
                    .expect("non-empty")
                    .len()
            );
        }
        let max_pending_epoch_lag = self.pending_epoch_lag();
        let buffer = UpstreamBuffer {
            upstream: self.upstream,
            upstream_pending_barriers: self.upstream_pending_barriers,
            max_pending_epoch_lag,
            is_polling_epoch_data: self.is_polling_epoch_data,
            consume_upstream_row_count: self.consume_upstream_row_count,
            consumed_epoch,
            _phase: ConsumingLogStore {},
        };
        if buffer.is_finished() {
            Either::Right(buffer.upstream)
        } else {
            Either::Left(buffer)
        }
    }
}

impl<S> UpstreamBuffer<S> {
    fn can_consume_upstream(&self) -> bool {
        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
    }

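    /// Consume the upstream in the background. The future only completes with an error; when
    /// backpressure forbids further consumption it warns after 30 seconds and then stays pending.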
    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
        {
            loop {
                if let Err(e) = try {
                    if !self.can_consume_upstream() {
                        sleep(Duration::from_secs(30)).await;
                        warn!(pending_barrier = ?self.upstream_pending_barriers, "upstream not polled for 30s due to backpressure");
                        return pending().await;
                    }
                    self.consume_until_next_checkpoint_barrier().await?;
                } {
                    break e;
                }
            }
        }
    }

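    /// Pull upstream messages until the next checkpoint barrier has been buffered.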
    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
        loop {
            let msg: DispatcherMessage = self
                .upstream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("end of upstream"))?;
            match msg {
                DispatcherMessage::Chunk(chunk) => {
                    self.is_polling_epoch_data = true;
                    self.consume_upstream_row_count
                        .inc_by(chunk.cardinality() as _);
                }
                DispatcherMessage::Barrier(barrier) => {
                    let is_checkpoint = barrier.kind.is_checkpoint();
                    self.upstream_pending_barriers.add(barrier);
                    if is_checkpoint {
                        self.is_polling_epoch_data = false;
                        break;
                    } else {
                        self.is_polling_epoch_data = true;
                    }
                }
                DispatcherMessage::Watermark(_) => {
                    self.is_polling_epoch_data = true;
                }
            }
        }
        Ok(())
    }
}

impl UpstreamBuffer<ConsumingLogStore> {
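    /// Called after the executor has replayed the change log up to `epoch`: drops the matching
    /// buffered barrier, opportunistically resumes buffering, and tightens `max_pending_epoch_lag`
    /// so that replay gradually catches up with the upstream. Returns whether the buffer is fully
    /// caught up.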
    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            assert!(self.is_polling_epoch_data);
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }

    fn is_finished(&self) -> bool {
        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
            assert!(
                self.upstream_pending_barriers
                    .pending_non_checkpoint_barriers
                    .is_empty()
            )
        }
        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
    }

    fn start_consuming_upstream(self) -> MergeExecutorInput {
        assert!(self.is_finished());
        assert_eq!(self.pending_epoch_lag(), 0);
        self.upstream
    }
}

impl<S> UpstreamBuffer<S> {
    async fn run_future<T, E: Into<StreamExecutorError>>(
        &mut self,
        future: impl Future<Output = Result<T, E>>,
    ) -> StreamExecutorResult<T> {
        select! {
            biased;
            e = self.concurrently_consume_upstream() => {
                Err(e)
            }
            result = future => {
                result.map_err(Into::into)
            }
        }
    }

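    /// Gap between the newest buffered upstream epoch and the epoch already consumed; `0` means
    /// the buffer has fully caught up.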
    fn pending_epoch_lag(&self) -> u64 {
        self.upstream_pending_barriers
            .latest_epoch()
            .map(|epoch| {
                epoch
                    .prev
                    .checked_sub(self.consumed_epoch)
                    .expect("pending epoch must be later than consumed_epoch")
            })
            .unwrap_or(0)
    }
}

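/// Build a `VnodeStream` over the change log of `upstream_table` for the epoch range
/// `[start_epoch, end_epoch]`, with one log iterator per vnode.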
#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
async fn make_log_stream(
    upstream_table: &BatchTable<impl StateStore>,
    start_epoch: u64,
    end_epoch: u64,
    start_pk: Option<OwnedRow>,
    chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
    let data_types = upstream_table.schema().data_types();
    let start_pk = start_pk.as_ref();
    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
        upstream_table
            .batch_iter_vnode_log(
                start_epoch,
                HummockReadEpoch::Committed(end_epoch),
                start_pk,
                vnode,
            )
            .map_ok(move |stream| {
                let stream = stream.map_err(Into::into);
                (vnode, stream, 0)
            })
    }))
    .await?;
    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
    Ok(VnodeStream::new(
        vnode_streams,
        upstream_table.pk_in_output_indices().expect("should exist"),
        builder,
    ))
}

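/// Build a `VnodeStream` that scans the snapshot of `upstream_table` at `snapshot_epoch`,
/// resuming each vnode from the progress recorded in `backfill_state`: finished vnodes are
/// skipped and partially consumed vnodes resume from their latest pk.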
async fn make_snapshot_stream(
    upstream_table: &BatchTable<impl StateStore>,
    snapshot_epoch: u64,
    backfill_state: &BackfillState<impl StateStore>,
    rate_limit: RateLimit,
    chunk_size: usize,
    snapshot_rebuild_interval: Duration,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
    let data_types = upstream_table.schema().data_types();
    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
        move |(vnode, progress)| {
            let start_pk = match progress {
                None => Some((None, 0)),
                Some(VnodeBackfillProgress {
                    row_count,
                    progress: EpochBackfillProgress::Consuming { latest_pk },
                    ..
                }) => Some((Some(latest_pk), *row_count)),
                Some(VnodeBackfillProgress {
                    progress: EpochBackfillProgress::Consumed,
                    ..
                }) => None,
            };
            start_pk.map(|(start_pk, row_count)| {
                upstream_table
                    .batch_iter_vnode(
                        HummockReadEpoch::Committed(snapshot_epoch),
                        start_pk,
                        vnode,
                        PrefetchOptions::prefetch_for_large_range_scan(),
                        snapshot_rebuild_interval,
                    )
                    .map_ok(move |stream| {
                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
                        (vnode, stream, row_count)
                    })
            })
        },
    ))
    .await?;
    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
    Ok(VnodeStream::new(
        vnode_streams,
        upstream_table.pk_in_output_indices().expect("should exist"),
        builder,
    ))
}

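/// Drive the snapshot scan while still reacting to injected barriers: each barrier persists and
/// reports per-vnode progress and may carry a `Throttle` mutation that updates the rate limit;
/// once the scan completes, the stream keeps yielding barriers until it reaches `snapshot_epoch`.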
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: &'a mut RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        *rate_limit,
        chunk_size,
        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
    )
    .await?;

    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    let mut count = 0;
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;

                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    backfill_paused = false;
                }
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
                    if let Mutation::Throttle(config) = &**m
                        && let Some(config) = config.get(&actor_ctx.fragment_id)
                        && config.throttle_type() == PbThrottleType::Backfill
                    {
                        Some(config.rate_limit)
                    } else {
                        None
                    }
                });
                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;

                if let Some(new_rate_limit) = new_rate_limit {
                    let new_rate_limit = new_rate_limit.into();
                    *rate_limit = new_rate_limit;
                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
                }
            }
            Either::Right(Some(chunk)) => {
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            Either::Right(None) => {
                break;
            }
        }
    }

    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    trace!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    trace!(?barrier_epoch, "finish consuming snapshot");
}

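// End-to-end tests that drive the executor against a Hummock test environment, with and without
// an upstream input.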
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
    use risingwave_rpc_client::HummockMetaClient;
    use risingwave_storage::hummock::HummockStorage;
    use risingwave_storage::table::batch_table::BatchTable;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::common::table::state_table::{
        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
    };
    use crate::common::table::test_utils::gen_pbtable_with_value_indices;
    use crate::executor::exchange::input::{Input, LocalInput};
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
    use crate::task::LocalBarrierManager;

    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);

    fn source_table_pb() -> risingwave_pb::catalog::PbTable {
        gen_pbtable_with_value_indices(
            SOURCE_TABLE_ID,
            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
            vec![OrderType::ascending()],
            vec![0],
            0,
            vec![0],
        )
    }

    fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
        gen_pbtable_with_value_indices(
            PROGRESS_TABLE_ID,
            vec![
                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
                ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
            ],
            vec![OrderType::ascending()],
            vec![0],
            1,
            vec![1, 2, 3, 4],
        )
    }

    fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
        BatchTable::for_test(
            store,
            SOURCE_TABLE_ID,
            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
            vec![OrderType::ascending()],
            vec![0],
            vec![0],
        )
    }

    async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
        StateTableBuilder::new(&source_table_pb(), store, None)
            .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
        StateTable::from_table_catalog(&progress_table_pb(), store, None).await
    }

    async fn commit_insert_epoch(
        test_env: &HummockTestEnv,
        source_state_table: &mut StateTable<HummockStorage>,
        epoch: &mut EpochPair,
        table_ids: HashSet<TableId>,
        values: &[i64],
    ) {
        for value in values {
            source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
        }
        epoch.inc_for_test();
        test_env.storage.start_epoch(epoch.curr, table_ids);
        source_state_table.commit_for_test(*epoch).await.unwrap();
        let res = test_env
            .storage
            .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
            .await
            .unwrap();
        test_env
            .meta_client
            .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
            .await
            .unwrap();
        test_env
            .storage
            .wait_version(test_env.manager.get_current_version().await)
            .await;
    }

    fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
        for epoch in 1..=max_epoch {
            test_env
                .storage
                .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
        }
    }

    fn make_upstream_input(
        barrier_manager: LocalBarrierManager,
        actor_ctx: ActorContextRef,
        rx: crate::executor::exchange::permit::Receiver,
    ) -> MergeExecutorInput {
        MergeExecutorInput::new(
            MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
            actor_ctx,
            1919.into(),
            barrier_manager,
            Arc::new(StreamingMetrics::unused()),
            ExecutorInfo::for_test(
                Schema::new(vec![Field::unnamed(DataType::Int64)]),
                vec![0],
                "SnapshotBackfillUpstream".to_owned(),
                0,
            ),
        )
    }

    async fn expect_barrier_with_timeout(
        executor: &mut BoxedMessageStream,
        reason: &str,
    ) -> Barrier {
        let message = timeout(Duration::from_secs(10), executor.next())
            .await
            .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
            .unwrap()
            .unwrap();
        match message {
            Message::Barrier(barrier) => barrier,
            other => panic!("expected barrier for {reason}, got {other:?}"),
        }
    }

    async fn expect_chunk_with_timeout(
        executor: &mut BoxedMessageStream,
        reason: &str,
    ) -> StreamChunk {
        let message = timeout(Duration::from_secs(10), executor.next())
            .await
            .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
            .unwrap()
            .unwrap();
        match message {
            Message::Chunk(chunk) => chunk,
            other => panic!("expected chunk for {reason}, got {other:?}"),
        }
    }

    async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
        assert!(
            timeout(Duration::from_millis(200), executor.next())
                .await
                .is_err(),
            "executor unexpectedly produced a message while waiting for {reason}"
        );
    }

    #[tokio::test]
    async fn test_snapshot_backfill_without_upstream_on_hummock() {
        let source_env = prepare_hummock_test_env().await;
        source_env.register_table(source_table_pb()).await;
        let progress_env = prepare_hummock_test_env().await;
        progress_env.register_table(progress_table_pb()).await;

        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
        let source_table = source_batch_table(source_env.storage.clone());
        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;

        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        source_env
            .storage
            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
        source_state_table.init_epoch(epoch).await.unwrap();

        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[1],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[2],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[3],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[],
        )
        .await;
        start_progress_epochs(&progress_env, 5);

        let barrier_manager = LocalBarrierManager::for_test();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
        let actor_ctx = ActorContext::for_test(1234);
        let (barrier_tx, barrier_rx) = unbounded_channel();
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();

        let mut executor = SnapshotBackfillExecutor::new(
            source_table,
            progress_state_table,
            None,
            vec![0],
            actor_ctx,
            progress,
            1024,
            RateLimit::Disabled,
            barrier_rx,
            Arc::new(StreamingMetrics::unused()),
            Some(test_epoch(3)),
        )
        .boxed()
        .execute();

        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(1)).epoch
        );
        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
            StreamChunk::from_pretty(
                " I
                + 1
                + 2
                + 3"
            )
        );
        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(2)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(2)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(3)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(3)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(4)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(4)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(5)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(5)).epoch
        );

        expect_pending_with_timeout(&mut executor, "next local barrier").await;
    }

    #[tokio::test]
    async fn test_snapshot_backfill_with_upstream_on_hummock() {
        let source_env = prepare_hummock_test_env().await;
        source_env.register_table(source_table_pb()).await;
        let progress_env = prepare_hummock_test_env().await;
        progress_env.register_table(progress_table_pb()).await;

        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
        let source_table = source_batch_table(source_env.storage.clone());
        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;

        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        source_env
            .storage
            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
        source_state_table.init_epoch(epoch).await.unwrap();

        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[],
        )
        .await;
        commit_insert_epoch(
            &source_env,
            &mut source_state_table,
            &mut epoch,
            HashSet::from_iter([SOURCE_TABLE_ID]),
            &[4],
        )
        .await;
        start_progress_epochs(&progress_env, 6);

        let barrier_manager = LocalBarrierManager::for_test();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
        let actor_ctx = ActorContext::for_test(1235);
        let (barrier_tx, barrier_rx) = unbounded_channel();
        let (upstream_tx, upstream_rx) = channel_for_test();

        upstream_tx
            .send(
                DispatcherMessage::Barrier(
                    Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
                )
                .into(),
            )
            .await
            .unwrap();
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();

        let mut executor = SnapshotBackfillExecutor::new(
            source_table,
            progress_state_table,
            Some(make_upstream_input(
                barrier_manager,
                actor_ctx.clone(),
                upstream_rx,
            )),
            vec![0],
            actor_ctx,
            progress,
            1024,
            RateLimit::Disabled,
            barrier_rx,
            Arc::new(StreamingMetrics::unused()),
            Some(test_epoch(3)),
        )
        .boxed()
        .execute();

        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(1)).epoch
        );
        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(2)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(2)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(3)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(3)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(4)))
            .unwrap();
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(4)).epoch
        );

        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(5)))
            .unwrap();
        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
            StreamChunk::from_pretty(
                " I
                + 4"
            )
        );
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(5)).epoch
        );

        upstream_tx
            .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
            .await
            .unwrap();
        let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
        upstream_tx
            .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
            .await
            .unwrap();
        barrier_tx.send(stop_barrier.clone()).unwrap();

        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
            StreamChunk::from_pretty(" I\n + 5")
        );
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "final stop barrier")
                .await
                .epoch,
            stop_barrier.epoch
        );
    }
}