1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::batch_plan::ScanRange;
33use risingwave_pb::common::PbThrottleType;
34use risingwave_storage::StateStore;
35use risingwave_storage::store::PrefetchOptions;
36use risingwave_storage::table::ChangeLogRow;
37use risingwave_storage::table::batch_table::{BatchTable, PkScanRange};
38use tokio::select;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::time::sleep;
41
42use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
43use crate::executor::backfill::snapshot_backfill::state::{
44 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
45};
46use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
47use crate::executor::backfill::utils::{
48 UpstreamStreamKeyUpdateNormalizer, create_builder, mapping_message,
49};
50use crate::executor::monitor::StreamingMetrics;
51use crate::executor::prelude::{StateTable, StreamExt, try_stream};
52use crate::executor::{
53 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
54 MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
55 expect_first_barrier,
56};
57use crate::task::CreateMviewProgressReporter;
58
/// Backfills a creating streaming job from `upstream_table`.
///
/// When a snapshot epoch needs to be backfilled, the executor first reads the table snapshot at
/// that epoch. It then reads the table change log epoch by epoch while the live upstream is
/// drained in the background, and finally forwards the upstream input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Table read both for the snapshot (`make_snapshot_stream`) and for the change log
    /// (`make_log_stream`).
    upstream_table: BatchTable<S>,

    /// State table handed to `BackfillState`, which tracks per-vnode backfill progress.
    progress_state_table: StateTable<S>,

    /// Live upstream input, whose schema must equal `upstream_table`'s schema (checked in `new`).
    /// `None` when the executor has no upstream input.
    upstream: Option<MergeExecutorInput>,

    /// Columns of the upstream schema that are forwarded downstream (applied through
    /// `mapping_message`).
    output_indices: Vec<usize>,

    /// Stream key of the output, expressed as indices into `output_indices`.
    stream_key: Vec<usize>,

    /// Reporter for snapshot and log-store progress of the creating job.
    progress: CreateMviewProgressReporter,

    /// Chunk size passed to the chunk builders of the snapshot and change-log streams.
    chunk_size: usize,
    /// Rate limit applied while reading the snapshot; updated by backfill `Throttle` mutations.
    rate_limit: RateLimit,

    /// Barriers injected into this executor directly, as opposed to those arriving through
    /// `upstream`.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the upstream snapshot to backfill from. Expected to be set; debug builds panic
    /// when it is `None`.
    snapshot_epoch: Option<u64>,
    /// Primary-key range restricting the snapshot scan (the full range when none was given).
    pk_scan_range: PkScanRange,
}
89
90impl<S: StateStore> SnapshotBackfillExecutor<S> {
91 fn build_pk_scan_range(
92 pb_scan_range: Option<&ScanRange>,
93 upstream_table: &BatchTable<S>,
94 ) -> StreamExecutorResult<PkScanRange> {
95 match pb_scan_range {
96 Some(scan_range) => Ok(PkScanRange::new(
97 scan_range.clone(),
98 upstream_table.pk_serializer().get_data_types().to_vec(),
99 )?),
100 None => Ok(PkScanRange::full()),
101 }
102 }
103
104 #[expect(clippy::too_many_arguments)]
105 pub(crate) fn new(
106 upstream_table: BatchTable<S>,
107 progress_state_table: StateTable<S>,
108 upstream: Option<MergeExecutorInput>,
109 pb_pk_scan_range: Option<&ScanRange>,
110 output_indices: Vec<usize>,
111 stream_key: Vec<usize>,
112 actor_ctx: ActorContextRef,
113 progress: CreateMviewProgressReporter,
114 chunk_size: usize,
115 rate_limit: RateLimit,
116 barrier_rx: UnboundedReceiver<Barrier>,
117 metrics: Arc<StreamingMetrics>,
118 snapshot_epoch: Option<u64>,
119 ) -> StreamExecutorResult<Self> {
120 if let Some(upstream) = &upstream {
121 assert_eq!(&upstream.info.schema, upstream_table.schema());
122 }
123 if upstream_table.pk_in_output_indices().is_none() {
124 panic!(
125 "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
126 upstream_table.pk_indices(),
127 upstream_table.output_indices(),
128 upstream_table.schema()
129 )
130 };
131 assert!(
132 stream_key.iter().all(|idx| *idx < output_indices.len()),
133 "stream key indices should refer to output schema: stream_key: {:?}, output_indices: {:?}",
134 stream_key,
135 output_indices
136 );
137 let pk_scan_range = Self::build_pk_scan_range(pb_pk_scan_range, &upstream_table)?;
138 if !matches!(rate_limit, RateLimit::Disabled) {
139 trace!(
140 ?rate_limit,
141 "create snapshot backfill executor with rate limit"
142 );
143 }
144 Ok(Self {
145 upstream_table,
146 progress_state_table,
147 upstream,
148 output_indices,
149 stream_key,
150 progress,
151 chunk_size,
152 rate_limit,
153 barrier_rx,
154 actor_ctx,
155 metrics,
156 snapshot_epoch,
157 pk_scan_range,
158 })
159 }
160
161 #[try_stream(ok = Message, error = StreamExecutorError)]
162 async fn execute_inner(mut self) {
163 trace!("snapshot backfill executor start");
164 let upstream = if let Some(mut upstream) = self.upstream {
165 let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
166 trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
167 Some((first_upstream_barrier, upstream))
168 } else {
169 None
170 };
171 let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
172 trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
173 let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
174 self.snapshot_epoch
175 {
176 if let Some((first_upstream_barrier, _)) = &upstream {
177 if first_upstream_barrier.epoch != first_recv_barrier.epoch {
178 assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
179 Some(snapshot_epoch)
180 } else {
181 None
182 }
183 } else {
184 Some(snapshot_epoch)
186 }
187 } else {
188 if cfg!(debug_assertions) {
190 panic!(
191 "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
192 upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
193 first_recv_barrier.epoch
194 );
195 } else {
196 let (first_upstream_barrier, _) = upstream
197 .as_ref()
198 .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
199 warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
200 assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
201 None
202 }
203 };
204 let first_recv_barrier_epoch = first_recv_barrier.epoch;
205 let initial_backfill_paused =
206 first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
207 yield Message::Barrier(first_recv_barrier);
208 let mut backfill_state = BackfillState::new(
209 self.progress_state_table,
210 first_recv_barrier_epoch,
211 self.upstream_table.pk_serializer().clone(),
212 )
213 .await?;
214
215 let (mut barrier_epoch, mut need_report_finish, upstream) = {
216 if let Some(snapshot_epoch) = should_snapshot_backfill {
217 let table_id_str = format!("{}", self.upstream_table.table_id());
218 let actor_id_str = format!("{}", self.actor_ctx.id);
219
220 let consume_upstream_row_count = self
221 .metrics
222 .snapshot_backfill_consume_row_count
223 .with_guarded_label_values(&[
224 table_id_str.as_str(),
225 actor_id_str.as_str(),
226 "consume_upstream",
227 ]);
228
229 let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
230 {
231 SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
232 upstream,
233 first_upstream_barrier,
234 consume_upstream_row_count,
235 ))
236 } else {
237 SnapshotBackfillUpstream::Empty
238 };
239
240 let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
242 < snapshot_epoch
243 {
244 trace!(
245 table_id = %self.upstream_table.table_id(),
246 snapshot_epoch,
247 barrier_epoch = ?first_recv_barrier_epoch,
248 "start consuming snapshot"
249 );
250 {
251 let consuming_snapshot_row_count = self
252 .metrics
253 .snapshot_backfill_consume_row_count
254 .with_guarded_label_values(&[
255 table_id_str.as_str(),
256 actor_id_str.as_str(),
257 "consuming_snapshot",
258 ]);
259 let snapshot_stream = make_consume_snapshot_stream(
260 &self.upstream_table,
261 snapshot_epoch,
262 self.chunk_size,
263 &mut self.rate_limit,
264 &mut self.barrier_rx,
265 &mut self.progress,
266 &mut backfill_state,
267 first_recv_barrier_epoch,
268 initial_backfill_paused,
269 &self.actor_ctx,
270 &self.pk_scan_range,
271 );
272
273 pin_mut!(snapshot_stream);
274
275 while let Some(message) = upstream_buffer
276 .run_future(snapshot_stream.try_next())
277 .await?
278 {
279 if let Message::Chunk(chunk) = &message {
280 consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
281 }
282 yield message;
283 }
284 }
285
286 let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
287 let recv_barrier_epoch = recv_barrier.epoch;
288 assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
289 let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
290 yield Message::Barrier(recv_barrier);
291 post_commit.post_yield_barrier(None).await?;
292 (
293 recv_barrier_epoch,
294 upstream_buffer.start_consuming_log_store(snapshot_epoch),
295 )
296 } else {
297 trace!(
298 table_id = %self.upstream_table.table_id(),
299 snapshot_epoch,
300 barrier_epoch = ?first_recv_barrier_epoch,
301 "skip consuming snapshot"
302 );
303 (
304 first_recv_barrier_epoch,
305 upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
306 )
307 };
308
309 match upstream_buffer {
311 Either::Left(mut upstream_buffer) => {
312 let initial_pending_lag =
313 if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
314 &upstream_buffer
315 {
316 Some(Duration::from_millis(
317 Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
318 ))
319 } else {
320 None
321 };
322 trace!(
323 ?barrier_epoch,
324 table_id = %self.upstream_table.table_id(),
325 ?initial_pending_lag,
326 "start consuming log store"
327 );
328
329 let consuming_log_store_row_count = self
330 .metrics
331 .snapshot_backfill_consume_row_count
332 .with_guarded_label_values(&[
333 table_id_str.as_str(),
334 actor_id_str.as_str(),
335 "consuming_log_store",
336 ]);
337 let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
338 loop {
339 let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
340 assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
341 let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
342 barrier_epoch = barrier.epoch;
357 if barrier.kind.is_checkpoint() {
358 let pending_non_checkpoint_barrier =
359 take(&mut pending_non_checkpoint_barrier);
360 let end_epoch = barrier_epoch.prev;
361 let start_epoch = pending_non_checkpoint_barrier
362 .first()
363 .map(|epoch| epoch.prev)
364 .unwrap_or(end_epoch);
365 trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
366 let mut stream = upstream_buffer
370 .run_future(make_log_stream(
371 &self.upstream_table,
372 start_epoch,
373 end_epoch,
374 None,
375 self.chunk_size,
376 ))
377 .await?;
378 while let Some(chunk) =
379 upstream_buffer.run_future(stream.try_next()).await?
380 {
381 trace!(
382 ?barrier_epoch,
383 size = chunk.cardinality(),
384 "consume change log yield chunk",
385 );
386 consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
387 yield Message::Chunk(chunk);
388 }
389
390 trace!(?barrier_epoch, "after consume change log");
391
392 stream
393 .for_vnode_pk_progress(|vnode, row_count, progress| {
394 assert_eq!(progress, None);
395 backfill_state.finish_epoch(
396 vnode,
397 barrier.epoch.prev,
398 row_count,
399 );
400 })
401 .await?;
402 } else {
403 pending_non_checkpoint_barrier.push(barrier.epoch);
404 }
405
406 if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
407 &upstream_buffer
408 {
409 if is_finished {
410 assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
411 assert!(pending_non_checkpoint_barrier.is_empty());
412 self.progress.finish_consuming_log_store(barrier.epoch);
413 } else {
414 self.progress.update_create_mview_log_store_progress(
415 barrier.epoch,
416 upstream_buffer.pending_epoch_lag(),
417 );
418 }
419 }
420
421 let post_commit = backfill_state.commit(barrier.epoch).await?;
422 let update_vnode_bitmap =
423 barrier.as_update_vnode_bitmap(self.actor_ctx.id);
424 yield Message::Barrier(barrier);
425 post_commit.post_yield_barrier(None).await?;
426 if update_vnode_bitmap.is_some() {
427 return Err(anyhow!(
428 "should not update vnode bitmap during consuming log store"
429 )
430 .into());
431 }
432
433 if is_finished {
434 assert!(
435 pending_non_checkpoint_barrier.is_empty(),
436 "{pending_non_checkpoint_barrier:?}"
437 );
438 break;
439 }
440 }
441 trace!(
442 ?barrier_epoch,
443 table_id = %self.upstream_table.table_id(),
444 "finish consuming log store"
445 );
446
447 (
448 barrier_epoch,
449 false,
450 upstream_buffer.start_consuming_upstream(),
451 )
452 }
453 Either::Right(upstream) => {
454 trace!(
455 ?barrier_epoch,
456 table_id = %self.upstream_table.table_id(),
457 "skip consuming log store and start consuming upstream directly"
458 );
459
460 (barrier_epoch, true, upstream)
461 }
462 }
463 } else {
464 let (first_upstream_barrier, _) = upstream
465 .as_ref()
466 .expect("should have upstream when skipping snapshot backfill");
467 backfill_state
468 .latest_progress()
469 .for_each(|(vnode, progress)| {
470 let progress = progress.expect("should not be empty");
471 assert_eq!(
472 progress.epoch, first_upstream_barrier.epoch.prev,
473 "vnode: {:?}",
474 vnode
475 );
476 assert_eq!(
477 progress.progress,
478 EpochBackfillProgress::Consumed,
479 "vnode: {:?}",
480 vnode
481 );
482 });
483 trace!(
484 table_id = %self.upstream_table.table_id(),
485 "skip backfill"
486 );
487 let (first_upstream_barrier, upstream) =
488 upstream.expect("should have upstream when skipping snapshot backfill");
489 assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
490 (first_upstream_barrier.epoch, true, upstream)
491 }
492 };
493 let current_stream_key_indices = self
494 .stream_key
495 .iter()
496 .map(|idx| self.output_indices[*idx])
497 .collect();
498 let update_normalizer = UpstreamStreamKeyUpdateNormalizer::new(
499 &upstream.info.stream_key,
500 current_stream_key_indices,
501 );
502 let mut upstream = upstream.into_executor(self.barrier_rx).execute();
503 let mut epoch_row_count = 0;
504 while let Some(msg) = upstream.try_next().await? {
506 let Some(msg) = update_normalizer.normalize_message(msg) else {
507 continue;
508 };
509 match msg {
510 Message::Barrier(barrier) => {
511 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
512 self.upstream_table
513 .vnodes()
514 .iter_vnodes()
515 .for_each(|vnode| {
516 backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
519 });
520 epoch_row_count = 0;
521 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
522 barrier_epoch = barrier.epoch;
523 if need_report_finish {
524 need_report_finish = false;
525 self.progress.finish_consuming_log_store(barrier_epoch);
526 }
527 let post_commit = backfill_state.commit(barrier.epoch).await?;
528 yield Message::Barrier(barrier);
529 if let Some(new_vnode_bitmap) =
530 post_commit.post_yield_barrier(update_vnode_bitmap).await?
531 {
532 let _prev_vnode_bitmap =
533 self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
534 backfill_state
535 .latest_progress()
536 .for_each(|(vnode, progress)| {
537 let progress = progress.expect("should not be empty");
538 assert_eq!(
539 progress.epoch, barrier_epoch.prev,
540 "vnode {:?} has unexpected progress epoch",
541 vnode
542 );
543 assert_eq!(
544 progress.progress,
545 EpochBackfillProgress::Consumed,
546 "vnode {:?} has unexpected progress",
547 vnode
548 );
549 });
550 }
551 }
552 msg => {
553 if let Message::Chunk(chunk) = &msg {
554 epoch_row_count += chunk.cardinality();
555 }
556 yield msg;
557 }
558 }
559 }
560 }
561}
562
563impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
564 fn execute(self: Box<Self>) -> BoxedMessageStream {
565 let output_indices = self.output_indices.clone();
566 self.execute_inner()
567 .filter_map(move |result| {
568 ready({
569 match result {
570 Ok(message) => mapping_message(message, &output_indices).map(Ok),
571 Err(e) => Some(Err(e)),
572 }
573 })
574 })
575 .boxed()
576 }
577}
578
/// Type-state marker: the upstream buffer is used while the snapshot is being read.
struct ConsumingSnapshot;
/// Type-state marker: the upstream buffer is used while the change log (log store) is being read.
struct ConsumingLogStore;
581
/// Barriers received from the upstream that the backfill has not consumed yet.
///
/// In both deques the newest element is at the front and the oldest at the back. Apart from the
/// initial group created in `new`, which holds only the first upstream barrier, every group in
/// `checkpoint_barrier_groups` is closed by a checkpoint barrier sitting at the group's front.
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the first barrier received from the upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Non-checkpoint barriers received since the most recent checkpoint barrier (newest first).
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Barrier groups, one per checkpoint epoch (newest group first).
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
595
596impl PendingBarriers {
597 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
598 Self {
599 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
600 pending_non_checkpoint_barriers: Default::default(),
601 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
602 first_upstream_barrier,
603 ])]),
604 }
605 }
606
607 fn add(&mut self, barrier: DispatcherBarrier) {
608 let is_checkpoint = barrier.kind.is_checkpoint();
609 self.pending_non_checkpoint_barriers.push_front(barrier);
610 if is_checkpoint {
611 self.checkpoint_barrier_groups
612 .push_front(take(&mut self.pending_non_checkpoint_barriers));
613 }
614 }
615
616 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
617 self.checkpoint_barrier_groups.pop_back()
618 }
619
620 fn consume_epoch(&mut self, epoch: EpochPair) {
621 let barriers = self
622 .checkpoint_barrier_groups
623 .back_mut()
624 .expect("non-empty");
625 let oldest_upstream_barrier = barriers.back().expect("non-empty");
626 assert!(
627 oldest_upstream_barrier.epoch.prev >= epoch.prev,
628 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
629 oldest_upstream_barrier.epoch,
630 epoch
631 );
632 if oldest_upstream_barrier.epoch.prev == epoch.prev {
633 assert_eq!(oldest_upstream_barrier.epoch, epoch);
634 barriers.pop_back();
635 if barriers.is_empty() {
636 self.checkpoint_barrier_groups.pop_back();
637 }
638 }
639 }
640
641 fn latest_epoch(&self) -> Option<EpochPair> {
642 self.pending_non_checkpoint_barriers
643 .front()
644 .or_else(|| {
645 self.checkpoint_barrier_groups
646 .front()
647 .and_then(|barriers| barriers.front())
648 })
649 .map(|barrier| barrier.epoch)
650 }
651
652 fn checkpoint_epoch_count(&self) -> usize {
653 self.checkpoint_barrier_groups.len()
654 }
655
656 fn has_checkpoint_epoch(&self) -> bool {
657 !self.checkpoint_barrier_groups.is_empty()
658 }
659}
660
/// Upstream side of the backfill. It is either absent, or an [`UpstreamBuffer`] that drains the
/// upstream in the background (counting data rows and recording barriers). `S` is the phase
/// marker (`ConsumingSnapshot` or `ConsumingLogStore`).
enum SnapshotBackfillUpstream<S> {
    /// No upstream input is attached.
    Empty,
    /// The upstream input, wrapped in a buffer.
    Buffer(UpstreamBuffer<S>),
}
665
666impl<S> SnapshotBackfillUpstream<S> {
667 async fn run_future<T, E: Into<StreamExecutorError>>(
668 &mut self,
669 future: impl Future<Output = Result<T, E>>,
670 ) -> StreamExecutorResult<T> {
671 match self {
672 SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
673 SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
674 }
675 }
676}
677
678impl SnapshotBackfillUpstream<ConsumingSnapshot> {
679 fn start_consuming_log_store(
680 self,
681 consumed_epoch: u64,
682 ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
683 match self {
684 SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
685 SnapshotBackfillUpstream::Buffer(buffer) => {
686 match buffer.start_consuming_log_store(consumed_epoch) {
687 Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
688 Either::Right(input) => Either::Right(input),
689 }
690 }
691 }
692 }
693}
694
695impl SnapshotBackfillUpstream<ConsumingLogStore> {
696 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
697 match self {
698 SnapshotBackfillUpstream::Empty => Ok(false),
699 SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
700 }
701 }
702
703 fn start_consuming_upstream(self) -> MergeExecutorInput {
704 match self {
705 SnapshotBackfillUpstream::Empty => {
706 unreachable!("unlike to start consuming upstream when having no upstream")
707 }
708 SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
709 }
710 }
711}
712
/// Drains the upstream input while the executor is still backfilling.
///
/// Data chunks are counted and then dropped, while barriers are recorded in
/// `upstream_pending_barriers`. `S` is the type-state phase marker.
struct UpstreamBuffer<S> {
    upstream: MergeExecutorInput,
    /// Upper bound on `pending_epoch_lag()` under which further upstream epochs may be pulled
    /// (see `can_consume_upstream`). It is `u64::MAX` during the snapshot phase, initialized when
    /// switching to the log-store phase, and shrunk as epochs are consumed.
    max_pending_epoch_lag: u64,
    /// `prev` of the most recently consumed epoch; `pending_epoch_lag()` is measured against it.
    /// `0` while the snapshot is still being read.
    consumed_epoch: u64,
    /// Upstream barriers that have not been consumed yet.
    upstream_pending_barriers: PendingBarriers,
    /// Whether the upstream is in the middle of an epoch, i.e. the last message received was
    /// not a checkpoint barrier.
    is_polling_epoch_data: bool,
    /// Metric counting rows of the upstream chunks drained by this buffer.
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}
727
728impl UpstreamBuffer<ConsumingSnapshot> {
729 fn new(
730 upstream: MergeExecutorInput,
731 first_upstream_barrier: DispatcherBarrier,
732 consume_upstream_row_count: LabelGuardedIntCounter,
733 ) -> Self {
734 Self {
735 upstream,
736 is_polling_epoch_data: false,
737 consume_upstream_row_count,
738 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
739 max_pending_epoch_lag: u64::MAX,
741 consumed_epoch: 0,
742 _phase: ConsumingSnapshot {},
743 }
744 }
745
746 fn start_consuming_log_store(
747 mut self,
748 consumed_epoch: u64,
749 ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
750 if self
751 .upstream_pending_barriers
752 .first_upstream_barrier_epoch
753 .prev
754 == consumed_epoch
755 {
756 assert_eq!(
757 1,
758 self.upstream_pending_barriers
759 .pop()
760 .expect("non-empty")
761 .len()
762 );
763 }
764 let max_pending_epoch_lag = self.pending_epoch_lag();
765 let buffer = UpstreamBuffer {
766 upstream: self.upstream,
767 upstream_pending_barriers: self.upstream_pending_barriers,
768 max_pending_epoch_lag,
769 is_polling_epoch_data: self.is_polling_epoch_data,
770 consume_upstream_row_count: self.consume_upstream_row_count,
771 consumed_epoch,
772 _phase: ConsumingLogStore {},
773 };
774 if buffer.is_finished() {
775 Either::Right(buffer.upstream)
776 } else {
777 Either::Left(buffer)
778 }
779 }
780}
781
782impl<S> UpstreamBuffer<S> {
783 fn can_consume_upstream(&self) -> bool {
784 self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
785 }
786
787 async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
788 {
789 loop {
790 if let Err(e) = try {
791 if !self.can_consume_upstream() {
792 sleep(Duration::from_secs(30)).await;
794 warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
795 return pending().await;
796 }
797 self.consume_until_next_checkpoint_barrier().await?;
798 } {
799 break e;
800 }
801 }
802 }
803 }
804
805 async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
807 loop {
808 let msg: DispatcherMessage = self
809 .upstream
810 .try_next()
811 .await?
812 .ok_or_else(|| anyhow!("end of upstream"))?;
813 match msg {
814 DispatcherMessage::Chunk(chunk) => {
815 self.is_polling_epoch_data = true;
816 self.consume_upstream_row_count
817 .inc_by(chunk.cardinality() as _);
818 }
819 DispatcherMessage::Barrier(barrier) => {
820 let is_checkpoint = barrier.kind.is_checkpoint();
821 self.upstream_pending_barriers.add(barrier);
822 if is_checkpoint {
823 self.is_polling_epoch_data = false;
824 break;
825 } else {
826 self.is_polling_epoch_data = true;
827 }
828 }
829 DispatcherMessage::Watermark(_) => {
830 self.is_polling_epoch_data = true;
831 }
832 }
833 }
834 Ok(())
835 }
836}
837
838impl UpstreamBuffer<ConsumingLogStore> {
839 #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
840 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
841 assert!(!self.is_finished());
842 if !self.upstream_pending_barriers.has_checkpoint_epoch() {
843 assert!(self.is_polling_epoch_data);
846 self.consume_until_next_checkpoint_barrier().await?;
847 assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
848 }
849 self.upstream_pending_barriers.consume_epoch(epoch);
850
851 {
852 {
853 let prev_epoch = epoch.prev;
854 assert!(self.consumed_epoch < prev_epoch);
855 let elapsed_epoch = prev_epoch - self.consumed_epoch;
856 self.consumed_epoch = prev_epoch;
857 if self.upstream_pending_barriers.has_checkpoint_epoch() {
858 while self.can_consume_upstream()
860 && let Some(result) =
861 self.consume_until_next_checkpoint_barrier().now_or_never()
862 {
863 result?;
864 }
865 }
866 self.max_pending_epoch_lag = min(
870 self.pending_epoch_lag(),
871 self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
872 );
873 }
874 }
875 Ok(self.is_finished())
876 }
877
878 fn is_finished(&self) -> bool {
879 if cfg!(debug_assertions) && !self.is_polling_epoch_data {
880 assert!(
881 self.upstream_pending_barriers
882 .pending_non_checkpoint_barriers
883 .is_empty()
884 )
885 }
886 !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
887 }
888
889 fn start_consuming_upstream(self) -> MergeExecutorInput {
890 assert!(self.is_finished());
891 assert_eq!(self.pending_epoch_lag(), 0);
892 self.upstream
893 }
894}
895
896impl<S> UpstreamBuffer<S> {
897 async fn run_future<T, E: Into<StreamExecutorError>>(
900 &mut self,
901 future: impl Future<Output = Result<T, E>>,
902 ) -> StreamExecutorResult<T> {
903 select! {
904 biased;
905 e = self.concurrently_consume_upstream() => {
906 Err(e)
907 }
908 result = future => {
910 result.map_err(Into::into)
911 }
912 }
913 }
914
915 fn pending_epoch_lag(&self) -> u64 {
916 self.upstream_pending_barriers
917 .latest_epoch()
918 .map(|epoch| {
919 epoch
920 .prev
921 .checked_sub(self.consumed_epoch)
922 .expect("pending epoch must be later than consumed_epoch")
923 })
924 .unwrap_or(0)
925 }
926}
927
928#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
929async fn make_log_stream(
930 upstream_table: &BatchTable<impl StateStore>,
931 start_epoch: u64,
932 end_epoch: u64,
933 start_pk: Option<OwnedRow>,
934 chunk_size: usize,
935) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
936 let data_types = upstream_table.schema().data_types();
937 let start_pk = start_pk.as_ref();
938 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
940 upstream_table
941 .batch_iter_vnode_log(
942 start_epoch,
943 HummockReadEpoch::Committed(end_epoch),
944 start_pk,
945 vnode,
946 )
947 .map_ok(move |stream| {
948 let stream = stream.map_err(Into::into);
949 (vnode, stream, 0)
950 })
951 }))
952 .await?;
953 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
954 Ok(VnodeStream::new(
955 vnode_streams,
956 upstream_table.pk_in_output_indices().expect("should exist"),
957 builder,
958 ))
959}
960
961async fn make_snapshot_stream(
962 upstream_table: &BatchTable<impl StateStore>,
963 snapshot_epoch: u64,
964 backfill_state: &BackfillState<impl StateStore>,
965 rate_limit: RateLimit,
966 chunk_size: usize,
967 snapshot_rebuild_interval: Duration,
968 pk_scan_range: &PkScanRange,
969) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
970 let data_types = upstream_table.schema().data_types();
971 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
972 move |(vnode, progress)| {
973 let start_pk = match progress {
974 None => Some((None, 0)),
975 Some(VnodeBackfillProgress {
976 row_count,
977 progress: EpochBackfillProgress::Consuming { latest_pk },
978 ..
979 }) => Some((Some(latest_pk), *row_count)),
980 Some(VnodeBackfillProgress {
981 progress: EpochBackfillProgress::Consumed,
982 ..
983 }) => None,
984 };
985 start_pk.map(|(start_pk, row_count)| {
986 upstream_table
987 .batch_iter_vnode_with_pk_range(
988 HummockReadEpoch::Committed(snapshot_epoch),
989 start_pk,
990 &pk_scan_range.pk_prefix,
991 &pk_scan_range.range_bounds,
992 vnode,
993 PrefetchOptions::prefetch_for_large_range_scan(),
994 snapshot_rebuild_interval,
995 )
996 .map_ok(move |stream| {
997 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
998 (vnode, stream, row_count)
999 })
1000 })
1001 },
1002 ))
1003 .await?;
1004 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
1005 Ok(VnodeStream::new(
1006 vnode_streams,
1007 upstream_table.pk_in_output_indices().expect("should exist"),
1008 builder,
1009 ))
1010}
1011
1012#[expect(clippy::too_many_arguments)]
1013#[try_stream(ok = Message, error = StreamExecutorError)]
1014async fn make_consume_snapshot_stream<'a, S: StateStore>(
1015 upstream_table: &'a BatchTable<S>,
1016 snapshot_epoch: u64,
1017 chunk_size: usize,
1018 rate_limit: &'a mut RateLimit,
1019 barrier_rx: &'a mut UnboundedReceiver<Barrier>,
1020 progress: &'a mut CreateMviewProgressReporter,
1021 backfill_state: &'a mut BackfillState<S>,
1022 first_recv_barrier_epoch: EpochPair,
1023 initial_backfill_paused: bool,
1024 actor_ctx: &'a ActorContextRef,
1025 pk_scan_range: &'a PkScanRange,
1026) {
1027 let mut barrier_epoch = first_recv_barrier_epoch;
1028
1029 let mut snapshot_stream = make_snapshot_stream(
1031 upstream_table,
1032 snapshot_epoch,
1033 &*backfill_state,
1034 *rate_limit,
1035 chunk_size,
1036 actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
1037 pk_scan_range,
1038 )
1039 .await?;
1040
1041 async fn select_barrier_and_snapshot_stream(
1042 barrier_rx: &mut UnboundedReceiver<Barrier>,
1043 snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
1044 throttle_snapshot_stream: bool,
1045 backfill_paused: bool,
1046 ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
1047 select!(
1048 result = receive_next_barrier(barrier_rx) => {
1049 Ok(Either::Left(result?))
1050 },
1051 result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
1052 Ok(Either::Right(result?))
1053 }
1054 )
1055 }
1056
1057 let mut epoch_row_count = 0;
1058 let mut backfill_paused = initial_backfill_paused;
1059 loop {
1060 let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
1061 match select_barrier_and_snapshot_stream(
1062 barrier_rx,
1063 &mut snapshot_stream,
1064 throttle_snapshot_stream,
1065 backfill_paused,
1066 )
1067 .await?
1068 {
1069 Either::Left(barrier) => {
1070 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1071 barrier_epoch = barrier.epoch;
1072
1073 if barrier_epoch.curr >= snapshot_epoch {
1074 return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
1075 }
1076 if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
1077 backfill_paused = false;
1078 }
1079 if let Some(chunk) = snapshot_stream.consume_builder() {
1080 epoch_row_count += chunk.cardinality();
1081 yield Message::Chunk(chunk);
1082 }
1083 snapshot_stream
1084 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1085 if let Some(pk) = pk_progress {
1086 backfill_state.update_epoch_progress(
1087 vnode,
1088 snapshot_epoch,
1089 row_count,
1090 pk,
1091 );
1092 } else {
1093 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1094 }
1095 })
1096 .await?;
1097 let count = backfill_state.total_row_count();
1098 let post_commit = backfill_state.commit(barrier.epoch).await?;
1099 trace!(?barrier_epoch, count, epoch_row_count, "update progress");
1100 progress.update(barrier_epoch, barrier_epoch.prev, count as _);
1101 epoch_row_count = 0;
1102
1103 let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
1104 if let Mutation::Throttle(config) = &**m
1105 && let Some(config) = config.get(&actor_ctx.fragment_id)
1106 && config.throttle_type() == PbThrottleType::Backfill
1107 {
1108 Some(config.rate_limit)
1109 } else {
1110 None
1111 }
1112 });
1113 yield Message::Barrier(barrier);
1114 post_commit.post_yield_barrier(None).await?;
1115
1116 if let Some(new_rate_limit) = new_rate_limit {
1117 let new_rate_limit = new_rate_limit.into();
1118 *rate_limit = new_rate_limit;
1119 snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
1120 }
1121 }
1122 Either::Right(Some(chunk)) => {
1123 if backfill_paused {
1124 return Err(
1125 anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
1126 );
1127 }
1128 epoch_row_count += chunk.cardinality();
1129 yield Message::Chunk(chunk);
1130 }
1131 Either::Right(None) => {
1132 break;
1133 }
1134 }
1135 }
1136
1137 let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
1139 assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
1140 barrier_epoch = barrier_to_report_finish.epoch;
1141 snapshot_stream
1142 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
1143 assert_eq!(pk_progress, None);
1144 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
1145 })
1146 .await?;
1147 let count = backfill_state.total_row_count();
1148 trace!(?barrier_epoch, count, "report finish");
1149 let post_commit = backfill_state.commit(barrier_epoch).await?;
1150 progress.finish(barrier_epoch, count as _);
1151 yield Message::Barrier(barrier_to_report_finish);
1152 post_commit.post_yield_barrier(None).await?;
1153
1154 loop {
1156 let barrier = receive_next_barrier(barrier_rx).await?;
1157 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
1158 barrier_epoch = barrier.epoch;
1159 let post_commit = backfill_state.commit(barrier.epoch).await?;
1160 yield Message::Barrier(barrier);
1161 post_commit.post_yield_barrier(None).await?;
1162 if barrier_epoch.curr == snapshot_epoch {
1163 break;
1164 }
1165 }
1166 trace!(?barrier_epoch, "finish consuming snapshot");
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use std::collections::HashSet;
1172 use std::sync::Arc;
1173
1174 use risingwave_common::array::StreamChunk;
1175 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1176 use risingwave_common::row::OwnedRow;
1177 use risingwave_common::test_prelude::StreamChunkTestExt;
1178 use risingwave_common::types::DataType;
1179 use risingwave_common::util::epoch::{EpochPair, test_epoch};
1180 use risingwave_common::util::sort_util::OrderType;
1181 use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
1182 use risingwave_rpc_client::HummockMetaClient;
1183 use risingwave_storage::hummock::HummockStorage;
1184 use risingwave_storage::table::batch_table::BatchTable;
1185 use tokio::sync::mpsc::unbounded_channel;
1186 use tokio::time::{Duration, timeout};
1187
1188 use super::*;
1189 use crate::common::table::state_table::{
1190 StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
1191 };
1192 use crate::common::table::test_utils::gen_pbtable_with_value_indices;
1193 use crate::executor::exchange::input::{Input, LocalInput};
1194 use crate::executor::exchange::permit::channel_for_test;
1195 use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
1196 use crate::task::LocalBarrierManager;
1197
    // Id of the upstream (source) table; registered in `source_env` by the tests below.
    const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
    // Id of the backfill progress state table; registered in `progress_env`.
    const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);
