use std::cmp::min;
use std::collections::VecDeque;
use std::future::{Future, pending, ready};
use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, try_join_all};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
use risingwave_common::array::StreamChunk;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common_rate_limit::RateLimit;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::ChangeLogRow;
use risingwave_storage::table::batch_table::BatchTable;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
use crate::executor::backfill::snapshot_backfill::state::{
    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
};
use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
use crate::executor::backfill::utils::{create_builder, mapping_message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{StateTable, StreamExt, try_stream};
use crate::executor::{
    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
    MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
};
use crate::task::CreateMviewProgressReporter;

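/// Executor for snapshot backfill, which populates a newly-created materialized view from an
/// existing upstream table.
///
/// At a high level it runs in up to three phases (see `execute_inner`):
/// 1. consume a snapshot of the upstream table at `snapshot_epoch`,
/// 2. replay the upstream table's change log until the buffered epoch lag is drained,
/// 3. forward the upstream stream directly, still recording per-epoch progress.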
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table of the materialized view being backfilled.
    upstream_table: BatchTable<S>,

    /// State table that persists the per-vnode backfill progress.
    progress_state_table: StateTable<S>,

    /// Upstream input with the same schema as the upstream table.
    upstream: MergeExecutorInput,

    /// The column indices to output to the downstream.
    output_indices: Vec<usize>,

    progress: CreateMviewProgressReporter,

    chunk_size: usize,
    rate_limit: RateLimit,

    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    snapshot_epoch: Option<u64>,
}

impl<S: StateStore> SnapshotBackfillExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: MergeExecutorInput,
        output_indices: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> Self {
        assert_eq!(&upstream.info.schema, upstream_table.schema());
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        if !matches!(rate_limit, RateLimit::Disabled) {
            debug!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            progress,
            chunk_size,
            rate_limit,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        debug!("snapshot backfill executor start");
        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
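        // Decide whether to run snapshot backfill: when the first upstream barrier is ahead of
        // the first barrier injected to this actor, the actor is newly created and must first
        // catch up from `snapshot_epoch`; when the two epochs are equal, backfill has been
        // finished before (e.g. this is a recovered actor) and can be skipped.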
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                Some(snapshot_epoch)
            } else {
                None
            }
        } else {
            // `snapshot_epoch` is expected to be set: panic in debug builds, tolerate in release.
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    first_upstream_barrier.epoch, first_recv_barrier.epoch
                );
            } else {
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch = ?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

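        // Phase 1 and 2: consume the snapshot and then the upstream change log. The block
        // evaluates to the epoch of the last yielded barrier and a flag indicating whether
        // `finish_consuming_log_store` still needs to be reported once the upstream is consumed
        // directly.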
        let (mut barrier_epoch, mut need_report_finish) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

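                // While the snapshot (and later the change log) is being consumed, upstream
                // messages keep arriving. `UpstreamBuffer` concurrently polls the upstream so
                // that it is not back-pressured: only barriers are tracked and data chunks are
                // dropped, since the data will be re-read from the change log later.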
                let mut upstream_buffer = UpstreamBuffer::new(
                    &mut self.upstream,
                    first_upstream_barrier,
                    consume_upstream_row_count,
                );

                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                        );

                        pin_mut!(snapshot_stream);

                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // Phase 2: replay the upstream change log that accumulated while the snapshot
                // was being consumed.
                if let Some(mut upstream_buffer) = upstream_buffer {
                    let initial_pending_lag = Duration::from_millis(
                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                    );
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        ?initial_pending_lag,
                        "start consuming log store"
                    );

                    let consuming_log_store_row_count = self
                        .metrics
                        .snapshot_backfill_consume_row_count
                        .with_guarded_label_values(&[
                            table_id_str.as_str(),
                            actor_id_str.as_str(),
                            "consuming_log_store",
                        ]);
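                    // Each iteration waits for the next injected barrier, reads the change log
                    // of the epoch that just closed from the upstream table, yields it as
                    // chunks, and reports the remaining epoch lag (or completion) to the meta
                    // service via the progress reporter.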
                    loop {
                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                        {
                            // Check that the upstream table's next committed epoch lines up with
                            // the barrier just received.
                            let next_prev_epoch =
                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
                        }
                        barrier_epoch = barrier.epoch;
                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
                        let mut stream = upstream_buffer
                            .run_future(make_log_stream(
                                &self.upstream_table,
                                barrier_epoch.prev,
                                None,
                                self.chunk_size,
                            ))
                            .await?;
                        while let Some(chunk) =
                            upstream_buffer.run_future(stream.try_next()).await?
                        {
                            trace!(
                                ?barrier_epoch,
                                size = chunk.cardinality(),
                                "consume change log yield chunk",
                            );
                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                            yield Message::Chunk(chunk);
                        }

                        trace!(?barrier_epoch, "after consume change log");

                        if is_finished {
                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                            self.progress.finish_consuming_log_store(barrier.epoch);
                        } else {
                            self.progress.update_create_mview_log_store_progress(
                                barrier.epoch,
                                upstream_buffer.pending_epoch_lag(),
                            );
                        }

                        stream
                            .for_vnode_pk_progress(|vnode, row_count, progress| {
                                assert_eq!(progress, None);
                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
                            })
                            .await?;
                        let post_commit = backfill_state.commit(barrier.epoch).await?;
                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        if update_vnode_bitmap.is_some() {
                            return Err(anyhow!(
                                "should not update vnode bitmap during consuming log store"
                            )
                            .into());
                        }

                        if is_finished {
                            break;
                        }
                    }
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "finish consuming log store"
                    );

                    (barrier_epoch, false)
                } else {
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "skip consuming log store and start consuming upstream directly"
                    );

                    (barrier_epoch, true)
                }
            } else {
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                info!(
                    table_id = self.upstream_table.table_id().table_id,
                    "skip backfill"
                );
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true)
            }
        };
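        // Phase 3: the backfill has caught up, so consume the upstream directly. Per-epoch row
        // counts are still recorded into `backfill_state` so that the progress table stays
        // consistent across scaling and recovery.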
        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap = self
                            .upstream_table
                            .update_vnode_bitmap(new_vnode_bitmap.clone());
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}

impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let output_indices = self.output_indices.clone();
        self.execute_inner()
            .filter_map(move |result| {
                ready({
                    match result {
                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
                        Err(e) => Some(Err(e)),
                    }
                })
            })
            .boxed()
    }
}

// Type-state markers for the two phases of `UpstreamBuffer`.
struct ConsumingSnapshot;
struct ConsumingLogStore;

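/// Barriers received from the upstream but not yet consumed, grouped by checkpoint.
///
/// `pending_non_checkpoint_barriers` holds the barriers received since the last checkpoint
/// barrier (newest at the front). Whenever a checkpoint barrier arrives, the group is moved
/// into `checkpoint_barrier_groups`, which therefore stores one group per pending checkpoint
/// epoch, with the oldest group at the back.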
struct PendingBarriers {
    first_upstream_barrier_epoch: EpochPair,

    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}

impl PendingBarriers {
    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
        Self {
            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
            pending_non_checkpoint_barriers: Default::default(),
            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
                first_upstream_barrier,
            ])]),
        }
    }

    fn add(&mut self, barrier: DispatcherBarrier) {
        let is_checkpoint = barrier.kind.is_checkpoint();
        self.pending_non_checkpoint_barriers.push_front(barrier);
        if is_checkpoint {
            self.checkpoint_barrier_groups
                .push_front(take(&mut self.pending_non_checkpoint_barriers));
        }
    }

    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
        self.checkpoint_barrier_groups.pop_back()
    }

    fn consume_epoch(&mut self, epoch: EpochPair) {
        let barriers = self
            .checkpoint_barrier_groups
            .back_mut()
            .expect("non-empty");
        let oldest_upstream_barrier = barriers.back().expect("non-empty");
        assert!(
            oldest_upstream_barrier.epoch.prev >= epoch.prev,
            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
            oldest_upstream_barrier.epoch,
            epoch
        );
        if oldest_upstream_barrier.epoch.prev == epoch.prev {
            assert_eq!(oldest_upstream_barrier.epoch, epoch);
            barriers.pop_back();
            if barriers.is_empty() {
                self.checkpoint_barrier_groups.pop_back();
            }
        }
    }

    fn latest_epoch(&self) -> Option<EpochPair> {
        self.pending_non_checkpoint_barriers
            .front()
            .or_else(|| {
                self.checkpoint_barrier_groups
                    .front()
                    .and_then(|barriers| barriers.front())
            })
            .map(|barrier| barrier.epoch)
    }

    fn checkpoint_epoch_count(&self) -> usize {
        self.checkpoint_barrier_groups.len()
    }

    fn has_checkpoint_epoch(&self) -> bool {
        !self.checkpoint_barrier_groups.is_empty()
    }
}

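/// A wrapper around the upstream input that is polled *concurrently* with snapshot / log-store
/// consumption (see `run_future`). Data chunks are only counted and then dropped; barriers are
/// kept in `PendingBarriers`, and the corresponding changes are re-read later from the upstream
/// table's change log.
///
/// The `S` type parameter is a phase marker (`ConsumingSnapshot` or `ConsumingLogStore`). While
/// consuming the snapshot the buffer accepts unlimited epoch lag; in the log-store phase
/// `max_pending_epoch_lag` gradually shrinks (see `consumed_epoch`), throttling the upstream so
/// that the buffer eventually drains and the executor can switch to consuming the upstream
/// directly.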
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    max_pending_epoch_lag: u64,
    consumed_epoch: u64,
    upstream_pending_barriers: PendingBarriers,
    /// Whether some data (or a non-checkpoint barrier) of the current epoch has been received
    /// from the upstream while its checkpoint barrier has not. When `true`, the buffer is not
    /// considered finished.
    is_polling_epoch_data: bool,
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}

impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
    fn new(
        upstream: &'a mut MergeExecutorInput,
        first_upstream_barrier: DispatcherBarrier,
        consume_upstream_row_count: LabelGuardedIntCounter,
    ) -> Self {
        Self {
            upstream,
            is_polling_epoch_data: false,
            consume_upstream_row_count,
            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
            // no limit on pending epoch lag while consuming the snapshot
            max_pending_epoch_lag: u64::MAX,
            consumed_epoch: 0,
            _phase: ConsumingSnapshot {},
        }
    }

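    /// Transition the buffer into the log-store phase after progress has reached
    /// `consumed_epoch`. The current pending lag becomes the initial `max_pending_epoch_lag`.
    /// Returns `None` when the buffer is already drained, in which case the log-store phase can
    /// be skipped entirely.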
    fn start_consuming_log_store(
        mut self,
        consumed_epoch: u64,
    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
        if self
            .upstream_pending_barriers
            .first_upstream_barrier_epoch
            .prev
            == consumed_epoch
        {
            assert_eq!(
                1,
                self.upstream_pending_barriers
                    .pop()
                    .expect("non-empty")
                    .len()
            );
        }
        let max_pending_epoch_lag = self.pending_epoch_lag();
        let buffer = UpstreamBuffer {
            upstream: self.upstream,
            upstream_pending_barriers: self.upstream_pending_barriers,
            max_pending_epoch_lag,
            is_polling_epoch_data: self.is_polling_epoch_data,
            consume_upstream_row_count: self.consume_upstream_row_count,
            consumed_epoch,
            _phase: ConsumingLogStore {},
        };
        if buffer.is_finished() {
            None
        } else {
            Some(buffer)
        }
    }
}

impl<S> UpstreamBuffer<'_, S> {
    fn can_consume_upstream(&self) -> bool {
        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
    }

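    /// Keep polling the upstream into the buffer. This future never completes successfully: it
    /// pends forever once `can_consume_upstream` becomes false, and only resolves when polling
    /// the upstream fails, which is why it returns `StreamExecutorError` directly rather than a
    /// `Result`.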
    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
        {
            loop {
                if let Err(e) = try {
                    if !self.can_consume_upstream() {
                        // Reached the pending lag limit: stop pulling from the upstream until a
                        // new `concurrently_consume_upstream` future is created.
                        return pending().await;
                    }
                    self.consume_until_next_checkpoint_barrier().await?;
                } {
                    break e;
                }
            }
        }
    }

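    /// Pull messages from the upstream until the next checkpoint barrier (inclusive) has been
    /// buffered. Data chunks only update the row-count metric and are then dropped, watermarks
    /// are ignored; only barriers are retained in `upstream_pending_barriers`.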
    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
        loop {
            let msg: DispatcherMessage = self
                .upstream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("end of upstream"))?;
            match msg {
                DispatcherMessage::Chunk(chunk) => {
                    self.is_polling_epoch_data = true;
                    self.consume_upstream_row_count
                        .inc_by(chunk.cardinality() as _);
                }
                DispatcherMessage::Barrier(barrier) => {
                    let is_checkpoint = barrier.kind.is_checkpoint();
                    self.upstream_pending_barriers.add(barrier);
                    if is_checkpoint {
                        self.is_polling_epoch_data = false;
                        break;
                    } else {
                        self.is_polling_epoch_data = true;
                    }
                }
                DispatcherMessage::Watermark(_) => {
                    self.is_polling_epoch_data = true;
                }
            }
        }
        Ok(())
    }
}

impl UpstreamBuffer<'_, ConsumingLogStore> {
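    /// Called when the barrier of `epoch` is received from the local barrier worker during the
    /// log-store phase. Removes that epoch from the pending barriers, opportunistically pulls
    /// more data from the upstream, and shrinks `max_pending_epoch_lag` by half of the epoch
    /// time that just elapsed (capped at the current lag) so that the buffer keeps converging.
    /// Returns whether the buffer has fully caught up with the upstream.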
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            // The checkpoint barrier of this epoch has not been buffered yet; it must be on the
            // way, since we are still polling data of this epoch.
            assert!(self.is_polling_epoch_data);
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    // There are still pending epochs: pull more data from the upstream without
                    // blocking, as long as the lag limit allows.
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                // Shrink the allowed lag: cap it at the current pending lag and decay it by half
                // of the epoch time that just elapsed, so the buffer converges instead of
                // chasing a moving target.
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }

    fn is_finished(&self) -> bool {
        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
            assert!(
                self.upstream_pending_barriers
                    .pending_non_checkpoint_barriers
                    .is_empty()
            )
        }
        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
    }
}

impl<S> UpstreamBuffer<'_, S> {
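    /// Run `future` while concurrently consuming the upstream in the background. The select is
    /// biased towards the upstream branch, which only resolves on error, so under normal
    /// operation the result of `future` is returned.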
    async fn run_future<T, E: Into<StreamExecutorError>>(
        &mut self,
        future: impl Future<Output = Result<T, E>>,
    ) -> StreamExecutorResult<T> {
        select! {
            biased;
            e = self.concurrently_consume_upstream() => {
                Err(e)
            }
            result = future => {
                result.map_err(Into::into)
            }
        }
    }

    fn pending_epoch_lag(&self) -> u64 {
        self.upstream_pending_barriers
            .latest_epoch()
            .map(|epoch| {
                epoch
                    .prev
                    .checked_sub(self.consumed_epoch)
                    .expect("pending epoch must be later than consumed_epoch")
            })
            .unwrap_or(0)
    }
}

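/// Build a [`VnodeStream`] over the change log of the upstream table at `prev_epoch`, with one
/// underlying iterator per vnode currently owned by this actor, optionally resuming from
/// `start_pk`.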
async fn make_log_stream(
    upstream_table: &BatchTable<impl StateStore>,
    prev_epoch: u64,
    start_pk: Option<OwnedRow>,
    chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
    let data_types = upstream_table.schema().data_types();
    let start_pk = start_pk.as_ref();
    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
        upstream_table
            .batch_iter_vnode_log(
                prev_epoch,
                HummockReadEpoch::Committed(prev_epoch),
                start_pk,
                vnode,
            )
            .map_ok(move |stream| {
                let stream = stream.map_err(Into::into);
                (vnode, stream, 0)
            })
    }))
    .await?;
    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
    Ok(VnodeStream::new(
        vnode_streams,
        upstream_table.pk_in_output_indices().expect("should exist"),
        builder,
    ))
}

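/// Build a [`VnodeStream`] that scans the snapshot of the upstream table at `snapshot_epoch`.
/// For each vnode, the scan resumes from the primary key and row count recorded in
/// `backfill_state`; vnodes that have already been fully consumed are skipped.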
async fn make_snapshot_stream(
    upstream_table: &BatchTable<impl StateStore>,
    snapshot_epoch: u64,
    backfill_state: &BackfillState<impl StateStore>,
    rate_limit: RateLimit,
    chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
    let data_types = upstream_table.schema().data_types();
    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
        move |(vnode, progress)| {
            let start_pk = match progress {
                None => Some((None, 0)),
                Some(VnodeBackfillProgress {
                    row_count,
                    progress: EpochBackfillProgress::Consuming { latest_pk },
                    ..
                }) => Some((Some(latest_pk), *row_count)),
                Some(VnodeBackfillProgress {
                    progress: EpochBackfillProgress::Consumed,
                    ..
                }) => None,
            };
            start_pk.map(|(start_pk, row_count)| {
                upstream_table
                    .batch_iter_vnode(
                        HummockReadEpoch::Committed(snapshot_epoch),
                        start_pk,
                        vnode,
                        PrefetchOptions::prefetch_for_large_range_scan(),
                    )
                    .map_ok(move |stream| {
                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
                        (vnode, stream, row_count)
                    })
            })
        },
    ))
    .await?;
    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
    Ok(VnodeStream::new(
        vnode_streams,
        upstream_table.pk_in_output_indices().expect("should exist"),
        builder,
    ))
}

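/// Stream that consumes the snapshot of the upstream table at `snapshot_epoch` while also
/// handling injected barriers: snapshot chunks are yielded (throttled by `rate_limit`),
/// per-vnode progress is persisted into `backfill_state` at every barrier, and progress is
/// reported through `progress`. After the snapshot is exhausted, it reports finish and keeps
/// yielding barriers until the barrier of `snapshot_epoch` itself is reached.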
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    // Scan the upstream snapshot, resuming from any previously persisted progress.
    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        rate_limit,
        chunk_size,
    )
    .await?;

    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream => {
                Ok(Either::Right(result?))
            }
        )
    }

    let mut count = 0;
    let mut epoch_row_count = 0;
    loop {
        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;
                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;
            }
            Either::Right(Some(chunk)) => {
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            Either::Right(None) => {
                break;
            }
        }
    }

    // All snapshot data has been consumed. Report finish on the next barrier.
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    info!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

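    // Keep yielding barriers (committing the progress table at every epoch) until the snapshot
    // epoch itself is reached, so the executor stays aligned with the injected barriers.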
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    info!(?barrier_epoch, "finish consuming snapshot");
}