1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_storage::StateStore;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::ChangeLogRow;
35use risingwave_storage::table::batch_table::BatchTable;
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
39use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
40use crate::executor::backfill::snapshot_backfill::state::{
41 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
42};
43use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
44use crate::executor::backfill::utils::{create_builder, mapping_message};
45use crate::executor::monitor::StreamingMetrics;
46use crate::executor::prelude::{StateTable, StreamExt, try_stream};
47use crate::executor::{
48 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
49 MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
50};
51use crate::task::CreateMviewProgressReporter;
52
/// Executor that backfills a streaming job from a snapshot of `upstream_table` at
/// `snapshot_epoch`, then from the table's per-epoch change log, and finally forwards the
/// `upstream` input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Table read for the snapshot (`batch_iter_vnode`) and the change log
    /// (`batch_iter_vnode_log`).
    upstream_table: BatchTable<S>,

    /// State table handed to `BackfillState`, which persists per-vnode backfill progress.
    progress_state_table: StateTable<S>,

    /// Upstream input. It is buffered by `UpstreamBuffer` while the snapshot / change log is
    /// read, and is turned into the executor's output stream afterwards.
    upstream: MergeExecutorInput,

    /// Output column indices applied by `mapping_message` in `execute`.
    output_indices: Vec<usize>,

    /// Reporter for snapshot and log-store consumption progress.
    progress: CreateMviewProgressReporter,

    /// Chunk size passed to the chunk builders of the snapshot and change-log streams.
    chunk_size: usize,
    /// Rate limit used while reading the snapshot (the change log is read with
    /// `RateLimit::Disabled`).
    rate_limit: RateLimit,

    /// Barriers injected for this executor; they drive the backfill phases.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill from. `None` panics in debug builds and, in release
    /// builds, requires the first upstream and injected barriers to share the same epoch.
    snapshot_epoch: Option<u64>,
}
78
impl<S: StateStore> SnapshotBackfillExecutor<S> {
    /// Creates a snapshot backfill executor.
    ///
    /// # Panics
    ///
    /// Panics if the schema of `upstream` differs from the schema of `upstream_table`, or if
    /// the output columns of `upstream_table` do not include all of its pk columns.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: MergeExecutorInput,
        output_indices: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> Self {
        assert_eq!(&upstream.info.schema, upstream_table.schema());
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        if !matches!(rate_limit, RateLimit::Disabled) {
            debug!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            progress,
            chunk_size,
            rate_limit,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
        }
    }