1200
1201 fn source_table_pb() -> risingwave_pb::catalog::PbTable {
1202 gen_pbtable_with_value_indices(
1203 SOURCE_TABLE_ID,
1204 vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1205 vec![OrderType::ascending()],
1206 vec![0],
1207 0,
1208 vec![0],
1209 )
1210 }
1211
1212 fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
1213 gen_pbtable_with_value_indices(
1214 PROGRESS_TABLE_ID,
1215 vec![
1216 ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
1217 ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1218 ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1219 ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
1220 ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
1221 ],
1222 vec![OrderType::ascending()],
1223 vec![0],
1224 1,
1225 vec![1, 2, 3, 4],
1226 )
1227 }
1228
1229 fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
1230 BatchTable::for_test(
1231 store,
1232 SOURCE_TABLE_ID,
1233 vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1234 vec![OrderType::ascending()],
1235 vec![0],
1236 vec![0],
1237 )
1238 }
1239
1240 async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1241 StateTableBuilder::new(&source_table_pb(), store, None)
1242 .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
1243 .forbid_preload_all_rows()
1244 .build()
1245 .await
1246 }
1247
1248 async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1249 StateTable::from_table_catalog(&progress_table_pb(), store, None).await
1250 }
1251
1252 async fn commit_insert_epoch(
1253 test_env: &HummockTestEnv,
1254 source_state_table: &mut StateTable<HummockStorage>,
1255 epoch: &mut EpochPair,
1256 table_ids: HashSet<TableId>,
1257 values: &[i64],
1258 ) {
1259 for value in values {
1260 source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
1261 }
1262 epoch.inc_for_test();
1263 test_env.storage.start_epoch(epoch.curr, table_ids);
1264 source_state_table.commit_for_test(*epoch).await.unwrap();
1265 let res = test_env
1266 .storage
1267 .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
1268 .await
1269 .unwrap();
1270 test_env
1271 .meta_client
1272 .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
1273 .await
1274 .unwrap();
1275 test_env
1276 .storage
1277 .wait_version(test_env.manager.get_current_version().await)
1278 .await;
1279 }
1280
1281 fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
1282 for epoch in 1..=max_epoch {
1283 test_env
1284 .storage
1285 .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
1286 }
1287 }
1288
1289 fn make_upstream_input(
1290 barrier_manager: LocalBarrierManager,
1291 actor_ctx: ActorContextRef,
1292 rx: crate::executor::exchange::permit::Receiver,
1293 ) -> MergeExecutorInput {
1294 MergeExecutorInput::new(
1295 MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
1296 actor_ctx,
1297 1919.into(),
1298 barrier_manager,
1299 Arc::new(StreamingMetrics::unused()),
1300 ExecutorInfo::for_test(
1301 Schema::new(vec![Field::unnamed(DataType::Int64)]),
1302 vec![0],
1303 "SnapshotBackfillUpstream".to_owned(),
1304 0,
1305 ),
1306 )
1307 }
1308
1309 async fn expect_barrier_with_timeout(
1310 executor: &mut BoxedMessageStream,
1311 reason: &str,
1312 ) -> Barrier {
1313 let message = timeout(Duration::from_secs(10), executor.next())
1314 .await
1315 .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
1316 .unwrap()
1317 .unwrap();
1318 match message {
1319 Message::Barrier(barrier) => barrier,
1320 other => panic!("expected barrier for {reason}, got {other:?}"),
1321 }
1322 }
1323
1324 async fn expect_chunk_with_timeout(
1325 executor: &mut BoxedMessageStream,
1326 reason: &str,
1327 ) -> StreamChunk {
1328 let message = timeout(Duration::from_secs(10), executor.next())
1329 .await
1330 .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
1331 .unwrap()
1332 .unwrap();
1333 match message {
1334 Message::Chunk(chunk) => chunk,
1335 other => panic!("expected chunk for {reason}, got {other:?}"),
1336 }
1337 }
1338
1339 async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
1340 assert!(
1341 timeout(Duration::from_millis(200), executor.next())
1342 .await
1343 .is_err(),
1344 "executor unexpectedly produced a message while waiting for {reason}"
1345 );
1346 }
1347
1348 #[tokio::test]
1349 async fn test_snapshot_backfill_without_upstream_on_hummock() {
1350 let source_env = prepare_hummock_test_env().await;
1351 source_env.register_table(source_table_pb()).await;
1352 let progress_env = prepare_hummock_test_env().await;
1353 progress_env.register_table(progress_table_pb()).await;
1354
1355 let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1356 let source_table = source_batch_table(source_env.storage.clone());
1357 let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1358
1359 let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1360 source_env
1361 .storage
1362 .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1363 source_state_table.init_epoch(epoch).await.unwrap();
1364
1365 commit_insert_epoch(
1366 &source_env,
1367 &mut source_state_table,
1368 &mut epoch,
1369 HashSet::from_iter([SOURCE_TABLE_ID]),
1370 &[1],
1371 )
1372 .await;
1373 commit_insert_epoch(
1374 &source_env,
1375 &mut source_state_table,
1376 &mut epoch,
1377 HashSet::from_iter([SOURCE_TABLE_ID]),
1378 &[2],
1379 )
1380 .await;
1381 commit_insert_epoch(
1382 &source_env,
1383 &mut source_state_table,
1384 &mut epoch,
1385 HashSet::from_iter([SOURCE_TABLE_ID]),
1386 &[3],
1387 )
1388 .await;
1389 commit_insert_epoch(
1390 &source_env,
1391 &mut source_state_table,
1392 &mut epoch,
1393 HashSet::from_iter([SOURCE_TABLE_ID]),
1394 &[],
1395 )
1396 .await;
1397 start_progress_epochs(&progress_env, 5);
1398
1399 let barrier_manager = LocalBarrierManager::for_test();
1400 let progress = CreateMviewProgressReporter::for_test(barrier_manager);
1401 let actor_ctx = ActorContext::for_test(1234);
1402 let (barrier_tx, barrier_rx) = unbounded_channel();
1403 barrier_tx
1404 .send(Barrier::new_test_barrier(test_epoch(1)))
1405 .unwrap();
1406
1407 let mut executor = SnapshotBackfillExecutor::new(
1408 source_table,
1409 progress_state_table,
1410 None,
1411 None,
1412 vec![0],
1413 vec![0],
1414 actor_ctx,
1415 progress,
1416 1024,
1417 RateLimit::Disabled,
1418 barrier_rx,
1419 Arc::new(StreamingMetrics::unused()),
1420 Some(test_epoch(3)),
1421 )
1422 .expect("snapshot backfill executor should be created")
1423 .boxed()
1424 .execute();
1425
1426 assert_eq!(
1427 expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1428 .await
1429 .epoch,
1430 Barrier::new_test_barrier(test_epoch(1)).epoch
1431 );
1432 assert_eq!(
1433 expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
1434 StreamChunk::from_pretty(
1435 " I
1436 + 1
1437 + 2
1438 + 3"
1439 )
1440 );
1441 expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1442
1443 barrier_tx
1444 .send(Barrier::new_test_barrier(test_epoch(2)))
1445 .unwrap();
1446 assert_eq!(
1447 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1448 .await
1449 .epoch,
1450 Barrier::new_test_barrier(test_epoch(2)).epoch
1451 );
1452
1453 barrier_tx
1454 .send(Barrier::new_test_barrier(test_epoch(3)))
1455 .unwrap();
1456 assert_eq!(
1457 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1458 .await
1459 .epoch,
1460 Barrier::new_test_barrier(test_epoch(3)).epoch
1461 );
1462
1463 barrier_tx
1464 .send(Barrier::new_test_barrier(test_epoch(4)))
1465 .unwrap();
1466 assert_eq!(
1467 expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
1468 .await
1469 .epoch,
1470 Barrier::new_test_barrier(test_epoch(4)).epoch
1471 );
1472
1473 barrier_tx
1474 .send(Barrier::new_test_barrier(test_epoch(5)))
1475 .unwrap();
1476 assert_eq!(
1477 expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
1478 .await
1479 .epoch,
1480 Barrier::new_test_barrier(test_epoch(5)).epoch
1481 );
1482
1483 expect_pending_with_timeout(&mut executor, "next local barrier").await;
1484 }
1485
1486 #[tokio::test]
1487 async fn test_snapshot_backfill_with_upstream_on_hummock() {
1488 let source_env = prepare_hummock_test_env().await;
1489 source_env.register_table(source_table_pb()).await;
1490 let progress_env = prepare_hummock_test_env().await;
1491 progress_env.register_table(progress_table_pb()).await;
1492
1493 let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1494 let source_table = source_batch_table(source_env.storage.clone());
1495 let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1496
1497 let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1498 source_env
1499 .storage
1500 .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1501 source_state_table.init_epoch(epoch).await.unwrap();
1502
1503 commit_insert_epoch(
1504 &source_env,
1505 &mut source_state_table,
1506 &mut epoch,
1507 HashSet::from_iter([SOURCE_TABLE_ID]),
1508 &[],
1509 )
1510 .await;
1511 commit_insert_epoch(
1512 &source_env,
1513 &mut source_state_table,
1514 &mut epoch,
1515 HashSet::from_iter([SOURCE_TABLE_ID]),
1516 &[],
1517 )
1518 .await;
1519 commit_insert_epoch(
1520 &source_env,
1521 &mut source_state_table,
1522 &mut epoch,
1523 HashSet::from_iter([SOURCE_TABLE_ID]),
1524 &[],
1525 )
1526 .await;
1527 commit_insert_epoch(
1528 &source_env,
1529 &mut source_state_table,
1530 &mut epoch,
1531 HashSet::from_iter([SOURCE_TABLE_ID]),
1532 &[4],
1533 )
1534 .await;
1535 start_progress_epochs(&progress_env, 6);
1536
1537 let barrier_manager = LocalBarrierManager::for_test();
1538 let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
1539 let actor_ctx = ActorContext::for_test(1235);
1540 let (barrier_tx, barrier_rx) = unbounded_channel();
1541 let (upstream_tx, upstream_rx) = channel_for_test();
1542
1543 upstream_tx
1544 .send(
1545 DispatcherMessage::Barrier(
1546 Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
1547 )
1548 .into(),
1549 )
1550 .await
1551 .unwrap();
1552 barrier_tx
1553 .send(Barrier::new_test_barrier(test_epoch(1)))
1554 .unwrap();
1555
1556 let mut executor = SnapshotBackfillExecutor::new(
1557 source_table,
1558 progress_state_table,
1559 Some(make_upstream_input(
1560 barrier_manager,
1561 actor_ctx.clone(),
1562 upstream_rx,
1563 )),
1564 None,
1565 vec![0],
1566 vec![0],
1567 actor_ctx,
1568 progress,
1569 1024,
1570 RateLimit::Disabled,
1571 barrier_rx,
1572 Arc::new(StreamingMetrics::unused()),
1573 Some(test_epoch(3)),
1574 )
1575 .expect("snapshot backfill executor should be created")
1576 .boxed()
1577 .execute();
1578
1579 assert_eq!(
1580 expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1581 .await
1582 .epoch,
1583 Barrier::new_test_barrier(test_epoch(1)).epoch
1584 );
1585 expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1586 barrier_tx
1587 .send(Barrier::new_test_barrier(test_epoch(2)))
1588 .unwrap();
1589 assert_eq!(
1590 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1591 .await
1592 .epoch,
1593 Barrier::new_test_barrier(test_epoch(2)).epoch
1594 );
1595
1596 barrier_tx
1597 .send(Barrier::new_test_barrier(test_epoch(3)))
1598 .unwrap();
1599 assert_eq!(
1600 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1601 .await
1602 .epoch,
1603 Barrier::new_test_barrier(test_epoch(3)).epoch
1604 );
1605
1606 barrier_tx
1607 .send(Barrier::new_test_barrier(test_epoch(4)))
1608 .unwrap();
1609 assert_eq!(
1610 expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
1611 .await
1612 .epoch,
1613 Barrier::new_test_barrier(test_epoch(4)).epoch
1614 );
1615
1616 barrier_tx
1617 .send(Barrier::new_test_barrier(test_epoch(5)))
1618 .unwrap();
1619 assert_eq!(
1620 expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
1621 StreamChunk::from_pretty(
1622 " I
1623 + 4"
1624 )
1625 );
1626 assert_eq!(
1627 expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
1628 .await
1629 .epoch,
1630 Barrier::new_test_barrier(test_epoch(5)).epoch
1631 );
1632
1633 upstream_tx
1634 .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
1635 .await
1636 .unwrap();
1637 let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
1638 upstream_tx
1639 .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
1640 .await
1641 .unwrap();
1642 barrier_tx.send(stop_barrier.clone()).unwrap();
1643
1644 assert_eq!(
1645 expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
1646 StreamChunk::from_pretty(" I\n + 5")
1647 );
1648 assert_eq!(
1649 expect_barrier_with_timeout(&mut executor, "final stop barrier")
1650 .await
1651 .epoch,
1652 stop_barrier.epoch
1653 );
1654 }
1655}