1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::batch_plan::ScanRange;
33use risingwave_pb::common::PbThrottleType;
34use risingwave_storage::StateStore;
35use risingwave_storage::store::PrefetchOptions;
36use risingwave_storage::table::ChangeLogRow;
37use risingwave_storage::table::batch_table::{BatchTable, PkScanRange};
38use tokio::select;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::time::sleep;
41
42use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
43use crate::executor::backfill::snapshot_backfill::state::{
44 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
45};
46use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
47use crate::executor::backfill::utils::{
48 UpstreamStreamKeyUpdateNormalizer, create_builder, mapping_message,
49};
50use crate::executor::monitor::StreamingMetrics;
51use crate::executor::prelude::{StateTable, StreamExt, try_stream};
52use crate::executor::{
53 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
54 MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
55 expect_first_barrier,
56};
57use crate::task::CreateMviewProgressReporter;
58
/// Executor that backfills from a snapshot of `upstream_table`, then from the table's change log
/// (log store), and finally switches to forwarding the `upstream` input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Table read for both the snapshot scan and the change-log (log store) scan.
    upstream_table: BatchTable<S>,

    /// State table through which per-vnode backfill progress is persisted (wrapped by
    /// `BackfillState` in `execute_inner`).
    progress_state_table: StateTable<S>,

    /// Upstream input. `None` means the executor runs without an upstream input.
    upstream: Option<MergeExecutorInput>,

    /// Maps output column positions to column indices of the upstream table.
    output_indices: Vec<usize>,

    /// Stream key, expressed as indices into the output schema (i.e. into `output_indices`).
    stream_key: Vec<usize>,

    /// Reporter for snapshot / log-store consumption progress.
    progress: CreateMviewProgressReporter,

    chunk_size: usize,
    /// Rate limiter applied to snapshot chunks; can be updated by `Throttle` mutations.
    rate_limiter: MonitoredRateLimiter,

    /// Barriers injected into this executor (independent of the barriers of `upstream`).
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to read. Expected to be set; when unset, debug builds panic in
    /// `execute_inner`.
    snapshot_epoch: Option<u64>,
    /// Pk range restricting the snapshot scan; the full range when no scan range is given.
    pk_scan_range: PkScanRange,
}
89
impl<S: StateStore> SnapshotBackfillExecutor<S> {
    /// Builds the pk range used to restrict the snapshot scan.
    ///
    /// Without a protobuf `ScanRange` the full range is used. Errors from `PkScanRange::new` are
    /// propagated.
    fn build_pk_scan_range(
        pb_scan_range: Option<&ScanRange>,
        upstream_table: &BatchTable<S>,
    ) -> StreamExecutorResult<PkScanRange> {
        match pb_scan_range {
            Some(scan_range) => Ok(PkScanRange::new(
                scan_range.clone(),
                upstream_table.pk_serializer().get_data_types().to_vec(),
            )?),
            None => Ok(PkScanRange::full()),
        }
    }

    /// Creates the executor.
    ///
    /// # Panics
    /// - if `upstream` is present and its schema differs from `upstream_table`'s schema;
    /// - if `upstream_table` does not output all of its pk columns;
    /// - if any `stream_key` index is out of range for `output_indices`.
    ///
    /// # Errors
    /// Returns an error when `pb_pk_scan_range` cannot be converted into a `PkScanRange`.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: Option<MergeExecutorInput>,
        pb_pk_scan_range: Option<&ScanRange>,
        output_indices: Vec<usize>,
        stream_key: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> StreamExecutorResult<Self> {
        if let Some(upstream) = &upstream {
            assert_eq!(&upstream.info.schema, upstream_table.schema());
        }
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        assert!(
            stream_key.iter().all(|idx| *idx < output_indices.len()),
            "stream key indices should refer to output schema: stream_key: {:?}, output_indices: {:?}",
            stream_key,
            output_indices
        );
        let pk_scan_range = Self::build_pk_scan_range(pb_pk_scan_range, &upstream_table)?;
        if !matches!(rate_limit, RateLimit::Disabled) {
            trace!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Ok(Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            stream_key,
            progress,
            chunk_size,
            rate_limiter,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
            pk_scan_range,
        })
    }