    /// Drives the backfill and yields the executor's (unprojected) output messages.
    ///
    /// Phases:
    /// 1. Read the first barrier from `upstream` and the first barrier from `barrier_rx`.
    ///    Backfill runs only when `snapshot_epoch` is set and the two barriers have different
    ///    epochs; otherwise every vnode's persisted progress must already be `Consumed`.
    /// 2. If the first injected barrier's `prev` epoch is below `snapshot_epoch`, stream the
    ///    snapshot, while `UpstreamBuffer` keeps buffering upstream barriers concurrently.
    /// 3. Stream the change log one epoch per injected barrier until the buffered upstream
    ///    is caught up.
    /// 4. Forward the executor built from `upstream` and `barrier_rx`, committing
    ///    `BackfillState` on each barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        debug!("snapshot backfill executor start");
        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        // `Some(snapshot_epoch)` iff a backfill is needed: a snapshot epoch is configured and
        // the upstream's first barrier differs from the first injected barrier.
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                Some(snapshot_epoch)
            } else {
                None
            }
        } else {
            // A missing snapshot epoch is a hard error in debug builds; release builds only
            // warn and require that no backfill is needed.
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    first_upstream_barrier.epoch, first_recv_barrier.epoch
                );
            } else {
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        // `barrier_epoch`: epoch of the last barrier yielded before switching to the upstream.
        // `need_report_finish`: whether `finish_consuming_log_store` still has to be reported
        // on the first forwarded upstream barrier.
        let (mut barrier_epoch, mut need_report_finish) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]);

                // Buffers upstream barriers (and counts/discards upstream chunks) while the
                // snapshot and change log are being read.
                let mut upstream_buffer = UpstreamBuffer::new(
                    &mut self.upstream,
                    first_upstream_barrier,
                    consume_upstream_row_count,
                );

                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                &table_id_str,
                                &actor_id_str,
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                        );

                        pin_mut!(snapshot_stream);

                        // Every poll of the snapshot stream is raced with upstream buffering.
                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    // The snapshot stream ends after yielding the barrier whose `curr` epoch is
                    // `snapshot_epoch`; the next barrier must therefore start at it.
                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // `None` means the buffered upstream is already caught up, so the change log
                // phase is skipped.
                if let Some(mut upstream_buffer) = upstream_buffer {
                    let initial_pending_lag = Duration::from_millis(
                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                    );
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        ?initial_pending_lag,
                        "start consuming log store"
                    );

                    let consuming_log_store_row_count = self
                        .metrics
                        .snapshot_backfill_consume_row_count
                        .with_guarded_label_values(&[
                            &table_id_str,
                            &actor_id_str,
                            "consuming_log_store",
                        ]);
                    // One iteration per injected barrier: consume the change log of the epoch
                    // that the barrier closes.
                    loop {
                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                        // NOTE(review): `consumed_epoch` is awaited before `next_epoch`.
                        // Presumably this ordering lets upstream keep being drained while
                        // `next_epoch` waits; confirm before reordering.
                        {
                            let next_prev_epoch =
                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
                        }
                        barrier_epoch = barrier.epoch;
                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
                        let mut stream = upstream_buffer
                            .run_future(make_log_stream(
                                &self.upstream_table,
                                barrier_epoch.prev,
                                None,
                                self.chunk_size,
                            ))
                            .await?;
                        while let Some(chunk) =
                            upstream_buffer.run_future(stream.try_next()).await?
                        {
                            trace!(
                                ?barrier_epoch,
                                size = chunk.cardinality(),
                                "consume change log yield chunk",
                            );
                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                            yield Message::Chunk(chunk);
                        }

                        trace!(?barrier_epoch, "after consume change log");

                        if is_finished {
                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                            self.progress.finish_consuming_log_store(barrier.epoch);
                        } else {
                            self.progress.update_create_mview_log_store_progress(
                                barrier.epoch,
                                upstream_buffer.pending_epoch_lag(),
                            );
                        }

                        // The change log of this epoch was fully read, so every vnode has
                        // finished it; no pk progress may remain.
                        stream
                            .for_vnode_pk_progress(|vnode, row_count, progress| {
                                assert_eq!(progress, None);
                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
                            })
                            .await?;
                        let post_commit = backfill_state.commit(barrier.epoch).await?;
                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        // Vnode rescheduling is not supported in this phase.
                        if update_vnode_bitmap.is_some() {
                            return Err(anyhow!(
                                "should not update vnode bitmap during consuming log store"
                            )
                            .into());
                        }

                        if is_finished {
                            break;
                        }
                    }
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "finish consuming log store"
                    );

                    // Finish was already reported inside the loop.
                    (barrier_epoch, false)
                } else {
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "skip consuming log store and start consuming upstream directly"
                    );

                    (barrier_epoch, true)
                }
            } else {
                // No backfill: every vnode must already have consumed the epoch just before
                // the first upstream barrier.
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                info!(
                    table_id = self.upstream_table.table_id().table_id,
                    "skip backfill"
                );
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true)
            }
        };
        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
        // Rows forwarded since the last barrier, summed over all chunks of this executor.
        let mut epoch_row_count = 0;
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    // The same executor-wide row count is recorded for every owned vnode.
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    // On a vnode bitmap update, switch the table to the new vnodes and check
                    // that all vnodes now owned have consumed the previous epoch.
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap = self
                            .upstream_table
                            .update_vnode_bitmap(new_vnode_bitmap.clone());
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}
432
433impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
434 fn execute(self: Box<Self>) -> BoxedMessageStream {
435 let output_indices = self.output_indices.clone();
436 self.execute_inner()
437 .filter_map(move |result| {
438 ready({
439 match result {
440 Ok(message) => mapping_message(message, &output_indices).map(Ok),
441 Err(e) => Some(Err(e)),
442 }
443 })
444 })
445 .boxed()
446 }
447}
448
/// Phase marker for an `UpstreamBuffer` used while the snapshot is being consumed.
struct ConsumingSnapshot;
/// Phase marker for an `UpstreamBuffer` used while the change log (log store) is consumed.
struct ConsumingLogStore;
451
/// Upstream barriers received but not yet matched by a consumed change-log epoch.
///
/// Both queues are newest-first: new barriers are pushed to the front and the oldest ones
/// are taken from the back.
struct PendingBarriers {
    /// Epoch of the first barrier received from the upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Barriers received since the most recent checkpoint barrier (all non-checkpoint),
    /// newest first.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers, newest group first. Each group added by `add` ends (at its front)
    /// with a checkpoint barrier. The initial group holds only the first upstream barrier.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
464
465impl PendingBarriers {
466 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
467 Self {
468 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
469 pending_non_checkpoint_barriers: Default::default(),
470 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
471 first_upstream_barrier,
472 ])]),
473 }
474 }
475
476 fn add(&mut self, barrier: DispatcherBarrier) {
477 let is_checkpoint = barrier.kind.is_checkpoint();
478 self.pending_non_checkpoint_barriers.push_front(barrier);
479 if is_checkpoint {
480 self.checkpoint_barrier_groups
481 .push_front(take(&mut self.pending_non_checkpoint_barriers));
482 }
483 }
484
485 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
486 self.checkpoint_barrier_groups.pop_back()
487 }
488
489 fn consume_epoch(&mut self, epoch: EpochPair) {
490 let barriers = self
491 .checkpoint_barrier_groups
492 .back_mut()
493 .expect("non-empty");
494 let oldest_upstream_barrier = barriers.back().expect("non-empty");
495 assert!(
496 oldest_upstream_barrier.epoch.prev >= epoch.prev,
497 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
498 oldest_upstream_barrier.epoch,
499 epoch
500 );
501 if oldest_upstream_barrier.epoch.prev == epoch.prev {
502 assert_eq!(oldest_upstream_barrier.epoch, epoch);
503 barriers.pop_back();
504 if barriers.is_empty() {
505 self.checkpoint_barrier_groups.pop_back();
506 }
507 }
508 }
509
510 fn latest_epoch(&self) -> Option<EpochPair> {
511 self.pending_non_checkpoint_barriers
512 .front()
513 .or_else(|| {
514 self.checkpoint_barrier_groups
515 .front()
516 .and_then(|barriers| barriers.front())
517 })
518 .map(|barrier| barrier.epoch)
519 }
520
521 fn checkpoint_epoch_count(&self) -> usize {
522 self.checkpoint_barrier_groups.len()
523 }
524
525 fn has_checkpoint_epoch(&self) -> bool {
526 !self.checkpoint_barrier_groups.is_empty()
527 }
528}
529
/// Buffers the upstream input while the snapshot and change log are read.
///
/// Upstream chunks are only counted (`consume_upstream_row_count`) and discarded. Upstream
/// barriers are kept in `upstream_pending_barriers`. `S` is a phase marker
/// (`ConsumingSnapshot` or `ConsumingLogStore`).
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// Upstream is consumed between epochs only while `pending_epoch_lag()` is below this
    /// bound (see `can_consume_upstream`). It starts at `u64::MAX`.
    max_pending_epoch_lag: u64,
    /// `prev` epoch of the last change-log epoch consumed; 0 in the snapshot phase.
    consumed_epoch: u64,
    upstream_pending_barriers: PendingBarriers,
    /// `true` while in the middle of an upstream epoch, i.e. something other than a
    /// checkpoint barrier was the last message read.
    is_polling_epoch_data: bool,
    consume_upstream_row_count: LabelGuardedIntCounter<3>,
    _phase: S,
}
544
545impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
546 fn new(
547 upstream: &'a mut MergeExecutorInput,
548 first_upstream_barrier: DispatcherBarrier,
549 consume_upstream_row_count: LabelGuardedIntCounter<3>,
550 ) -> Self {
551 Self {
552 upstream,
553 is_polling_epoch_data: false,
554 consume_upstream_row_count,
555 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
556 max_pending_epoch_lag: u64::MAX,
558 consumed_epoch: 0,
559 _phase: ConsumingSnapshot {},
560 }
561 }
562
563 fn start_consuming_log_store(
564 mut self,
565 consumed_epoch: u64,
566 ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
567 if self
568 .upstream_pending_barriers
569 .first_upstream_barrier_epoch
570 .prev
571 == consumed_epoch
572 {
573 assert_eq!(
574 1,
575 self.upstream_pending_barriers
576 .pop()
577 .expect("non-empty")
578 .len()
579 );
580 }
581 let max_pending_epoch_lag = self.pending_epoch_lag();
582 let buffer = UpstreamBuffer {
583 upstream: self.upstream,
584 upstream_pending_barriers: self.upstream_pending_barriers,
585 max_pending_epoch_lag,
586 is_polling_epoch_data: self.is_polling_epoch_data,
587 consume_upstream_row_count: self.consume_upstream_row_count,
588 consumed_epoch,
589 _phase: ConsumingLogStore {},
590 };
591 if buffer.is_finished() {
592 None
593 } else {
594 Some(buffer)
595 }
596 }
597}
598
599impl<S> UpstreamBuffer<'_, S> {
600 fn can_consume_upstream(&self) -> bool {
601 self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
602 }
603
604 async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
605 {
606 loop {
607 if let Err(e) = try {
608 if !self.can_consume_upstream() {
609 return pending().await;
611 }
612 self.consume_until_next_checkpoint_barrier().await?;
613 } {
614 break e;
615 }
616 }
617 }
618 }
619
620 async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
622 loop {
623 let msg: DispatcherMessage = self
624 .upstream
625 .try_next()
626 .await?
627 .ok_or_else(|| anyhow!("end of upstream"))?;
628 match msg {
629 DispatcherMessage::Chunk(chunk) => {
630 self.is_polling_epoch_data = true;
631 self.consume_upstream_row_count
632 .inc_by(chunk.cardinality() as _);
633 }
634 DispatcherMessage::Barrier(barrier) => {
635 let is_checkpoint = barrier.kind.is_checkpoint();
636 self.upstream_pending_barriers.add(barrier);
637 if is_checkpoint {
638 self.is_polling_epoch_data = false;
639 break;
640 } else {
641 self.is_polling_epoch_data = true;
642 }
643 }
644 DispatcherMessage::Watermark(_) => {
645 self.is_polling_epoch_data = true;
646 }
647 }
648 }
649 Ok(())
650 }
651}
652
653impl UpstreamBuffer<'_, ConsumingLogStore> {
654 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
655 assert!(!self.is_finished());
656 if !self.upstream_pending_barriers.has_checkpoint_epoch() {
657 assert!(self.is_polling_epoch_data);
660 self.consume_until_next_checkpoint_barrier().await?;
661 assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
662 }
663 self.upstream_pending_barriers.consume_epoch(epoch);
664
665 {
666 {
667 let prev_epoch = epoch.prev;
668 assert!(self.consumed_epoch < prev_epoch);
669 let elapsed_epoch = prev_epoch - self.consumed_epoch;
670 self.consumed_epoch = prev_epoch;
671 if self.upstream_pending_barriers.has_checkpoint_epoch() {
672 while self.can_consume_upstream()
674 && let Some(result) =
675 self.consume_until_next_checkpoint_barrier().now_or_never()
676 {
677 result?;
678 }
679 }
680 self.max_pending_epoch_lag = min(
684 self.pending_epoch_lag(),
685 self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
686 );
687 }
688 }
689 Ok(self.is_finished())
690 }
691
692 fn is_finished(&self) -> bool {
693 if cfg!(debug_assertions) && !self.is_polling_epoch_data {
694 assert!(
695 self.upstream_pending_barriers
696 .pending_non_checkpoint_barriers
697 .is_empty()
698 )
699 }
700 !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
701 }
702}
703
704impl<S> UpstreamBuffer<'_, S> {
705 async fn run_future<T, E: Into<StreamExecutorError>>(
708 &mut self,
709 future: impl Future<Output = Result<T, E>>,
710 ) -> StreamExecutorResult<T> {
711 select! {
712 biased;
713 e = self.concurrently_consume_upstream() => {
714 Err(e)
715 }
716 result = future => {
718 result.map_err(Into::into)
719 }
720 }
721 }
722
723 fn pending_epoch_lag(&self) -> u64 {
724 self.upstream_pending_barriers
725 .latest_epoch()
726 .map(|epoch| {
727 epoch
728 .prev
729 .checked_sub(self.consumed_epoch)
730 .expect("pending epoch must be later than consumed_epoch")
731 })
732 .unwrap_or(0)
733 }
734}
735
736async fn make_log_stream(
737 upstream_table: &BatchTable<impl StateStore>,
738 prev_epoch: u64,
739 start_pk: Option<OwnedRow>,
740 chunk_size: usize,
741) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
742 let data_types = upstream_table.schema().data_types();
743 let start_pk = start_pk.as_ref();
744 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
746 upstream_table
747 .batch_iter_vnode_log(
748 prev_epoch,
749 HummockReadEpoch::Committed(prev_epoch),
750 start_pk,
751 vnode,
752 )
753 .map_ok(move |stream| {
754 let stream = stream.map_err(Into::into);
755 (vnode, stream, 0)
756 })
757 }))
758 .await?;
759 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
760 Ok(VnodeStream::new(
761 vnode_streams,
762 upstream_table.pk_in_output_indices().expect("should exist"),
763 builder,
764 ))
765}
766
767async fn make_snapshot_stream(
768 upstream_table: &BatchTable<impl StateStore>,
769 snapshot_epoch: u64,
770 backfill_state: &BackfillState<impl StateStore>,
771 rate_limit: RateLimit,
772 chunk_size: usize,
773) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
774 let data_types = upstream_table.schema().data_types();
775 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
776 move |(vnode, progress)| {
777 let start_pk = match progress {
778 None => Some((None, 0)),
779 Some(VnodeBackfillProgress {
780 row_count,
781 progress: EpochBackfillProgress::Consuming { latest_pk },
782 ..
783 }) => Some((Some(latest_pk), *row_count)),
784 Some(VnodeBackfillProgress {
785 progress: EpochBackfillProgress::Consumed,
786 ..
787 }) => None,
788 };
789 start_pk.map(|(start_pk, row_count)| {
790 upstream_table
791 .batch_iter_vnode(
792 HummockReadEpoch::Committed(snapshot_epoch),
793 start_pk,
794 vnode,
795 PrefetchOptions::prefetch_for_large_range_scan(),
796 )
797 .map_ok(move |stream| {
798 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
799 (vnode, stream, row_count)
800 })
801 })
802 },
803 ))
804 .await?;
805 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
806 Ok(VnodeStream::new(
807 vnode_streams,
808 upstream_table.pk_in_output_indices().expect("should exist"),
809 builder,
810 ))
811}
812
813#[expect(clippy::too_many_arguments)]
814#[try_stream(ok = Message, error = StreamExecutorError)]
815async fn make_consume_snapshot_stream<'a, S: StateStore>(
816 upstream_table: &'a BatchTable<S>,
817 snapshot_epoch: u64,
818 chunk_size: usize,
819 rate_limit: RateLimit,
820 barrier_rx: &'a mut UnboundedReceiver<Barrier>,
821 progress: &'a mut CreateMviewProgressReporter,
822 backfill_state: &'a mut BackfillState<S>,
823 first_recv_barrier_epoch: EpochPair,
824) {
825 let mut barrier_epoch = first_recv_barrier_epoch;
826
827 let mut snapshot_stream = make_snapshot_stream(
829 upstream_table,
830 snapshot_epoch,
831 &*backfill_state,
832 rate_limit,
833 chunk_size,
834 )
835 .await?;
836
837 async fn select_barrier_and_snapshot_stream(
838 barrier_rx: &mut UnboundedReceiver<Barrier>,
839 snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
840 throttle_snapshot_stream: bool,
841 ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
842 select!(
843 result = receive_next_barrier(barrier_rx) => {
844 Ok(Either::Left(result?))
845 },
846 result = snapshot_stream.try_next(), if !throttle_snapshot_stream => {
847 Ok(Either::Right(result?))
848 }
849 )
850 }
851
852 let mut count = 0;
853 let mut epoch_row_count = 0;
854 loop {
855 let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
856 match select_barrier_and_snapshot_stream(
857 barrier_rx,
858 &mut snapshot_stream,
859 throttle_snapshot_stream,
860 )
861 .await?
862 {
863 Either::Left(barrier) => {
864 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
865 barrier_epoch = barrier.epoch;
866 if barrier_epoch.curr >= snapshot_epoch {
867 return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
868 }
869 if let Some(chunk) = snapshot_stream.consume_builder() {
870 count += chunk.cardinality();
871 epoch_row_count += chunk.cardinality();
872 yield Message::Chunk(chunk);
873 }
874 snapshot_stream
875 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
876 if let Some(pk) = pk_progress {
877 backfill_state.update_epoch_progress(
878 vnode,
879 snapshot_epoch,
880 row_count,
881 pk,
882 );
883 } else {
884 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
885 }
886 })
887 .await?;
888 let post_commit = backfill_state.commit(barrier.epoch).await?;
889 debug!(?barrier_epoch, count, epoch_row_count, "update progress");
890 progress.update(barrier_epoch, barrier_epoch.prev, count as _);
891 epoch_row_count = 0;
892
893 yield Message::Barrier(barrier);
894 post_commit.post_yield_barrier(None).await?;
895 }
896 Either::Right(Some(chunk)) => {
897 count += chunk.cardinality();
898 epoch_row_count += chunk.cardinality();
899 yield Message::Chunk(chunk);
900 }
901 Either::Right(None) => {
902 break;
903 }
904 }
905 }
906
907 let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
909 assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
910 barrier_epoch = barrier_to_report_finish.epoch;
911 info!(?barrier_epoch, count, "report finish");
912 snapshot_stream
913 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
914 assert_eq!(pk_progress, None);
915 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
916 })
917 .await?;
918 let post_commit = backfill_state.commit(barrier_epoch).await?;
919 progress.finish(barrier_epoch, count as _);
920 yield Message::Barrier(barrier_to_report_finish);
921 post_commit.post_yield_barrier(None).await?;
922
923 loop {
925 let barrier = receive_next_barrier(barrier_rx).await?;
926 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
927 barrier_epoch = barrier.epoch;
928 let post_commit = backfill_state.commit(barrier.epoch).await?;
929 yield Message::Barrier(barrier);
930 post_commit.post_yield_barrier(None).await?;
931 if barrier_epoch.curr == snapshot_epoch {
932 break;
933 }
934 }
935 info!(?barrier_epoch, "finish consuming snapshot");
936}