1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::batch_plan::ScanRange;
33use risingwave_pb::common::PbThrottleType;
34use risingwave_storage::StateStore;
35use risingwave_storage::store::PrefetchOptions;
36use risingwave_storage::table::ChangeLogRow;
37use risingwave_storage::table::batch_table::{BatchTable, PkScanRange};
38use tokio::select;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::time::sleep;
41
42use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
43use crate::executor::backfill::snapshot_backfill::state::{
44 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
45};
46use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
47use crate::executor::backfill::utils::{
48 UpstreamStreamKeyUpdateNormalizer, create_builder, mapping_message,
49};
50use crate::executor::monitor::StreamingMetrics;
51use crate::executor::prelude::{StateTable, StreamExt, try_stream};
52use crate::executor::{
53 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
54 MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
55 expect_first_barrier,
56};
57use crate::task::CreateMviewProgressReporter;
58
/// Executor that backfills from a snapshot of an upstream table, then catches up on the
/// table's change log, and finally forwards the upstream stream directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table from which the snapshot and the change log are read.
    upstream_table: BatchTable<S>,

    /// State table handed to `BackfillState` to persist the backfill progress.
    progress_state_table: StateTable<S>,

    /// Upstream input. `None` when the executor has no upstream to consume.
    upstream: Option<MergeExecutorInput>,

    /// Column indices used to map every emitted message (see `Execute::execute`).
    output_indices: Vec<usize>,

    /// Stream key, expressed as positions into `output_indices`
    /// (checked in `new`: every entry must be `< output_indices.len()`).
    stream_key: Vec<usize>,

    /// Reporter used to publish snapshot / log-store progress.
    progress: CreateMviewProgressReporter,

    /// Chunk size passed to the chunk builders of the snapshot and log streams.
    chunk_size: usize,
    /// Rate limit applied while consuming the snapshot; may be replaced by a `Throttle` mutation.
    rate_limit: RateLimit,

    /// Receiver of the barriers injected into this executor.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill from, if any.
    snapshot_epoch: Option<u64>,
    /// Primary-key range restricting the snapshot scan.
    pk_scan_range: PkScanRange,
}
89
90impl<S: StateStore> SnapshotBackfillExecutor<S> {
91 fn build_pk_scan_range(
92 pb_scan_range: Option<&ScanRange>,
93 upstream_table: &BatchTable<S>,
94 ) -> StreamExecutorResult<PkScanRange> {
95 match pb_scan_range {
96 Some(scan_range) => Ok(PkScanRange::new(
97 scan_range.clone(),
98 upstream_table.pk_serializer().get_data_types().to_vec(),
99 )?),
100 None => Ok(PkScanRange::full()),
101 }
102 }
103
104 #[expect(clippy::too_many_arguments)]
105 pub(crate) fn new(
106 upstream_table: BatchTable<S>,
107 progress_state_table: StateTable<S>,
108 upstream: Option<MergeExecutorInput>,
109 pb_pk_scan_range: Option<&ScanRange>,
110 output_indices: Vec<usize>,
111 stream_key: Vec<usize>,
112 actor_ctx: ActorContextRef,
113 progress: CreateMviewProgressReporter,
114 chunk_size: usize,
115 rate_limit: RateLimit,
116 barrier_rx: UnboundedReceiver<Barrier>,
117 metrics: Arc<StreamingMetrics>,
118 snapshot_epoch: Option<u64>,
119 ) -> StreamExecutorResult<Self> {
120 if let Some(upstream) = &upstream {
121 assert_eq!(&upstream.info.schema, upstream_table.schema());
122 }
123 if upstream_table.pk_in_output_indices().is_none() {
124 panic!(
125 "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
126 upstream_table.pk_indices(),
127 upstream_table.output_indices(),
128 upstream_table.schema()
129 )
130 };
131 assert!(
132 stream_key.iter().all(|idx| *idx < output_indices.len()),
133 "stream key indices should refer to output schema: stream_key: {:?}, output_indices: {:?}",
134 stream_key,
135 output_indices
136 );
137 let pk_scan_range = Self::build_pk_scan_range(pb_pk_scan_range, &upstream_table)?;
138 if !matches!(rate_limit, RateLimit::Disabled) {
139 trace!(
140 ?rate_limit,
141 "create snapshot backfill executor with rate limit"
142 );
143 }
144 Ok(Self {
145 upstream_table,
146 progress_state_table,
147 upstream,
148 output_indices,
149 stream_key,
150 progress,
151 chunk_size,
152 rate_limit,
153 barrier_rx,
154 actor_ctx,
155 metrics,
156 snapshot_epoch,
157 pk_scan_range,
158 })
159 }
160
161 #[try_stream(ok = Message, error = StreamExecutorError)]
162 async fn execute_inner(mut self) {
163 trace!("snapshot backfill executor start");
164 let upstream = if let Some(mut upstream) = self.upstream {
165 let first_upstream_barrier = expect_first_barrier(&mut upstream).await?;
166 trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
167 Some((first_upstream_barrier, upstream))
168 } else {
169 None
170 };
171 let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
172 trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
173 let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
174 self.snapshot_epoch
175 {
176 if let Some((first_upstream_barrier, _)) = &upstream {
177 if first_upstream_barrier.epoch != first_recv_barrier.epoch {
178 assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
179 Some(snapshot_epoch)
180 } else {
181 None
182 }
183 } else {
184 Some(snapshot_epoch)
186 }
187 } else {
188 if cfg!(debug_assertions) {
190 panic!(
191 "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
192 upstream.map(|(first_upstream_barrier, _)| first_upstream_barrier.epoch),
193 first_recv_barrier.epoch
194 );
195 } else {
196 let (first_upstream_barrier, _) = upstream
197 .as_ref()
198 .ok_or_else(|| anyhow!("no upstream while snapshot epoch not set"))?;
199 warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
200 assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
201 None
202 }
203 };
204 let first_recv_barrier_epoch = first_recv_barrier.epoch;
205 let initial_backfill_paused =
206 first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
207 yield Message::Barrier(first_recv_barrier);
208 let mut backfill_state = BackfillState::new(
209 self.progress_state_table,
210 first_recv_barrier_epoch,
211 self.upstream_table.pk_serializer().clone(),
212 )
213 .await?;
214
215 let (mut barrier_epoch, mut need_report_finish, upstream) = {
216 if let Some(snapshot_epoch) = should_snapshot_backfill {
217 let table_id_str = format!("{}", self.upstream_table.table_id());
218 let actor_id_str = format!("{}", self.actor_ctx.id);
219
220 let consume_upstream_row_count = self
221 .metrics
222 .snapshot_backfill_consume_row_count
223 .with_guarded_label_values(&[
224 table_id_str.as_str(),
225 actor_id_str.as_str(),
226 "consume_upstream",
227 ]);
228
229 let mut upstream_buffer = if let Some((first_upstream_barrier, upstream)) = upstream
230 {
231 SnapshotBackfillUpstream::Buffer(UpstreamBuffer::new(
232 upstream,
233 first_upstream_barrier,
234 consume_upstream_row_count,
235 ))
236 } else {
237 SnapshotBackfillUpstream::Empty
238 };
239
240 let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
242 < snapshot_epoch
243 {
244 trace!(
245 table_id = %self.upstream_table.table_id(),
246 snapshot_epoch,
247 barrier_epoch = ?first_recv_barrier_epoch,
248 "start consuming snapshot"
249 );
250 {
251 let consuming_snapshot_row_count = self
252 .metrics
253 .snapshot_backfill_consume_row_count
254 .with_guarded_label_values(&[
255 table_id_str.as_str(),
256 actor_id_str.as_str(),
257 "consuming_snapshot",
258 ]);
259 let snapshot_stream = make_consume_snapshot_stream(
260 &self.upstream_table,
261 snapshot_epoch,
262 self.chunk_size,
263 &mut self.rate_limit,
264 &mut self.barrier_rx,
265 &mut self.progress,
266 &mut backfill_state,
267 first_recv_barrier_epoch,
268 initial_backfill_paused,
269 &self.actor_ctx,
270 &self.pk_scan_range,
271 );
272
273 pin_mut!(snapshot_stream);
274
275 while let Some(message) = upstream_buffer
276 .run_future(snapshot_stream.try_next())
277 .await?
278 {
279 if let Message::Chunk(chunk) = &message {
280 consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
281 }
282 yield message;
283 }
284 }
285
286 let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
287 let recv_barrier_epoch = recv_barrier.epoch;
288 assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
289 let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
290 yield Message::Barrier(recv_barrier);
291 post_commit.post_yield_barrier(None).await?;
292 (
293 recv_barrier_epoch,
294 upstream_buffer.start_consuming_log_store(snapshot_epoch),
295 )
296 } else {
297 trace!(
298 table_id = %self.upstream_table.table_id(),
299 snapshot_epoch,
300 barrier_epoch = ?first_recv_barrier_epoch,
301 "skip consuming snapshot"
302 );
303 (
304 first_recv_barrier_epoch,
305 upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
306 )
307 };
308
309 match upstream_buffer {
311 Either::Left(mut upstream_buffer) => {
312 let initial_pending_lag =
313 if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
314 &upstream_buffer
315 {
316 Some(Duration::from_millis(
317 Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
318 ))
319 } else {
320 None
321 };
322 trace!(
323 ?barrier_epoch,
324 table_id = %self.upstream_table.table_id(),
325 ?initial_pending_lag,
326 "start consuming log store"
327 );
328
329 let consuming_log_store_row_count = self
330 .metrics
331 .snapshot_backfill_consume_row_count
332 .with_guarded_label_values(&[
333 table_id_str.as_str(),
334 actor_id_str.as_str(),
335 "consuming_log_store",
336 ]);
337 let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
338 loop {
339 let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
340 assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
341 let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
342 barrier_epoch = barrier.epoch;
357 if barrier.kind.is_checkpoint() {
358 let pending_non_checkpoint_barrier =
359 take(&mut pending_non_checkpoint_barrier);
360 let end_epoch = barrier_epoch.prev;
361 let start_epoch = pending_non_checkpoint_barrier
362 .first()
363 .map(|epoch| epoch.prev)
364 .unwrap_or(end_epoch);
365 trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
366 let mut stream = upstream_buffer
370 .run_future(make_log_stream(
371 &self.upstream_table,
372 start_epoch,
373 end_epoch,
374 None,
375 self.chunk_size,
376 ))
377 .await?;
378 while let Some(chunk) =
379 upstream_buffer.run_future(stream.try_next()).await?
380 {
381 trace!(
382 ?barrier_epoch,
383 size = chunk.cardinality(),
384 "consume change log yield chunk",
385 );
386 consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
387 yield Message::Chunk(chunk);
388 }
389
390 trace!(?barrier_epoch, "after consume change log");
391
392 stream
393 .for_vnode_pk_progress(|vnode, row_count, progress| {
394 assert_eq!(progress, None);
395 backfill_state.finish_epoch(
396 vnode,
397 barrier.epoch.prev,
398 row_count,
399 );
400 })
401 .await?;
402 } else {
403 pending_non_checkpoint_barrier.push(barrier.epoch);
404 }
405
406 if let SnapshotBackfillUpstream::Buffer(upstream_buffer) =
407 &upstream_buffer
408 {
409 if is_finished {
410 assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
411 assert!(pending_non_checkpoint_barrier.is_empty());
412 self.progress.finish_consuming_log_store(barrier.epoch);
413 } else {
414 self.progress.update_create_mview_log_store_progress(
415 barrier.epoch,
416 upstream_buffer.pending_epoch_lag(),
417 );
418 }
419 }
420
421 let post_commit = backfill_state.commit(barrier.epoch).await?;
422 let update_vnode_bitmap =
423 barrier.as_update_vnode_bitmap(self.actor_ctx.id);
424 yield Message::Barrier(barrier);
425 post_commit.post_yield_barrier(None).await?;
426 if update_vnode_bitmap.is_some() {
427 return Err(anyhow!(
428 "should not update vnode bitmap during consuming log store"
429 )
430 .into());
431 }
432
433 if is_finished {
434 assert!(
435 pending_non_checkpoint_barrier.is_empty(),
436 "{pending_non_checkpoint_barrier:?}"
437 );
438 break;
439 }
440 }
441 trace!(
442 ?barrier_epoch,
443 table_id = %self.upstream_table.table_id(),
444 "finish consuming log store"
445 );
446
447 (
448 barrier_epoch,
449 false,
450 upstream_buffer.start_consuming_upstream(),
451 )
452 }
453 Either::Right(upstream) => {
454 trace!(
455 ?barrier_epoch,
456 table_id = %self.upstream_table.table_id(),
457 "skip consuming log store and start consuming upstream directly"
458 );
459
460 (barrier_epoch, true, upstream)
461 }
462 }
463 } else {
464 let (first_upstream_barrier, _) = upstream
465 .as_ref()
466 .expect("should have upstream when skipping snapshot backfill");
467 backfill_state
468 .latest_progress()
469 .for_each(|(vnode, progress)| {
470 let progress = progress.expect("should not be empty");
471 assert_eq!(
472 progress.epoch, first_upstream_barrier.epoch.prev,
473 "vnode: {:?}",
474 vnode
475 );
476 assert_eq!(
477 progress.progress,
478 EpochBackfillProgress::Consumed,
479 "vnode: {:?}",
480 vnode
481 );
482 });
483 trace!(
484 table_id = %self.upstream_table.table_id(),
485 "skip backfill"
486 );
487 let (first_upstream_barrier, upstream) =
488 upstream.expect("should have upstream when skipping snapshot backfill");
489 assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
490 (first_upstream_barrier.epoch, true, upstream)
491 }
492 };
493 let current_stream_key_indices = self
494 .stream_key
495 .iter()
496 .map(|idx| self.output_indices[*idx])
497 .collect();
498 let update_normalizer = UpstreamStreamKeyUpdateNormalizer::new(
499 &upstream.info.stream_key,
500 current_stream_key_indices,
501 );
502 let mut upstream = upstream.into_executor(self.barrier_rx).execute();
503 let mut epoch_row_count = 0;
504 while let Some(msg) = upstream.try_next().await? {
506 let Some(msg) = update_normalizer.normalize_message(msg) else {
507 continue;
508 };
509 match msg {
510 Message::Barrier(barrier) => {
511 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
512 self.upstream_table
513 .vnodes()
514 .iter_vnodes()
515 .for_each(|vnode| {
516 backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
519 });
520 epoch_row_count = 0;
521 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
522 barrier_epoch = barrier.epoch;
523 if need_report_finish {
524 need_report_finish = false;
525 self.progress.finish_consuming_log_store(barrier_epoch);
526 }
527 let post_commit = backfill_state.commit(barrier.epoch).await?;
528 yield Message::Barrier(barrier);
529 if let Some(new_vnode_bitmap) =
530 post_commit.post_yield_barrier(update_vnode_bitmap).await?
531 {
532 let _prev_vnode_bitmap =
533 self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
534 backfill_state
535 .latest_progress()
536 .for_each(|(vnode, progress)| {
537 let progress = progress.expect("should not be empty");
538 assert_eq!(
539 progress.epoch, barrier_epoch.prev,
540 "vnode {:?} has unexpected progress epoch",
541 vnode
542 );
543 assert_eq!(
544 progress.progress,
545 EpochBackfillProgress::Consumed,
546 "vnode {:?} has unexpected progress",
547 vnode
548 );
549 });
550 }
551 }
552 msg => {
553 if let Message::Chunk(chunk) = &msg {
554 epoch_row_count += chunk.cardinality();
555 }
556 yield msg;
557 }
558 }
559 }
560 }
561}
562
563impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
564 fn execute(self: Box<Self>) -> BoxedMessageStream {
565 let output_indices = self.output_indices.clone();
566 self.execute_inner()
567 .filter_map(move |result| {
568 ready({
569 match result {
570 Ok(message) => mapping_message(message, &output_indices).map(Ok),
571 Err(e) => Some(Err(e)),
572 }
573 })
574 })
575 .boxed()
576 }
577}
578
/// Marker used as the phase parameter of `UpstreamBuffer` / `SnapshotBackfillUpstream` while
/// the snapshot is being consumed.
struct ConsumingSnapshot;
/// Marker used as the phase parameter of `UpstreamBuffer` / `SnapshotBackfillUpstream` while
/// the log store is being consumed.
struct ConsumingLogStore;
581
/// Upstream barriers that have been received but not yet consumed.
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the first upstream barrier passed to `new`.
    first_upstream_barrier_epoch: EpochPair,

    /// Barriers received after the latest checkpoint barrier.
    /// Newer barriers are pushed to the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers, each group closed by a checkpoint barrier.
    /// Newer groups are at the front, and within a group newer barriers are at the front,
    /// so the oldest pending barrier is the back of the back group.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