    /// Main executor loop. It runs up to three phases and then forwards the upstream:
    ///
    /// 1. Consume the snapshot at `snapshot_epoch`, but only if the first injected barrier's
    ///    `prev` is earlier than it. Meanwhile the upstream is buffered (see `UpstreamBuffer`).
    /// 2. Consume the change log (log store) epoch by epoch until the buffered upstream has been
    ///    caught up.
    /// 3. Forward the upstream input, normalizing updates for the stream key and recording the
    ///    per-epoch row count as backfill progress.
    ///
    /// When the first upstream barrier and the first injected barrier share an epoch, phases 1
    /// and 2 are skipped, and every vnode's persisted progress is asserted to be `Consumed`.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        trace!("snapshot backfill executor start");
        let upstream = if let Some(mut upstream) = self.upstream {
            let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
            trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
            Some((first_upstream_barrier, upstream))
        } else {
            None
        };
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        // `Some(snapshot_epoch)` when the snapshot/log-store backfill has to run. `None` when the
        // first upstream barrier and the first injected barrier have the same epoch, in which case
        // backfill is skipped.
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if let Some((first_upstream_barrier, _)) = &upstream {
                if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                    assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                    Some(snapshot_epoch)
                } else {
                    None
                }
            } else {
                Some(snapshot_epoch)
            }
        } else {
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
                    first_recv_barrier.epoch
                );
            } else {
                // Release builds tolerate a missing snapshot epoch only when backfill can be
                // skipped, i.e. both first barriers share the same epoch (asserted below).
                let (first_upstream_barrier, _) = upstream
                    .as_ref()
                    .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        // `need_report_finish`: whether `finish_consuming_log_store` still has to be reported
        // on the first barrier forwarded from upstream (the log-store phase was skipped).
        let (mut barrier_epoch, mut need_report_finish, upstream) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id());
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
                {
                    SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
                        upstream,
                        first_upstream_barrier,
                        consume_upstream_row_count,
                    ))
                } else {
                    SnapshotBackfillUpstream::Empty
                };

                // Phase 1: the snapshot is consumed only when the first injected barrier starts
                // before `snapshot_epoch`; otherwise log-store consumption starts right after
                // `first_recv_barrier_epoch.prev`.
                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            &self.rate_limiter,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                            &self.pk_scan_range,
                        );

                        pin_mut!(snapshot_stream);

                        // Each poll of the snapshot stream also drives the upstream buffer.
                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    // The snapshot stream ends after the barrier whose `curr` is
                    // `snapshot_epoch`; the next barrier must therefore start at it.
                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // Phase 2: `Either::Left` means there are buffered upstream epochs left to
                // consume from the log store; `Either::Right` means the upstream is already
                // caught up.
                match upstream_buffer {
                    Either::Left(mut upstream_buffer) => {
                        let initial_pending_lag =
                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
                                &upstream_buffer
                            {
                                Some(Duration::from_millis(
                                    Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                                ))
                            } else {
                                None
                            };
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            ?initial_pending_lag,
                            "start consuming log store"
                        );

                        let consuming_log_store_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_log_store",
                            ]);
                        // Epochs of non-checkpoint barriers since the last checkpoint barrier;
                        // their change log is read together at the next checkpoint barrier.
                        let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
                        loop {
                            let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                            assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                            let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                            barrier_epoch = barrier.epoch;
                            if barrier.kind.is_checkpoint() {
                                let pending_non_checkpoint_barrier =
                                    take(&mut pending_non_checkpoint_barrier);
                                // Read the change log from the first pending non-checkpoint
                                // epoch (or just this epoch) up to this barrier's `prev`.
                                let end_epoch = barrier_epoch.prev;
                                let start_epoch = pending_non_checkpoint_barrier
                                    .first()
                                    .map(|epoch| epoch.prev)
                                    .unwrap_or(end_epoch);
                                trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
                                let mut stream = upstream_buffer
                                    .run_future(make_log_stream(
                                        &self.upstream_table,
                                        start_epoch,
                                        end_epoch,
                                        None,
                                        self.chunk_size,
                                    ))
                                    .await?;
                                while let Some(chunk) =
                                    upstream_buffer.run_future(stream.try_next()).await?
                                {
                                    trace!(
                                        ?barrier_epoch,
                                        size = chunk.cardinality(),
                                        "consume change log yield chunk",
                                    );
                                    consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                                    yield Message::Chunk(chunk);
                                }

                                trace!(?barrier_epoch, "after consume change log");

                                // The log stream is exhausted, so no vnode may have a partial
                                // pk progress.
                                stream
                                    .for_vnode_pk_progress(|vnode, row_count, progress| {
                                        assert_eq!(progress, None);
                                        backfill_state.finish_epoch(
                                            vnode,
                                            barrier.epoch.prev,
                                            row_count,
                                        );
                                    })
                                    .await?;
                            } else {
                                pending_non_checkpoint_barrier.push(barrier.epoch);
                            }

                            if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
                                &upstream_buffer
                            {
                                if is_finished {
                                    assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                                    assert!(pending_non_checkpoint_barrier.is_empty());
                                    self.progress.finish_consuming_log_store(barrier.epoch);
                                } else {
                                    self.progress.update_create_mview_log_store_progress(
                                        barrier.epoch,
                                        upstream_buffer.pending_epoch_lag(),
                                    );
                                }
                            }

                            let post_commit = backfill_state.commit(barrier.epoch).await?;
                            let update_vnode_bitmap =
                                barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                            yield Message::Barrier(barrier);
                            post_commit.post_yield_barrier(None).await?;
                            // Scaling is not supported while reading the log store.
                            if update_vnode_bitmap.is_some() {
                                return Err(anyhow!(
                                    "should not update vnode bitmap during consuming log store"
                                )
                                .into());
                            }

                            if is_finished {
                                assert!(
                                    pending_non_checkpoint_barrier.is_empty(),
                                    "{pending_non_checkpoint_barrier:?}"
                                );
                                break;
                            }
                        }
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            "finish consuming log store"
                        );

                        (
                            barrier_epoch,
                            false,
                            upstream_buffer.start_consuming_upstream(),
                        )
                    }
                    Either::Right(upstream) => {
                        trace!(
                            ?barrier_epoch,
                            table_id = %self.upstream_table.table_id(),
                            "skip consuming log store and start consuming upstream directly"
                        );

                        (barrier_epoch, true, upstream)
                    }
                }
            } else {
                // Backfill skipped: every vnode must already have fully consumed the epoch right
                // before the first upstream barrier.
                let (first_upstream_barrier, _) = upstream
                    .as_ref()
                    .expect("should have upstream when skipping snapshot backfill");
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                trace!(
                    table_id = %self.upstream_table.table_id(),
                    "skip backfill"
                );
                let (first_upstream_barrier, upstream) =
                    upstream.expect("should have upstream when skipping snapshot backfill");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true, upstream)
            }
        };
        // Phase 3: forward the upstream. The stream key is mapped from output positions to
        // upstream column indices for update normalization.
        let current_stream_key_indices = self
            .stream_key
            .iter()
            .map(|idx| self.output_indices[*idx])
            .collect();
        let update_normalizer = UpstreamStreamKeyUpdateNormalizer::new(
            &upstream.info.stream_key,
            current_stream_key_indices,
        );
        let mut upstream = upstream.into_executor(self.barrier_rx).execute();
        // Rows forwarded in the current epoch (summed over all vnodes of this executor).
        let mut epoch_row_count = 0;
        while let Some(msg) = upstream.try_next().await? {
            let Some(msg) = update_normalizer.normalize_message(msg) else {
                continue;
            };
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    // Every owned vnode is recorded with the executor-wide row count of the epoch.
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        // After a vnode bitmap update, every newly owned vnode must also have
                        // fully consumed the previous epoch.
                        let _prev_vnode_bitmap =
                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}
563
564impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
565 fn execute(self: Box<Self>) -> BoxedMessageStream {
566 let output_indices = self.output_indices.clone();
567 self.execute_inner()
568 .filter_map(move |result| {
569 ready({
570 match result {
571 Ok(message) => mapping_message(message, &output_indices).map(Ok),
572 Err(e) => Some(Err(e)),
573 }
574 })
575 })
576 .boxed()
577 }
578}
579
/// Phase marker: the snapshot is being consumed (used as the `S` parameter of `UpstreamBuffer`).
struct ConsumingSnapshot;
/// Phase marker: the log store (change log) is being consumed.
struct ConsumingLogStore;
582
/// Upstream barriers that have been received but not yet consumed from the log store.
///
/// New barriers are pushed to the FRONT of the deques, so the back always holds the oldest ones.
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the first barrier received from upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Barriers received after the latest checkpoint barrier, all non-checkpoint, newest first.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers, newest group first. Within a group the newest barrier is at the front
    /// and is a checkpoint barrier, except for the initial group, which holds only the first
    /// upstream barrier.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
596
597impl PendingBarriers {
598 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
599 Self {
600 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
601 pending_non_checkpoint_barriers: Default::default(),
602 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
603 first_upstream_barrier,
604 ])]),
605 }
606 }
607
608 fn add(&mut self, barrier: DispatcherBarrier) {
609 let is_checkpoint = barrier.kind.is_checkpoint();
610 self.pending_non_checkpoint_barriers.push_front(barrier);
611 if is_checkpoint {
612 self.checkpoint_barrier_groups
613 .push_front(take(&mut self.pending_non_checkpoint_barriers));
614 }
615 }
616
617 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
618 self.checkpoint_barrier_groups.pop_back()
619 }
620
621 fn consume_epoch(&mut self, epoch: EpochPair) {
622 let barriers = self
623 .checkpoint_barrier_groups
624 .back_mut()
625 .expect("non-empty");
626 let oldest_upstream_barrier = barriers.back().expect("non-empty");
627 assert!(
628 oldest_upstream_barrier.epoch.prev >= epoch.prev,
629 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
630 oldest_upstream_barrier.epoch,
631 epoch
632 );
633 if oldest_upstream_barrier.epoch.prev == epoch.prev {
634 assert_eq!(oldest_upstream_barrier.epoch, epoch);
635 barriers.pop_back();
636 if barriers.is_empty() {
637 self.checkpoint_barrier_groups.pop_back();
638 }
639 }
640 }
641
642 fn latest_epoch(&self) -> Option<EpochPair> {
643 self.pending_non_checkpoint_barriers
644 .front()
645 .or_else(|| {
646 self.checkpoint_barrier_groups
647 .front()
648 .and_then(|barriers| barriers.front())
649 })
650 .map(|barrier| barrier.epoch)
651 }
652
653 fn checkpoint_epoch_count(&self) -> usize {
654 self.checkpoint_barrier_groups.len()
655 }
656
657 fn has_checkpoint_epoch(&self) -> bool {
658 !self.checkpoint_barrier_groups.is_empty()
659 }
660}
661
/// Upstream side of the backfill while in phase `S` (`ConsumingSnapshot` or `ConsumingLogStore`).
enum SnapshotBackfillUpstream<S> {
    /// There is no upstream input; futures are awaited without polling any upstream.
    Empty,
    /// An upstream input that is polled and buffered while the backfill runs.
    Buffer(UpstreamBuffer<S>),
}
666
667impl<S> SnapshotBackfillUpstream<S> {
668 async fn run_future<T, E: Into<StreamExecutorError>>(
669 &mut self,
670 future: impl Future<Output = Result<T, E>>,
671 ) -> StreamExecutorResult<T> {
672 match self {
673 SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
674 SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
675 }
676 }
677}
678
679impl SnapshotBackfillUpstream<ConsumingSnapshot> {
680 fn start_consuming_log_store(
681 self,
682 consumed_epoch: u64,
683 ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
684 match self {
685 SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
686 SnapshotBackfillUpstream::Buffer(buffer) => {
687 match buffer.start_consuming_log_store(consumed_epoch) {
688 Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
689 Either::Right(input) => Either::Right(input),
690 }
691 }
692 }
693 }
694}
695
696impl SnapshotBackfillUpstream<ConsumingLogStore> {
697 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
698 match self {
699 SnapshotBackfillUpstream::Empty => Ok(false),
700 SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
701 }
702 }
703
704 fn start_consuming_upstream(self) -> MergeExecutorInput {
705 match self {
706 SnapshotBackfillUpstream::Empty => {
707 unreachable!("unlike to start consuming upstream when having no upstream")
708 }
709 SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
710 }
711 }
712}
713
/// Keeps polling the upstream input while the backfill consumes the snapshot or the log store,
/// recording the received barriers. Upstream chunks are only counted here; their data is not
/// retained. `S` is the phase marker.
struct UpstreamBuffer<S> {
    upstream: MergeExecutorInput,
    /// The upstream is not polled further once `pending_epoch_lag()` reaches this value, unless
    /// it is in the middle of an epoch (see `can_consume_upstream`). `u64::MAX` initially.
    max_pending_epoch_lag: u64,
    /// `prev` epoch of the latest epoch consumed from the log store; `0` in the snapshot phase.
    consumed_epoch: u64,
    upstream_pending_barriers: PendingBarriers,
    /// True when the upstream has delivered messages since the last checkpoint barrier (i.e. it
    /// is in the middle of an epoch); false right after a checkpoint barrier.
    is_polling_epoch_data: bool,
    /// Counts the rows of upstream chunks consumed by this buffer.
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Zero-sized phase marker.
    _phase: S,
}
728
729impl UpstreamBuffer<ConsumingSnapshot> {
730 fn new(
731 upstream: MergeExecutorInput,
732 first_upstream_barrier: DispatcherBarrier,
733 consume_upstream_row_count: LabelGuardedIntCounter,
734 ) -> Self {
735 Self {
736 upstream,
737 is_polling_epoch_data: false,
738 consume_upstream_row_count,
739 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
740 max_pending_epoch_lag: u64::MAX,
742 consumed_epoch: 0,
743 _phase: ConsumingSnapshot {},
744 }
745 }
746
747 fn start_consuming_log_store(
748 mut self,
749 consumed_epoch: u64,
750 ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
751 if self
752 .upstream_pending_barriers
753 .first_upstream_barrier_epoch
754 .prev
755 == consumed_epoch
756 {
757 assert_eq!(
758 1,
759 self.upstream_pending_barriers
760 .pop()
761 .expect("non-empty")
762 .len()
763 );
764 }
765 let max_pending_epoch_lag = self.pending_epoch_lag();
766 let buffer = UpstreamBuffer {
767 upstream: self.upstream,
768 upstream_pending_barriers: self.upstream_pending_barriers,
769 max_pending_epoch_lag,
770 is_polling_epoch_data: self.is_polling_epoch_data,
771 consume_upstream_row_count: self.consume_upstream_row_count,
772 consumed_epoch,
773 _phase: ConsumingLogStore {},
774 };
775 if buffer.is_finished() {
776 Either::Right(buffer.upstream)
777 } else {
778 Either::Left(buffer)
779 }
780 }
781}
782
impl<S> UpstreamBuffer<S> {
    /// Whether the upstream may be polled further: always while in the middle of an epoch,
    /// otherwise only while the pending lag is strictly below `max_pending_epoch_lag`.
    fn can_consume_upstream(&self) -> bool {
        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
    }