595
596impl PendingBarriers {
597 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
598 Self {
599 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
600 pending_non_checkpoint_barriers: Default::default(),
601 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
602 first_upstream_barrier,
603 ])]),
604 }
605 }
606
607 fn add(&mut self, barrier: DispatcherBarrier) {
608 let is_checkpoint = barrier.kind.is_checkpoint();
609 self.pending_non_checkpoint_barriers.push_front(barrier);
610 if is_checkpoint {
611 self.checkpoint_barrier_groups
612 .push_front(take(&mut self.pending_non_checkpoint_barriers));
613 }
614 }
615
616 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
617 self.checkpoint_barrier_groups.pop_back()
618 }
619
620 fn consume_epoch(&mut self, epoch: EpochPair) {
621 let barriers = self
622 .checkpoint_barrier_groups
623 .back_mut()
624 .expect("non-empty");
625 let oldest_upstream_barrier = barriers.back().expect("non-empty");
626 assert!(
627 oldest_upstream_barrier.epoch.prev >= epoch.prev,
628 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
629 oldest_upstream_barrier.epoch,
630 epoch
631 );
632 if oldest_upstream_barrier.epoch.prev == epoch.prev {
633 assert_eq!(oldest_upstream_barrier.epoch, epoch);
634 barriers.pop_back();
635 if barriers.is_empty() {
636 self.checkpoint_barrier_groups.pop_back();
637 }
638 }
639 }
640
641 fn latest_epoch(&self) -> Option<EpochPair> {
642 self.pending_non_checkpoint_barriers
643 .front()
644 .or_else(|| {
645 self.checkpoint_barrier_groups
646 .front()
647 .and_then(|barriers| barriers.front())
648 })
649 .map(|barrier| barrier.epoch)
650 }
651
652 fn checkpoint_epoch_count(&self) -> usize {
653 self.checkpoint_barrier_groups.len()
654 }
655
656 fn has_checkpoint_epoch(&self) -> bool {
657 !self.checkpoint_barrier_groups.is_empty()
658 }
659}
660
/// Upstream of the executor during backfill: either absent, or wrapped in an `UpstreamBuffer`.
/// `S` is the phase marker forwarded to the buffer.
enum SnapshotBackfillUpstream<S> {
    /// The executor has no upstream input.
    Empty,
    /// The upstream input together with its buffered pending barriers.
    Buffer(UpstreamBuffer<S>),
}
665
666impl<S> SnapshotBackfillUpstream<S> {
667 async fn run_future<T, E: Into<StreamExecutorError>>(
668 &mut self,
669 future: impl Future<Output = Result<T, E>>,
670 ) -> StreamExecutorResult<T> {
671 match self {
672 SnapshotBackfillUpstream::Empty => future.await.map_err(Into::into),
673 SnapshotBackfillUpstream::Buffer(buffer) => buffer.run_future(future).await,
674 }
675 }
676}
677
678impl SnapshotBackfillUpstream<ConsumingSnapshot> {
679 fn start_consuming_log_store(
680 self,
681 consumed_epoch: u64,
682 ) -> Either<SnapshotBackfillUpstream<ConsumingLogStore>, MergeExecutorInput> {
683 match self {
684 SnapshotBackfillUpstream::Empty => Either::Left(SnapshotBackfillUpstream::Empty),
685 SnapshotBackfillUpstream::Buffer(buffer) => {
686 match buffer.start_consuming_log_store(consumed_epoch) {
687 Either::Left(buffer) => Either::Left(SnapshotBackfillUpstream::Buffer(buffer)),
688 Either::Right(input) => Either::Right(input),
689 }
690 }
691 }
692 }
693}
694
695impl SnapshotBackfillUpstream<ConsumingLogStore> {
696 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
697 match self {
698 SnapshotBackfillUpstream::Empty => Ok(false),
699 SnapshotBackfillUpstream::Buffer(buffer) => buffer.consumed_epoch(epoch).await,
700 }
701 }
702
703 fn start_consuming_upstream(self) -> MergeExecutorInput {
704 match self {
705 SnapshotBackfillUpstream::Empty => {
706 unreachable!("unlike to start consuming upstream when having no upstream")
707 }
708 SnapshotBackfillUpstream::Buffer(buffer) => buffer.start_consuming_upstream(),
709 }
710 }
711}
712
/// Receives messages from the upstream input while the executor is busy with an earlier phase.
/// Barriers are kept in `upstream_pending_barriers`; chunks are only counted and dropped.
/// `S` is a marker for the current phase (`ConsumingSnapshot` or `ConsumingLogStore`).
struct UpstreamBuffer<S> {
    upstream: MergeExecutorInput,
    /// Upstream is only polled for a new epoch group while `pending_epoch_lag()` is below
    /// this value (see `can_consume_upstream`).
    max_pending_epoch_lag: u64,
    /// Latest epoch marked as consumed; `pending_epoch_lag()` is measured against it.
    consumed_epoch: u64,
    /// Upstream barriers that have been received but not consumed yet.
    upstream_pending_barriers: PendingBarriers,
    /// `true` when messages have been received after the latest checkpoint barrier, i.e. the
    /// current epoch group has not been closed by a checkpoint barrier yet.
    is_polling_epoch_data: bool,
    /// Counter increased by the cardinality of every upstream chunk received.
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}
727
728impl UpstreamBuffer<ConsumingSnapshot> {
729 fn new(
730 upstream: MergeExecutorInput,
731 first_upstream_barrier: DispatcherBarrier,
732 consume_upstream_row_count: LabelGuardedIntCounter,
733 ) -> Self {
734 Self {
735 upstream,
736 is_polling_epoch_data: false,
737 consume_upstream_row_count,
738 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
739 max_pending_epoch_lag: u64::MAX,
741 consumed_epoch: 0,
742 _phase: ConsumingSnapshot {},
743 }
744 }
745
746 fn start_consuming_log_store(
747 mut self,
748 consumed_epoch: u64,
749 ) -> Either<UpstreamBuffer<ConsumingLogStore>, MergeExecutorInput> {
750 if self
751 .upstream_pending_barriers
752 .first_upstream_barrier_epoch
753 .prev
754 == consumed_epoch
755 {
756 assert_eq!(
757 1,
758 self.upstream_pending_barriers
759 .pop()
760 .expect("non-empty")
761 .len()
762 );
763 }
764 let max_pending_epoch_lag = self.pending_epoch_lag();
765 let buffer = UpstreamBuffer {
766 upstream: self.upstream,
767 upstream_pending_barriers: self.upstream_pending_barriers,
768 max_pending_epoch_lag,
769 is_polling_epoch_data: self.is_polling_epoch_data,
770 consume_upstream_row_count: self.consume_upstream_row_count,
771 consumed_epoch,
772 _phase: ConsumingLogStore {},
773 };
774 if buffer.is_finished() {
775 Either::Right(buffer.upstream)
776 } else {
777 Either::Left(buffer)
778 }
779 }
780}
781
782impl<S> UpstreamBuffer<S> {
783 fn can_consume_upstream(&self) -> bool {
784 self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
785 }
786
787 async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
788 {
789 loop {
790 if let Err(e) = try {
791 if !self.can_consume_upstream() {
792 sleep(Duration::from_secs(30)).await;
794 warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
795 return pending().await;
796 }
797 self.consume_until_next_checkpoint_barrier().await?;
798 } {
799 break e;
800 }
801 }
802 }
803 }
804
805 async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
807 loop {
808 let msg: DispatcherMessage = self
809 .upstream
810 .try_next()
811 .await?
812 .ok_or_else(|| anyhow!("end of upstream"))?;
813 match msg {
814 DispatcherMessage::Chunk(chunk) => {
815 self.is_polling_epoch_data = true;
816 self.consume_upstream_row_count
817 .inc_by(chunk.cardinality() as _);
818 }
819 DispatcherMessage::Barrier(barrier) => {
820 let is_checkpoint = barrier.kind.is_checkpoint();
821 self.upstream_pending_barriers.add(barrier);
822 if is_checkpoint {
823 self.is_polling_epoch_data = false;
824 break;
825 } else {
826 self.is_polling_epoch_data = true;
827 }
828 }
829 DispatcherMessage::Watermark(_) => {
830 self.is_polling_epoch_data = true;
831 }
832 }
833 }
834 Ok(())
835 }
836}
837
838impl UpstreamBuffer<ConsumingLogStore> {
839 #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
840 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
841 assert!(!self.is_finished());
842 if !self.upstream_pending_barriers.has_checkpoint_epoch() {
843 assert!(self.is_polling_epoch_data);
846 self.consume_until_next_checkpoint_barrier().await?;
847 assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
848 }
849 self.upstream_pending_barriers.consume_epoch(epoch);
850
851 {
852 {
853 let prev_epoch = epoch.prev;
854 assert!(self.consumed_epoch < prev_epoch);
855 let elapsed_epoch = prev_epoch - self.consumed_epoch;
856 self.consumed_epoch = prev_epoch;
857 if self.upstream_pending_barriers.has_checkpoint_epoch() {
858 while self.can_consume_upstream()
860 && let Some(result) =
861 self.consume_until_next_checkpoint_barrier().now_or_never()
862 {
863 result?;
864 }
865 }
866 self.max_pending_epoch_lag = min(
870 self.pending_epoch_lag(),
871 self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
872 );
873 }
874 }
875 Ok(self.is_finished())
876 }
877
878 fn is_finished(&self) -> bool {
879 if cfg!(debug_assertions) && !self.is_polling_epoch_data {
880 assert!(
881 self.upstream_pending_barriers
882 .pending_non_checkpoint_barriers
883 .is_empty()
884 )
885 }
886 !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
887 }
888
889 fn start_consuming_upstream(self) -> MergeExecutorInput {
890 assert!(self.is_finished());
891 assert_eq!(self.pending_epoch_lag(), 0);
892 self.upstream
893 }
894}
895
896impl<S> UpstreamBuffer<S> {
897 async fn run_future<T, E: Into<StreamExecutorError>>(
900 &mut self,
901 future: impl Future<Output = Result<T, E>>,
902 ) -> StreamExecutorResult<T> {
903 select! {
904 biased;
905 e = self.concurrently_consume_upstream() => {
906 Err(e)
907 }
908 result = future => {
910 result.map_err(Into::into)
911 }
912 }
913 }
914
915 fn pending_epoch_lag(&self) -> u64 {
916 self.upstream_pending_barriers
917 .latest_epoch()
918 .map(|epoch| {
919 epoch
920 .prev
921 .checked_sub(self.consumed_epoch)
922 .expect("pending epoch must be later than consumed_epoch")
923 })
924 .unwrap_or(0)
925 }
926}
927
928#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
929async fn make_log_stream(
930 upstream_table: &BatchTable<impl StateStore>,
931 start_epoch: u64,
932 end_epoch: u64,
933 start_pk: Option<OwnedRow>,
934 chunk_size: usize,
935) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
936 let data_types = upstream_table.schema().data_types();
937 let start_pk = start_pk.as_ref();
938 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
940 upstream_table
941 .batch_iter_vnode_log(
942 start_epoch,
943 HummockReadEpoch::Committed(end_epoch),
944 start_pk,
945 vnode,
946 )
947 .map_ok(move |stream| {
948 let stream = stream.map_err(Into::into);
949 (vnode, stream, 0)
950 })
951 }))
952 .await?;
953 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
954 Ok(VnodeStream::new(
955 vnode_streams,
956 upstream_table.pk_in_output_indices().expect("should exist"),
957 builder,
958 ))
959}
960
961async fn make_snapshot_stream(
962 upstream_table: &BatchTable<impl StateStore>,
963 snapshot_epoch: u64,
964 backfill_state: &BackfillState<impl StateStore>,
965 rate_limit: RateLimit,
966 chunk_size: usize,
967 snapshot_rebuild_interval: Duration,
968 pk_scan_range: &PkScanRange,
969) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
970 let data_types = upstream_table.schema().data_types();
971 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
972 move |(vnode, progress)| {
973 let start_pk = match progress {
974 None => Some((None, 0)),
975 Some(VnodeBackfillProgress {
976 row_count,
977 progress: EpochBackfillProgress::Consuming { latest_pk },
978 ..
979 }) => Some((Some(latest_pk), *row_count)),
980 Some(VnodeBackfillProgress {
981 progress: EpochBackfillProgress::Consumed,
982 ..
983 }) => None,
984 };
985 start_pk.map(|(start_pk, row_count)| {
986 upstream_table
987 .batch_iter_vnode_with_pk_range(
988 HummockReadEpoch::Committed(snapshot_epoch),
989 start_pk,
990 &pk_scan_range.pk_prefix,
991 &pk_scan_range.range_bounds,
992 vnode,
993 PrefetchOptions::prefetch_for_large_range_scan(),
994 snapshot_rebuild_interval,
995 )
996 .map_ok(move |stream| {
997 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
998 (vnode, stream, row_count)
999 })
1000 })
1001 },
1002 ))
1003 .await?;
1004 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
1005 Ok(VnodeStream::new(
1006 vnode_streams,
1007 upstream_table.pk_in_output_indices().expect("should exist"),
1008 builder,
1009 ))
1010}
1011
/// Stream that consumes the snapshot at `snapshot_epoch` and interleaves it with the injected
/// barriers, ending right after the barrier whose `curr` epoch equals `snapshot_epoch`.
///
/// Steps:
/// 1. Race barriers against snapshot chunks until the snapshot stream is exhausted. On every
///    barrier the pending builder chunk is flushed, per-vnode progress is recorded and
///    committed, progress is reported, and a backfill `Throttle` mutation (if any) is applied.
/// 2. On the next barrier, mark every vnode as finished and report the backfill as finished.
/// 3. Keep forwarding barriers until the barrier whose `curr` equals `snapshot_epoch`.
///
/// # Errors
/// Fails if, while the snapshot is still being read, a barrier with `curr >= snapshot_epoch`
/// arrives, or if a snapshot chunk is received while the backfill is paused.
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: &'a mut RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
    pk_scan_range: &'a PkScanRange,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        *rate_limit,
        chunk_size,
        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
        pk_scan_range,
    )
    .await?;

    // Waits for whichever comes first: the next barrier, or the next snapshot chunk.
    // The snapshot branch is disabled while throttled or paused.
    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    // `count`: rows yielded in total; `epoch_row_count`: rows yielded since the last barrier.
    let mut count = 0;
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        // Stop reading the snapshot for this epoch once the per-epoch row budget is used up.
        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;

                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    backfill_paused = false;
                }
                // Flush rows still held by the chunk builder before the barrier.
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                // A vnode with a pk position is still in progress; without one it is done.
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                // Extract a backfill throttle change for this fragment before the barrier is
                // moved into the yielded message.
                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
                    if let Mutation::Throttle(config) = &**m
                        && let Some(config) = config.get(&actor_ctx.fragment_id)
                        && config.throttle_type() == PbThrottleType::Backfill
                    {
                        Some(config.rate_limit)
                    } else {
                        None
                    }
                });
                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;

                // The new rate limit takes effect after the barrier has been yielded.
                if let Some(new_rate_limit) = new_rate_limit {
                    let new_rate_limit = new_rate_limit.into();
                    *rate_limit = new_rate_limit;
                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
                }
            }
            Either::Right(Some(chunk)) => {
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            // The snapshot stream is exhausted.
            Either::Right(None) => {
                break;
            }
        }
    }

    // The snapshot is fully read: report the finish on the next barrier.
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    trace!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    // Keep forwarding barriers until the one whose `curr` equals the snapshot epoch.
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    trace!(?barrier_epoch, "finish consuming snapshot");
}
1169
1170#[cfg(test)]
1171mod tests {
1172 use std::collections::HashSet;
1173 use std::sync::Arc;
1174
1175 use risingwave_common::array::StreamChunk;
1176 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1177 use risingwave_common::row::OwnedRow;
1178 use risingwave_common::test_prelude::StreamChunkTestExt;
1179 use risingwave_common::types::DataType;
1180 use risingwave_common::util::epoch::{EpochPair, test_epoch};
1181 use risingwave_common::util::sort_util::OrderType;
1182 use risingwave_hummock_test::test_utils::{HummockTestEnv, prepare_hummock_test_env};
1183 use risingwave_rpc_client::HummockMetaClient;
1184 use risingwave_storage::hummock::HummockStorage;
1185 use risingwave_storage::table::batch_table::BatchTable;
1186 use tokio::sync::mpsc::unbounded_channel;
1187 use tokio::time::{Duration, timeout};
1188
1189 use super::*;
1190 use crate::common::table::state_table::{
1191 StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
1192 };
1193 use crate::common::table::test_utils::gen_pbtable_with_value_indices;
1194 use crate::executor::exchange::input::{Input, LocalInput};
1195 use crate::executor::exchange::permit::channel_for_test;
1196 use crate::executor::{ActorContext, DispatcherMessage, ExecutorInfo, MergeExecutorUpstream};
1197 use crate::task::LocalBarrierManager;
1198
1199 const SOURCE_TABLE_ID: TableId = TableId::new(0x233);
1200 const PROGRESS_TABLE_ID: TableId = TableId::new(0x234);
1201
1202 fn source_table_pb() -> risingwave_pb::catalog::PbTable {
1203 gen_pbtable_with_value_indices(
1204 SOURCE_TABLE_ID,
1205 vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1206 vec![OrderType::ascending()],
1207 vec![0],
1208 0,
1209 vec![0],
1210 )
1211 }
1212
1213 fn progress_table_pb() -> risingwave_pb::catalog::PbTable {
1214 gen_pbtable_with_value_indices(
1215 PROGRESS_TABLE_ID,
1216 vec![
1217 ColumnDesc::unnamed(ColumnId::new(0), DataType::Int16),
1218 ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1219 ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1220 ColumnDesc::unnamed(ColumnId::new(3), DataType::Boolean),
1221 ColumnDesc::unnamed(ColumnId::new(4), DataType::Int64),
1222 ],
1223 vec![OrderType::ascending()],
1224 vec![0],
1225 1,
1226 vec![1, 2, 3, 4],
1227 )
1228 }
1229
1230 fn source_batch_table(store: HummockStorage) -> BatchTable<HummockStorage> {
1231 BatchTable::for_test(
1232 store,
1233 SOURCE_TABLE_ID,
1234 vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64)],
1235 vec![OrderType::ascending()],
1236 vec![0],
1237 vec![0],
1238 )
1239 }
1240
1241 async fn source_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1242 StateTableBuilder::new(&source_table_pb(), store, None)
1243 .with_op_consistency_level(StateTableOpConsistencyLevel::LogStoreEnabled)
1244 .forbid_preload_all_rows()
1245 .build()
1246 .await
1247 }
1248
1249 async fn progress_state_table(store: HummockStorage) -> StateTable<HummockStorage> {
1250 StateTable::from_table_catalog(&progress_table_pb(), store, None).await
1251 }
1252
1253 async fn commit_insert_epoch(
1254 test_env: &HummockTestEnv,
1255 source_state_table: &mut StateTable<HummockStorage>,
1256 epoch: &mut EpochPair,
1257 table_ids: HashSet<TableId>,
1258 values: &[i64],
1259 ) {
1260 for value in values {
1261 source_state_table.insert(OwnedRow::new(vec![Some((*value).into())]));
1262 }
1263 epoch.inc_for_test();
1264 test_env.storage.start_epoch(epoch.curr, table_ids);
1265 source_state_table.commit_for_test(*epoch).await.unwrap();
1266 let res = test_env
1267 .storage
1268 .seal_and_sync_epoch(epoch.prev, HashSet::from_iter([SOURCE_TABLE_ID]))
1269 .await
1270 .unwrap();
1271 test_env
1272 .meta_client
1273 .commit_epoch_with_change_log(epoch.prev, res, Some(vec![epoch.prev]))
1274 .await
1275 .unwrap();
1276 test_env
1277 .storage
1278 .wait_version(test_env.manager.get_current_version().await)
1279 .await;
1280 }
1281
1282 fn start_progress_epochs(test_env: &HummockTestEnv, max_epoch: u64) {
1283 for epoch in 1..=max_epoch {
1284 test_env
1285 .storage
1286 .start_epoch(test_epoch(epoch), HashSet::from_iter([PROGRESS_TABLE_ID]));
1287 }
1288 }
1289
1290 fn make_upstream_input(
1291 barrier_manager: LocalBarrierManager,
1292 actor_ctx: ActorContextRef,
1293 rx: crate::executor::exchange::permit::Receiver,
1294 ) -> MergeExecutorInput {
1295 MergeExecutorInput::new(
1296 MergeExecutorUpstream::Singleton(LocalInput::new(rx, 1001.into()).boxed_input()),
1297 actor_ctx,
1298 1919.into(),
1299 barrier_manager,
1300 Arc::new(StreamingMetrics::unused()),
1301 ExecutorInfo::for_test(
1302 Schema::new(vec![Field::unnamed(DataType::Int64)]),
1303 vec![0],
1304 "SnapshotBackfillUpstream".to_owned(),
1305 0,
1306 ),
1307 )
1308 }
1309
1310 async fn expect_barrier_with_timeout(
1311 executor: &mut BoxedMessageStream,
1312 reason: &str,
1313 ) -> Barrier {
1314 let message = timeout(Duration::from_secs(10), executor.next())
1315 .await
1316 .unwrap_or_else(|_| panic!("timed out waiting for barrier: {reason}"))
1317 .unwrap()
1318 .unwrap();
1319 match message {
1320 Message::Barrier(barrier) => barrier,
1321 other => panic!("expected barrier for {reason}, got {other:?}"),
1322 }
1323 }
1324
1325 async fn expect_chunk_with_timeout(
1326 executor: &mut BoxedMessageStream,
1327 reason: &str,
1328 ) -> StreamChunk {
1329 let message = timeout(Duration::from_secs(10), executor.next())
1330 .await
1331 .unwrap_or_else(|_| panic!("timed out waiting for chunk: {reason}"))
1332 .unwrap()
1333 .unwrap();
1334 match message {
1335 Message::Chunk(chunk) => chunk,
1336 other => panic!("expected chunk for {reason}, got {other:?}"),
1337 }
1338 }
1339
1340 async fn expect_pending_with_timeout(executor: &mut BoxedMessageStream, reason: &str) {
1341 assert!(
1342 timeout(Duration::from_millis(200), executor.next())
1343 .await
1344 .is_err(),
1345 "executor unexpectedly produced a message while waiting for {reason}"
1346 );
1347 }
1348
1349 #[tokio::test]
1350 async fn test_snapshot_backfill_without_upstream_on_hummock() {
1351 let source_env = prepare_hummock_test_env().await;
1352 source_env.register_table(source_table_pb()).await;
1353 let progress_env = prepare_hummock_test_env().await;
1354 progress_env.register_table(progress_table_pb()).await;
1355
1356 let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1357 let source_table = source_batch_table(source_env.storage.clone());
1358 let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1359
1360 let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1361 source_env
1362 .storage
1363 .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1364 source_state_table.init_epoch(epoch).await.unwrap();
1365
1366 commit_insert_epoch(
1367 &source_env,
1368 &mut source_state_table,
1369 &mut epoch,
1370 HashSet::from_iter([SOURCE_TABLE_ID]),
1371 &[1],
1372 )
1373 .await;
1374 commit_insert_epoch(
1375 &source_env,
1376 &mut source_state_table,
1377 &mut epoch,
1378 HashSet::from_iter([SOURCE_TABLE_ID]),
1379 &[2],
1380 )
1381 .await;
1382 commit_insert_epoch(
1383 &source_env,
1384 &mut source_state_table,
1385 &mut epoch,
1386 HashSet::from_iter([SOURCE_TABLE_ID]),
1387 &[3],
1388 )
1389 .await;
1390 commit_insert_epoch(
1391 &source_env,
1392 &mut source_state_table,
1393 &mut epoch,
1394 HashSet::from_iter([SOURCE_TABLE_ID]),
1395 &[],
1396 )
1397 .await;
1398 start_progress_epochs(&progress_env, 5);
1399
1400 let barrier_manager = LocalBarrierManager::for_test();
1401 let progress = CreateMviewProgressReporter::for_test(barrier_manager);
1402 let actor_ctx = ActorContext::for_test(1234);
1403 let (barrier_tx, barrier_rx) = unbounded_channel();
1404 barrier_tx
1405 .send(Barrier::new_test_barrier(test_epoch(1)))
1406 .unwrap();
1407
1408 let mut executor = SnapshotBackfillExecutor::new(
1409 source_table,
1410 progress_state_table,
1411 None,
1412 None,
1413 vec![0],
1414 vec![0],
1415 actor_ctx,
1416 progress,
1417 1024,
1418 RateLimit::Disabled,
1419 barrier_rx,
1420 Arc::new(StreamingMetrics::unused()),
1421 Some(test_epoch(3)),
1422 )
1423 .expect("snapshot backfill executor should be created")
1424 .boxed()
1425 .execute();
1426
1427 assert_eq!(
1428 expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1429 .await
1430 .epoch,
1431 Barrier::new_test_barrier(test_epoch(1)).epoch
1432 );
1433 assert_eq!(
1434 expect_chunk_with_timeout(&mut executor, "snapshot chunk without upstream").await,
1435 StreamChunk::from_pretty(
1436 " I
1437 + 1
1438 + 2
1439 + 3"
1440 )
1441 );
1442 expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1443
1444 barrier_tx
1445 .send(Barrier::new_test_barrier(test_epoch(2)))
1446 .unwrap();
1447 assert_eq!(
1448 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1449 .await
1450 .epoch,
1451 Barrier::new_test_barrier(test_epoch(2)).epoch
1452 );
1453
1454 barrier_tx
1455 .send(Barrier::new_test_barrier(test_epoch(3)))
1456 .unwrap();
1457 assert_eq!(
1458 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1459 .await
1460 .epoch,
1461 Barrier::new_test_barrier(test_epoch(3)).epoch
1462 );
1463
1464 barrier_tx
1465 .send(Barrier::new_test_barrier(test_epoch(4)))
1466 .unwrap();
1467 assert_eq!(
1468 expect_barrier_with_timeout(&mut executor, "post-snapshot barrier 4")
1469 .await
1470 .epoch,
1471 Barrier::new_test_barrier(test_epoch(4)).epoch
1472 );
1473
1474 barrier_tx
1475 .send(Barrier::new_test_barrier(test_epoch(5)))
1476 .unwrap();
1477 assert_eq!(
1478 expect_barrier_with_timeout(&mut executor, "steady-state barrier 5")
1479 .await
1480 .epoch,
1481 Barrier::new_test_barrier(test_epoch(5)).epoch
1482 );
1483
1484 expect_pending_with_timeout(&mut executor, "next local barrier").await;
1485 }
1486
1487 #[tokio::test]
1488 async fn test_snapshot_backfill_with_upstream_on_hummock() {
1489 let source_env = prepare_hummock_test_env().await;
1490 source_env.register_table(source_table_pb()).await;
1491 let progress_env = prepare_hummock_test_env().await;
1492 progress_env.register_table(progress_table_pb()).await;
1493
1494 let mut source_state_table = source_state_table(source_env.storage.clone()).await;
1495 let source_table = source_batch_table(source_env.storage.clone());
1496 let progress_state_table = progress_state_table(progress_env.storage.clone()).await;
1497
1498 let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
1499 source_env
1500 .storage
1501 .start_epoch(epoch.curr, HashSet::from_iter([SOURCE_TABLE_ID]));
1502 source_state_table.init_epoch(epoch).await.unwrap();
1503
1504 commit_insert_epoch(
1505 &source_env,
1506 &mut source_state_table,
1507 &mut epoch,
1508 HashSet::from_iter([SOURCE_TABLE_ID]),
1509 &[],
1510 )
1511 .await;
1512 commit_insert_epoch(
1513 &source_env,
1514 &mut source_state_table,
1515 &mut epoch,
1516 HashSet::from_iter([SOURCE_TABLE_ID]),
1517 &[],
1518 )
1519 .await;
1520 commit_insert_epoch(
1521 &source_env,
1522 &mut source_state_table,
1523 &mut epoch,
1524 HashSet::from_iter([SOURCE_TABLE_ID]),
1525 &[],
1526 )
1527 .await;
1528 commit_insert_epoch(
1529 &source_env,
1530 &mut source_state_table,
1531 &mut epoch,
1532 HashSet::from_iter([SOURCE_TABLE_ID]),
1533 &[4],
1534 )
1535 .await;
1536 start_progress_epochs(&progress_env, 6);
1537
1538 let barrier_manager = LocalBarrierManager::for_test();
1539 let progress = CreateMviewProgressReporter::for_test(barrier_manager.clone());
1540 let actor_ctx = ActorContext::for_test(1235);
1541 let (barrier_tx, barrier_rx) = unbounded_channel();
1542 let (upstream_tx, upstream_rx) = channel_for_test();
1543
1544 upstream_tx
1545 .send(
1546 DispatcherMessage::Barrier(
1547 Barrier::new_test_barrier(test_epoch(5)).into_dispatcher(),
1548 )
1549 .into(),
1550 )
1551 .await
1552 .unwrap();
1553 barrier_tx
1554 .send(Barrier::new_test_barrier(test_epoch(1)))
1555 .unwrap();
1556
1557 let mut executor = SnapshotBackfillExecutor::new(
1558 source_table,
1559 progress_state_table,
1560 Some(make_upstream_input(
1561 barrier_manager,
1562 actor_ctx.clone(),
1563 upstream_rx,
1564 )),
1565 None,
1566 vec![0],
1567 vec![0],
1568 actor_ctx,
1569 progress,
1570 1024,
1571 RateLimit::Disabled,
1572 barrier_rx,
1573 Arc::new(StreamingMetrics::unused()),
1574 Some(test_epoch(3)),
1575 )
1576 .expect("snapshot backfill executor should be created")
1577 .boxed()
1578 .execute();
1579
1580 assert_eq!(
1581 expect_barrier_with_timeout(&mut executor, "initial injected barrier")
1582 .await
1583 .epoch,
1584 Barrier::new_test_barrier(test_epoch(1)).epoch
1585 );
1586 expect_pending_with_timeout(&mut executor, "snapshot finish barrier 2").await;
1587 barrier_tx
1588 .send(Barrier::new_test_barrier(test_epoch(2)))
1589 .unwrap();
1590 assert_eq!(
1591 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 2")
1592 .await
1593 .epoch,
1594 Barrier::new_test_barrier(test_epoch(2)).epoch
1595 );
1596
1597 barrier_tx
1598 .send(Barrier::new_test_barrier(test_epoch(3)))
1599 .unwrap();
1600 assert_eq!(
1601 expect_barrier_with_timeout(&mut executor, "snapshot progress barrier 3")
1602 .await
1603 .epoch,
1604 Barrier::new_test_barrier(test_epoch(3)).epoch
1605 );
1606
1607 barrier_tx
1608 .send(Barrier::new_test_barrier(test_epoch(4)))
1609 .unwrap();
1610 assert_eq!(
1611 expect_barrier_with_timeout(&mut executor, "snapshot completion barrier 4")
1612 .await
1613 .epoch,
1614 Barrier::new_test_barrier(test_epoch(4)).epoch
1615 );
1616
1617 barrier_tx
1618 .send(Barrier::new_test_barrier(test_epoch(5)))
1619 .unwrap();
1620 assert_eq!(
1621 expect_chunk_with_timeout(&mut executor, "log-store replay chunk").await,
1622 StreamChunk::from_pretty(
1623 " I
1624 + 4"
1625 )
1626 );
1627 assert_eq!(
1628 expect_barrier_with_timeout(&mut executor, "log-store completion barrier")
1629 .await
1630 .epoch,
1631 Barrier::new_test_barrier(test_epoch(5)).epoch
1632 );
1633
1634 upstream_tx
1635 .send(DispatcherMessage::Chunk(StreamChunk::from_pretty(" I\n + 5")).into())
1636 .await
1637 .unwrap();
1638 let stop_barrier = Barrier::new_test_barrier(test_epoch(6)).with_stop();
1639 upstream_tx
1640 .send(DispatcherMessage::Barrier(stop_barrier.clone().into_dispatcher()).into())
1641 .await
1642 .unwrap();
1643 barrier_tx.send(stop_barrier.clone()).unwrap();
1644
1645 assert_eq!(
1646 expect_chunk_with_timeout(&mut executor, "live upstream chunk after handoff").await,
1647 StreamChunk::from_pretty(" I\n + 5")
1648 );
1649 assert_eq!(
1650 expect_barrier_with_timeout(&mut executor, "final stop barrier")
1651 .await
1652 .epoch,
1653 stop_barrier.epoch
1654 );
1655 }
1656}