    /// Keeps consuming the upstream, one checkpoint epoch at a time.
    ///
    /// It completes only when consuming the upstream fails, and returns that error. Once polling
    /// is no longer allowed, it sleeps 30 seconds, logs a warning, and then stays pending forever
    /// without re-checking. It is meant to be raced against another future (as `run_future`
    /// does).
    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
        {
            loop {
                if let Err(e) = try {
                    if !self.can_consume_upstream() {
                        // Stop polling upstream: warn after a timeout, then never complete.
                        sleep(Duration::from_secs(30)).await;
                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
                        return pending().await;
                    }
                    self.consume_until_next_checkpoint_barrier().await?;
                } {
                    break e;
                }
            }
        }
    }

    /// Polls the upstream until a checkpoint barrier is received.
    ///
    /// Chunks are only counted (their data is dropped). Every barrier is recorded in
    /// `upstream_pending_barriers`. `is_polling_epoch_data` is cleared on the checkpoint barrier
    /// and set on every other message.
    ///
    /// # Errors
    /// Propagates upstream errors; returns "end of upstream" if the upstream terminates.
    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
        loop {
            let msg: DispatcherMessage = self
                .upstream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("end of upstream"))?;
            match msg {
                DispatcherMessage::Chunk(chunk) => {
                    self.is_polling_epoch_data = true;
                    self.consume_upstream_row_count
                        .inc_by(chunk.cardinality() as _);
                }
                DispatcherMessage::Barrier(barrier) => {
                    let is_checkpoint = barrier.kind.is_checkpoint();
                    self.upstream_pending_barriers.add(barrier);
                    if is_checkpoint {
                        self.is_polling_epoch_data = false;
                        break;
                    } else {
                        self.is_polling_epoch_data = true;
                    }
                }
                DispatcherMessage::Watermark(_) => {
                    self.is_polling_epoch_data = true;
                }
            }
        }
        Ok(())
    }
}
838
impl UpstreamBuffer<ConsumingLogStore> {
    /// Records that the log store has been consumed up to `epoch`, and returns whether the
    /// buffered upstream is now fully caught up.
    ///
    /// If no sealed barrier group is pending, the upstream is first polled until its next
    /// checkpoint barrier so that `epoch` can be matched against it. Afterwards, ready upstream
    /// messages are consumed without waiting while allowed, and the lag limit is tightened.
    ///
    /// # Panics
    /// If called after the buffer already reported being finished, or if `epoch.prev` does not
    /// advance past the previously consumed epoch.
    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            // Nothing sealed is buffered, so the upstream must be in the middle of an epoch;
            // finish reading that epoch first.
            assert!(self.is_polling_epoch_data);
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    // `now_or_never` polls once: only work that can complete immediately is done,
                    // and the loop stops as soon as the upstream would have to wait.
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                // The limit never increases: it shrinks by half of the elapsed epoch distance
                // on each call and is capped at the current lag.
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }

    /// Finished when no sealed barrier group is pending and the upstream is not in the middle of
    /// an epoch. In debug builds this also checks that no non-checkpoint barrier is pending
    /// whenever the upstream is not in the middle of an epoch.
    fn is_finished(&self) -> bool {
        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
            assert!(
                self.upstream_pending_barriers
                    .pending_non_checkpoint_barriers
                    .is_empty()
            )
        }
        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
    }

    /// Returns the raw upstream input.
    ///
    /// # Panics
    /// If the buffer is not finished or still has a pending lag.
    fn start_consuming_upstream(self) -> MergeExecutorInput {
        assert!(self.is_finished());
        assert_eq!(self.pending_epoch_lag(), 0);
        self.upstream
    }
}
896
897impl<S> UpstreamBuffer<S> {
898 async fn run_future<T, E: Into<StreamExecutorError>>(
901 &mut self,
902 future: impl Future<Output = Result<T, E>>,
903 ) -> StreamExecutorResult<T> {
904 select! {
905 biased;
906 e = self.concurrently_consume_upstream() => {
907 Err(e)
908 }
909 result = future => {
911 result.map_err(Into::into)
912 }
913 }
914 }
915
916 fn pending_epoch_lag(&self) -> u64 {
917 self.upstream_pending_barriers
918 .latest_epoch()
919 .map(|epoch| {
920 epoch
921 .prev
922 .checked_sub(self.consumed_epoch)
923 .expect("pending epoch must be later than consumed_epoch")
924 })
925 .unwrap_or(0)
926 }
927}
928
929#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
930async fn make_log_stream(
931 upstream_table: &BatchTable<impl StateStore>,
932 start_epoch: u64,
933 end_epoch: u64,
934 start_pk: Option<OwnedRow>,
935 chunk_size: usize,
936) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
937 let data_types = upstream_table.schema().data_types();
938 let start_pk = start_pk.as_ref();
939 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
941 upstream_table
942 .batch_iter_vnode_log(
943 start_epoch,
944 HummockReadEpoch::Committed(end_epoch),
945 start_pk,
946 vnode,
947 )
948 .map_ok(move |stream| {
949 let stream = stream.map_err(Into::into);
950 (vnode, stream, 0)
951 })
952 }))
953 .await?;
954 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
955 Ok(VnodeStream::new(
956 vnode_streams,
957 upstream_table.pk_in_output_indices().expect("should exist"),
958 builder,
959 ))
960}
961
962async fn make_snapshot_stream(
963 upstream_table: &BatchTable<impl StateStore>,
964 snapshot_epoch: u64,
965 backfill_state: &BackfillState<impl StateStore>,
966 rate_limit: RateLimit,
967 chunk_size: usize,
968 snapshot_rebuild_interval: Duration,
969 pk_scan_range: &PkScanRange,
970) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
971 let data_types = upstream_table.schema().data_types();
972 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
973 move |(vnode, progress)| {
974 let start_pk = match progress {
975 None => Some((None, 0)),
976 Some(VnodeBackfillProgress {
977 row_count,
978 progress: EpochBackfillProgress::Consuming { latest_pk },
979 ..
980 }) => Some((Some(latest_pk), *row_count)),
981 Some(VnodeBackfillProgress {
982 progress: EpochBackfillProgress::Consumed,
983 ..
984 }) => None,
985 };
986 start_pk.map(|(start_pk, row_count)| {
987 upstream_table
988 .batch_iter_vnode_with_pk_range(
989 HummockReadEpoch::Committed(snapshot_epoch),
990 start_pk,
991 &pk_scan_range.pk_prefix,
992 &pk_scan_range.range_bounds,
993 vnode,
994 PrefetchOptions::prefetch_for_large_range_scan(),
995 snapshot_rebuild_interval,
996 )
997 .map_ok(move |stream| {
998 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
999 (vnode, stream, row_count)
1000 })
1001 })
1002 },
1003 ))
1004 .await?;
1005 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
1006 Ok(VnodeStream::new(
1007 vnode_streams,
1008 upstream_table.pk_in_output_indices().expect("should exist"),
1009 builder,
1010 ))
1011}
1012
1013#[expect(clippy::too_many_arguments)]
1014#[try_stream(ok = Message, error = StreamExecutorError)]
1015async fn make_consume_snapshot_stream<'a, S: StateStore>(
1016 upstream_table: &'a BatchTable<S>,
1017 snapshot_epoch: u64,
1018 chunk_size: usize,
1019 rate_limiter: &'a MonitoredRateLimiter,
1020 barrier_rx: &'a mut UnboundedReceiver<Barrier>,
1021 progress: &'a mut CreateMviewProgressReporter,
1022 backfill_state: &'a mut BackfillState<S>,
1023 first_recv_barrier_epoch: EpochPair,
1024 initial_backfill_paused: bool,
1025 actor_ctx: &'a ActorContextRef,
1026 pk_scan_range: &'a PkScanRange,
1027) {
1028 let mut barrier_epoch = first_recv_barrier_epoch;
1029
1030 let mut snapshot_stream = make_snapshot_stream(
1032 upstream_table,
1033 snapshot_epoch,
1034 &*backfill_state,
1035 rate_limiter.rate_limit(),
1036 chunk_size,
1037 actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
1038 pk_scan_range,
1039 )
1040 .await?;
1041
1042 async fn select_barrier_and_snapshot_stream(
1043 barrier_rx: &mut UnboundedReceiver<Barrier>,
1044 snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
1045 throttle_snapshot_stream: bool,
1046 backfill_paused: bool,
1047 ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
1048 select!(
1049 result = receive_next_barrier(barrier_rx) => {
1050 Ok(Either::Left(result?))
1051 },
1052 result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
1053 Ok(Either::Right(result?))
1054 }
1055 )
1056 }
1057
1058 let mut backfill_paused = initial_backfill_paused;
1059 loop {
1060 let throttle_snapshot_stream = matches!(rate_limiter.rate_limit(), RateLimit::Pause);
1061 match select_barrier_and_snapshot_stream(
1062 barrier_rx,
1063 &mut snapshot_stream,
1064 throttle_snapshot_stream,
1065 backfill_paused,
1066 )
1067 .await?
1068 {
1069 Either::Left(barrier) => {
1070 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1071 barrier_epoch = barrier.epoch;
1072
1073 if barrier_epoch.curr >= snapshot_epoch {
1074 return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
1075 }
1076 if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
1077 backfill_paused = false;
1078 }
1079 if let Some(chunk) = snapshot_stream.consume_builder() {
1080 rate_limiter.wait(chunk.cardinality() as _).await;
1081 yield Message::Chunk(chunk);
1082 }
1083 snapshot_stream
1084 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1085 if let Some(pk) = pk_progress {
1086 backfill_state.update_epoch_progress(
1087 vnode,
1088 snapshot_epoch,
1089 row_count,
1090 pk,
1091 );
1092 } else {
1093 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1094 }
1095 })
1096 .await?;
1097 let count = backfill_state.total_row_count();
1098 let post_commit = backfill_state.commit(barrier.epoch).await?;
1099 trace!(?barrier_epoch, count, "update progress");
1100 progress.update(barrier_epoch, barrier_epoch.prev, count as _);
1101
1102 let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
1103 if let Mutation::Throttle(config) = &**m
1104 && let Some(config) = config.get(&actor_ctx.fragment_id)
1105 && config.throttle_type() == PbThrottleType::Backfill
1106 {
1107 Some(config.rate_limit)
1108 } else {
1109 None
1110 }
1111 });
1112 yield Message::Barrier(barrier);
1113 post_commit.post_yield_barrier(None).await?;
1114
1115 if let Some(new_rate_limit) = new_rate_limit {
1116 let new_rate_limit = new_rate_limit.into();
1117 rate_limiter.update(new_rate_limit);
1118 snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
1119 }
1120 }
1121 Either::Right(Some(chunk)) => {
1122 if backfill_paused {
1123 return Err(
1124 anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
1125 );
1126 }
1127 rate_limiter.wait(chunk.cardinality() as _).await;
1128 yield Message::Chunk(chunk);
1129 }
1130 Either::Right(None) => {
1131 break;
1132 }
1133 }
1134 }
1135
1136 let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
1138 assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
1139 barrier_epoch = barrier_to_report_finish.epoch;
1140 snapshot_stream
1141 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1142 assert_eq!(pk_progress, None);
1143 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1144 })
1145 .await?;
1146 let count = backfill_state.total_row_count();
1147 trace!(?barrier_epoch, count, "report finish");
1148 let post_commit = backfill_state.commit(barrier_epoch).await?;
1149 progress.finish(barrier_epoch, count as _);
1150 yield Message::Barrier(barrier_to_report_finish);
1151 post_commit.post_yield_barrier(None).await?;
1152
1153 loop {
1155 let barrier = receive_next_barrier(barrier_rx).await?;
1156 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1157 barrier_epoch = barrier.epoch;
1158 let post_commit = backfill_state.commit(barrier.epoch).await?;
1159 yield Message::Barrier(barrier);
1160 post_commit.post_yield_barrier(None).await?;
1161 if barrier_epoch.curr == snapshot_epoch {
1162 break;
1163 }
1164 }
1165 trace!(?barrier_epoch, "finish consuming snapshot");
1166}
1167
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use risingwave_common::array::StreamChunk;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
    use risingwave_rpc_client::HummockMetaClient;
    use risingwave_storage::hummock::HummockStorage;
    use risingwave_storage::table::batch_table::BatchTable;
    use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::common::table::state_table::{
        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
    };
    use crate::common::table::test_utils::gen_pbtable_with_value_indices;
    use crate::executor::exchange::input::{Input, LocalInput};
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
    use crate::task::LocalBarrierManager;

    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);

    /// Upper bound on how long a test waits for a message the executor is expected to emit.
    const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
    /// How long the executor must stay silent for `expect_pending_with_timeout` to pass.
    const PENDING_TIMEOUT: Duration = Duration::from_millis(200);

    /// Catalog of the upstream source table: a single `Int64` column that is also the
    /// ascending primary key.
    fn source_table_pb() -> risingwave_pb::catalog::PbTable {
        gen_pbtable_with_value_indices(
            SOURCE_TABLE_ID,
            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
            vec![OrderType::ascending()],
            vec![0],
            0,
            vec![0],
        )
    }

    /// Catalog of the backfill progress table: `Int16, Int64, Int64, Boolean, Int64`, keyed
    /// ascending on column 0 with columns 1..=4 as values.
    // NOTE(review): column 0 is presumably the vnode; confirm against `BackfillState`.
    fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
        gen_pbtable_with_value_indices(
            PROGRESS_TABLE_ID,
            vec![
                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
                ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
            ],
            vec![OrderType::ascending()],
            vec![0],
            1,
            vec![1, 2, 3, 4],
        )
    }

    /// Batch (read-only) view over the source table, as consumed by the executor.
    fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
        BatchTable::for_test(
            store,
            SOURCE_TABLE_ID,
            vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
            vec![OrderType::ascending()],
            vec![0],
            vec![0],
        )
    }

    /// Writable state table over the source table, built with `LogStoreEnabled` op
    /// consistency and without preloading all rows.
    async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
        StateTableBuilder::new(&source_table_pb(), store, None)
            .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// State table backing the executor's backfill progress.
    async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
        StateTable::from_table_catalog(&progress_table_pb(), store, None).await
    }

    /// Inserts `values` into `source_state_table`, advances `epoch` by one test epoch, seals
    /// and syncs the previous epoch, commits it to the meta client with a change-log entry for
    /// that epoch, and waits until storage observes the resulting hummock version.
    ///
    /// `table_ids` is used both for starting the new epoch and for sealing the previous one,
    /// so the two sets can never diverge.
    async fn commit_insert_epoch(
        test_env: &HummockTestEnv,
        source_state_table: &mut StateTable<HummockStorage>,
        epoch: &mut EpochPair,
        table_ids: HashSet<TableId>,
        values: &[i64],
    ) {
        for &value in values {
            source_state_table.insert(OwnedRow::new(vec![Some(value.into())]));
        }
        epoch.inc_for_test();
        test_env.storage.start_epoch(epoch.curr, table_ids.clone());
        source_state_table.commit_for_test(*epoch).await.unwrap();
        // Seal exactly the tables whose next epoch was just started, instead of a hard-coded
        // set that could silently disagree with `table_ids`.
        let sync_result = test_env
            .storage
            .seal_and_sync_epoch(epoch.prev, table_ids)
            .await
            .unwrap();
        test_env
            .meta_client
            .commit_epoch_with_change_log(epoch.prev, sync_result, Some(vec![epoch.prev]))
            .await
            .unwrap();
        test_env
            .storage
            .wait_version(test_env.manager.get_current_version().await)
            .await;
    }

    /// Starts test epochs `1..=max_epoch` for `PROGRESS_TABLE_ID` on `test_env`'s storage.
    fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
        for epoch in 1..=max_epoch {
            test_env
                .storage
                .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
        }
    }

    /// Wraps the permit receiver `rx` into a singleton merge input that feeds the executor's
    /// upstream. The numeric ids passed below are arbitrary test values.
    fn make_upstream_input(
        barrier_manager: LocalBarrierManager,
        actor_ctx: ActorContextRef,
        rx: crate::executor::exchange::permit::Receiver,
    ) -> MergeExecutorInput {
        MergeExecutorInput::new(
            MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
            actor_ctx,
            1919.into(),
            barrier_manager,
            Arc::new(StreamingMetrics::unused()),
            ExecutorInfo::for_test(
                Schema::new(vec![Field::unnamed(DataType::Int64)]),
                vec![0],
                "SnapshotBackfillUpstream".to_owned(),
                0,
            ),
        )
    }

    /// Polls the next message from `executor`.
    ///
    /// # Panics
    /// If nothing arrives within [`MESSAGE_TIMEOUT`], if the stream terminates, or if the
    /// executor yields an error. Each panic message names `expected` and `reason`.
    async fn next_message_with_timeout(
        executor: &mut BoxedMessageStream,
        expected: &str,
        reason: &str,
    ) -> Message {
        timeout(MESSAGE_TIMEOUT, executor.next())
            .await
            .unwrap_or_else(|_| panic!("timed out waiting for {expected}: {reason}"))
            .unwrap_or_else(|| {
                panic!("executor stream ended while waiting for {expected}: {reason}")
            })
            .unwrap_or_else(|err| {
                panic!("executor returned an error while waiting for {expected}: {reason}: {err:?}")
            })
    }

    /// Returns the next message from `executor`, asserting that it is a barrier.
    async fn expect_barrier_with_timeout(
        executor: &mut BoxedMessageStream,
        reason: &str,
    ) -> Barrier {
        match next_message_with_timeout(executor, "barrier", reason).await {
            Message::Barrier(barrier) => barrier,
            other => panic!("expected barrier for {reason}, got {other:?}"),
        }
    }

    /// Returns the next message from `executor`, asserting that it is a data chunk.
    async fn expect_chunk_with_timeout(
        executor: &mut BoxedMessageStream,
        reason: &str,
    ) -> StreamChunk {
        match next_message_with_timeout(executor, "chunk", reason).await {
            Message::Chunk(chunk) => chunk,
            other => panic!("expected chunk for {reason}, got {other:?}"),
        }
    }

    /// Asserts that `executor` emits nothing within [`PENDING_TIMEOUT`].
    async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
        assert!(
            timeout(PENDING_TIMEOUT, executor.next()).await.is_err(),
            "executor unexpectedly produced a message while waiting for {reason}"
        );
    }

    /// Sends a test barrier for `test_epoch(epoch)` through `barrier_tx`. It then asserts that
    /// the executor's next message is a barrier carrying the same epoch pair.
    async fn inject_and_expect_barrier(
        barrier_tx: &UnboundedSender<Barrier>,
        executor: &mut BoxedMessageStream,
        epoch: u64,
        reason: &str,
    ) {
        let barrier = Barrier::new_test_barrier(test_epoch(epoch));
        let expected_epoch = barrier.epoch;
        barrier_tx.send(barrier).unwrap();
        assert_eq!(
            expect_barrier_with_timeout(executor, reason).await.epoch,
            expected_epoch
        );
    }

    /// Without an upstream input, the executor should emit the initial barrier and then the
    /// snapshot rows 1, 2 and 3. After that it should only forward the barriers injected
    /// through the local barrier channel.
    #[tokio::test]
    async fn test_snapshot_backfill_without_upstream_on_hummock() {
        let source_env = prepare_hummock_test_env().await;
        source_env.register_table(source_table_pb()).await;
        let progress_env = prepare_hummock_test_env().await;
        progress_env.register_table(progress_table_pb()).await;

        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
        let source_table = source_batch_table(source_env.storage.clone());
        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;

        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        source_env
            .storage
            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
        source_state_table.init_epoch(epoch).await.unwrap();

        // One committed source epoch per batch: rows 1, 2 and 3 in consecutive epochs,
        // followed by an empty epoch.
        let batches: [&[i64]; 4] = [&[1], &[2], &[3], &[]];
        for values in batches {
            commit_insert_epoch(
                &source_env,
                &mut source_state_table,
                &mut epoch,
                HashSet::from_iter([SOURCE_TABLE_ID]),
                values,
            )
            .await;
        }
        start_progress_epochs(&progress_env, 5);

        let barrier_manager = LocalBarrierManager::for_test();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
        let actor_ctx = ActorContext::for_test(1234);
        let (barrier_tx, barrier_rx) = unbounded_channel();
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();

        let mut executor = SnapshotBackfillExecutor::new(
            source_table,
            progress_state_table,
            None,
            None,
            vec![0],
            vec![0],
            actor_ctx,
            progress,
            1024,
            RateLimit::Disabled,
            barrier_rx,
            Arc::new(StreamingMetrics::unused()),
            Some(test_epoch(3)),
        )
        .expect("snapshot backfill executor should be created")
        .boxed()
        .execute();

        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(1)).epoch
        );
        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
            StreamChunk::from_pretty(
                " I
                + 1
                + 2
                + 3"
            )
        );
        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;

        inject_and_expect_barrier(&barrier_tx, &mut executor, 2, "snapshot progress barrier 2")
            .await;
        inject_and_expect_barrier(&barrier_tx, &mut executor, 3, "snapshot progress barrier 3")
            .await;
        inject_and_expect_barrier(&barrier_tx, &mut executor, 4, "post-snapshot barrier 4").await;
        inject_and_expect_barrier(&barrier_tx, &mut executor, 5, "steady-state barrier 5").await;

        expect_pending_with_timeout(&mut executor, "next local barrier").await;
    }

    /// With an upstream input, the flow is as follows:
    /// - The snapshot is empty, so the executor emits only barriers until barrier 4.
    /// - On barrier 5, which is pre-queued on the upstream, it emits a log-store replay
    ///   chunk containing row 4.
    /// - It then forwards live upstream data up to and including the stop barrier.
    #[tokio::test]
    async fn test_snapshot_backfill_with_upstream_on_hummock() {
        let source_env = prepare_hummock_test_env().await;
        source_env.register_table(source_table_pb()).await;
        let progress_env = prepare_hummock_test_env().await;
        progress_env.register_table(progress_table_pb()).await;

        let mut source_state_table = source_state_table(source_env.storage.clone()).await;
        let source_table = source_batch_table(source_env.storage.clone());
        let progress_state_table = progress_state_table(progress_env.storage.clone()).await;

        let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
        source_env
            .storage
            .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
        source_state_table.init_epoch(epoch).await.unwrap();

        // Three empty epochs, then an epoch containing row 4.
        let batches: [&[i64]; 4] = [&[], &[], &[], &[4]];
        for values in batches {
            commit_insert_epoch(
                &source_env,
                &mut source_state_table,
                &mut epoch,
                HashSet::from_iter([SOURCE_TABLE_ID]),
                values,
            )
            .await;
        }
        start_progress_epochs(&progress_env, 6);

        let barrier_manager = LocalBarrierManager::for_test();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
        let actor_ctx = ActorContext::for_test(1235);
        let (barrier_tx, barrier_rx) = unbounded_channel();
        let (upstream_tx, upstream_rx) = channel_for_test();

        // Queue the upstream's first barrier before the executor is created.
        upstream_tx
            .send(
                DispatcherMessage::Barrier(
                    Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
                )
                .into(),
            )
            .await
            .unwrap();
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();

        let mut executor = SnapshotBackfillExecutor::new(
            source_table,
            progress_state_table,
            Some(make_upstream_input(
                barrier_manager,
                actor_ctx.clone(),
                upstream_rx,
            )),
            None,
            vec![0],
            vec![0],
            actor_ctx,
            progress,
            1024,
            RateLimit::Disabled,
            barrier_rx,
            Arc::new(StreamingMetrics::unused()),
            Some(test_epoch(3)),
        )
        .expect("snapshot backfill executor should be created")
        .boxed()
        .execute();

        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "initial injected barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(1)).epoch
        );
        expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;

        inject_and_expect_barrier(&barrier_tx, &mut executor, 2, "snapshot progress barrier 2")
            .await;
        inject_and_expect_barrier(&barrier_tx, &mut executor, 3, "snapshot progress barrier 3")
            .await;
        inject_and_expect_barrier(&barrier_tx, &mut executor, 4, "snapshot completion barrier 4")
            .await;

        // Barrier 5 matches the barrier pre-queued on the upstream. The executor is expected
        // to emit the replayed row 4 before forwarding it.
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(5)))
            .unwrap();
        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
            StreamChunk::from_pretty(
                " I
                + 4"
            )
        );
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
                .await
                .epoch,
            Barrier::new_test_barrier(test_epoch(5)).epoch
        );

        // After the handoff, live upstream data and the stop barrier pass straight through.
        let live_chunk = StreamChunk::from_pretty(" I\n + 5");
        upstream_tx
            .send(DispatcherMessage::Chunk(live_chunk.clone()).into())
            .await
            .unwrap();
        let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
        upstream_tx
            .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
            .await
            .unwrap();
        barrier_tx.send(stop_barrier.clone()).unwrap();

        assert_eq!(
            expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
            live_chunk
        );
        assert_eq!(
            expect_barrier_with_timeout(&mut executor, "final stop barrier")
                .await
                .epoch,
            stop_barrier.epoch
        );
    }
}