use std::cmp::min;
use std::collections::VecDeque;
use std::future::{Future, pending, ready};
use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, try_join_all};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
use risingwave_common::array::StreamChunk;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common_rate_limit::RateLimit;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::ChangeLogRow;
use risingwave_storage::table::batch_table::BatchTable;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
use crate::executor::backfill::snapshot_backfill::state::{
    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
};
use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
use crate::executor::backfill::utils::{create_builder, mapping_message};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{StateTable, StreamExt, try_stream};
use crate::executor::{
    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
    MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
};
use crate::task::CreateMviewProgressReporter;

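/// Executor for snapshot backfill. It bootstraps a newly created materialized view by
/// reading a snapshot of the upstream table, catches up by replaying the upstream
/// table's change log, and then switches to consuming the upstream stream directly.
/// When no backfill is needed (e.g. the executor starts aligned with the upstream after
/// the backfill has already finished), the snapshot and change log phases are skipped.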
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table to read the snapshot and change log from.
    upstream_table: BatchTable<S>,

    /// State table that persists the per-vnode backfill progress.
    progress_state_table: StateTable<S>,

    /// Upstream input, which shares the schema of `upstream_table`.
    upstream: MergeExecutorInput,

    /// Column indices to be forwarded to the downstream.
    output_indices: Vec<usize>,

    progress: CreateMviewProgressReporter,

    chunk_size: usize,
    rate_limit: RateLimit,

    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    snapshot_epoch: Option<u64>,
}

impl<S: StateStore> SnapshotBackfillExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: MergeExecutorInput,
        output_indices: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> Self {
        assert_eq!(&upstream.info.schema, upstream_table.schema());
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        if !matches!(rate_limit, RateLimit::Disabled) {
            debug!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            progress,
            chunk_size,
            rate_limit,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
        }
    }

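    /// Main entry of the executor. When a snapshot backfill is needed, it proceeds in
    /// three phases: consume the snapshot of the upstream table at the snapshot epoch
    /// while buffering upstream barriers, replay the upstream table's change log until
    /// the buffered epoch lag is drained, and finally forward the upstream stream
    /// directly, recording per-vnode progress at every barrier.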
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        debug!("snapshot backfill executor start");
        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                Some(snapshot_epoch)
            } else {
                None
            }
        } else {
            if cfg!(debug_assertions) {
                // A missing snapshot epoch is unexpected: fail fast in debug builds.
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    first_upstream_barrier.epoch, first_recv_barrier.epoch
                );
            } else {
                // In release builds, tolerate it as long as the executor starts aligned
                // with the upstream, in which case no backfill is needed.
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        let (mut barrier_epoch, mut need_report_finish) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                let mut upstream_buffer = UpstreamBuffer::new(
                    &mut self.upstream,
                    first_upstream_barrier,
                    consume_upstream_row_count,
                );

                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                        );

                        pin_mut!(snapshot_stream);

                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                if let Some(mut upstream_buffer) = upstream_buffer {
                    // Consume the change log to catch up with the upstream before
                    // switching to consuming it directly.
                    let initial_pending_lag = Duration::from_millis(
                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                    );
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        ?initial_pending_lag,
                        "start consuming log store"
                    );

                    let consuming_log_store_row_count = self
                        .metrics
                        .snapshot_backfill_consume_row_count
                        .with_guarded_label_values(&[
                            table_id_str.as_str(),
                            actor_id_str.as_str(),
                            "consuming_log_store",
                        ]);
                    loop {
                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                        {
                            // Sanity check: the upstream table's next epoch after the
                            // previous barrier epoch must match the new barrier's prev epoch.
                            let next_prev_epoch =
                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
                        }
                        barrier_epoch = barrier.epoch;
                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
                        // Replay the change log of the epoch that just ended while the
                        // upstream keeps being polled in the background.
                        let mut stream = upstream_buffer
                            .run_future(make_log_stream(
                                &self.upstream_table,
                                barrier_epoch.prev,
                                None,
                                self.chunk_size,
                            ))
                            .await?;
                        while let Some(chunk) =
                            upstream_buffer.run_future(stream.try_next()).await?
                        {
                            trace!(
                                ?barrier_epoch,
                                size = chunk.cardinality(),
                                "consume change log yield chunk",
                            );
                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                            yield Message::Chunk(chunk);
                        }

                        trace!(?barrier_epoch, "after consume change log");

                        if is_finished {
                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                            self.progress.finish_consuming_log_store(barrier.epoch);
                        } else {
                            self.progress.update_create_mview_log_store_progress(
                                barrier.epoch,
                                upstream_buffer.pending_epoch_lag(),
                            );
                        }

                        stream
                            .for_vnode_pk_progress(|vnode, row_count, progress| {
                                assert_eq!(progress, None);
                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
                            })
                            .await?;
                        let post_commit = backfill_state.commit(barrier.epoch).await?;
                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        if update_vnode_bitmap.is_some() {
                            return Err(anyhow!(
                                "should not update vnode bitmap during consuming log store"
                            )
                            .into());
                        }

                        if is_finished {
                            break;
                        }
                    }
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "finish consuming log store"
                    );

                    (barrier_epoch, false)
                } else {
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "skip consuming log store and start consuming upstream directly"
                    );

                    (barrier_epoch, true)
                }
            } else {
                // Backfill is not needed: the persisted progress must already cover the
                // first upstream epoch for every vnode.
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                info!(
                    table_id = self.upstream_table.table_id().table_id,
                    "skip backfill"
                );
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true)
            }
        };
        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        // Consume the upstream directly, marking every vnode as fully consumed at each barrier.
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(
                                vnode,
                                barrier.epoch.prev,
                                epoch_row_count,
                            );
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap = self
                            .upstream_table
                            .update_vnode_bitmap(new_vnode_bitmap.clone());
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}

impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let output_indices = self.output_indices.clone();
        self.execute_inner()
            .filter_map(move |result| {
                ready({
                    match result {
                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
                        Err(e) => Some(Err(e)),
                    }
                })
            })
            .boxed()
    }
}

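// Marker types for the phase parameter of `UpstreamBuffer`: `ConsumingSnapshot` while
// the snapshot is being read, and `ConsumingLogStore` while the change log is replayed.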
struct ConsumingSnapshot;
struct ConsumingLogStore;

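/// Barriers received from the upstream but not yet consumed via `consume_epoch`.
/// Within every `VecDeque` the newest barrier sits at the front; each group in
/// `checkpoint_barrier_groups` is fronted by a checkpoint barrier followed by the
/// non-checkpoint barriers that preceded it, and `pending_non_checkpoint_barriers`
/// holds the barriers received after the latest checkpoint barrier.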
struct PendingBarriers {
    first_upstream_barrier_epoch: EpochPair,

    /// Non-checkpoint barriers received after the latest checkpoint barrier, newest at the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers, each fronted by a checkpoint barrier, with the newest group at the front.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}

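// Illustrative example (not from the original source): starting from `new(b0)`, where
// `b0` is the first upstream (checkpoint) barrier, calling `add` with a non-checkpoint
// barrier `b1` and then a checkpoint barrier `b2` results in
//     checkpoint_barrier_groups: [[b2, b1], [b0]]   (newest group at the front)
//     pending_non_checkpoint_barriers: []
// A later non-checkpoint barrier `b3` stays in `pending_non_checkpoint_barriers`
// until the next checkpoint barrier arrives.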
impl PendingBarriers {
    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
        Self {
            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
            pending_non_checkpoint_barriers: Default::default(),
            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
                first_upstream_barrier,
            ])]),
        }
    }

    fn add(&mut self, barrier: DispatcherBarrier) {
        let is_checkpoint = barrier.kind.is_checkpoint();
        self.pending_non_checkpoint_barriers.push_front(barrier);
        if is_checkpoint {
            self.checkpoint_barrier_groups
                .push_front(take(&mut self.pending_non_checkpoint_barriers));
        }
    }

    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
        self.checkpoint_barrier_groups.pop_back()
    }

    fn consume_epoch(&mut self, epoch: EpochPair) {
        let barriers = self
            .checkpoint_barrier_groups
            .back_mut()
            .expect("non-empty");
        let oldest_upstream_barrier = barriers.back().expect("non-empty");
        assert!(
            oldest_upstream_barrier.epoch.prev >= epoch.prev,
            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
            oldest_upstream_barrier.epoch,
            epoch
        );
        if oldest_upstream_barrier.epoch.prev == epoch.prev {
            assert_eq!(oldest_upstream_barrier.epoch, epoch);
            barriers.pop_back();
            if barriers.is_empty() {
                self.checkpoint_barrier_groups.pop_back();
            }
        }
    }

    fn latest_epoch(&self) -> Option<EpochPair> {
        self.pending_non_checkpoint_barriers
            .front()
            .or_else(|| {
                self.checkpoint_barrier_groups
                    .front()
                    .and_then(|barriers| barriers.front())
            })
            .map(|barrier| barrier.epoch)
    }

    fn checkpoint_epoch_count(&self) -> usize {
        self.checkpoint_barrier_groups.len()
    }

    fn has_checkpoint_epoch(&self) -> bool {
        !self.checkpoint_barrier_groups.is_empty()
    }
}

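/// Buffer that keeps polling the upstream while the executor is busy consuming the
/// snapshot or the change log, so the upstream is not back-pressured. Only the barriers
/// are recorded (data chunks are dropped and later re-read from the table change log),
/// and upstream consumption is throttled once the pending epoch lag exceeds
/// `max_pending_epoch_lag`.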
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    max_pending_epoch_lag: u64,
    consumed_epoch: u64,
    /// Barriers received from the upstream but not yet consumed locally.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling data of an epoch whose checkpoint barrier has not
    /// been received yet. While set, upstream polling continues regardless of the
    /// pending-lag limit so that the epoch is consumed up to its next checkpoint barrier.
    is_polling_epoch_data: bool,
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}

impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
    fn new(
        upstream: &'a mut MergeExecutorInput,
        first_upstream_barrier: DispatcherBarrier,
        consume_upstream_row_count: LabelGuardedIntCounter,
    ) -> Self {
        Self {
            upstream,
            is_polling_epoch_data: false,
            consume_upstream_row_count,
            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
            // No limit on the pending epoch lag while consuming the snapshot.
            max_pending_epoch_lag: u64::MAX,
            consumed_epoch: 0,
            _phase: ConsumingSnapshot {},
        }
    }

    fn start_consuming_log_store(
        mut self,
        consumed_epoch: u64,
    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
        // If the first upstream barrier is already covered by `consumed_epoch`, drop it.
        if self
            .upstream_pending_barriers
            .first_upstream_barrier_epoch
            .prev
            == consumed_epoch
        {
            assert_eq!(
                1,
                self.upstream_pending_barriers
                    .pop()
                    .expect("non-empty")
                    .len()
            );
        }
        let max_pending_epoch_lag = self.pending_epoch_lag();
        let buffer = UpstreamBuffer {
            upstream: self.upstream,
            upstream_pending_barriers: self.upstream_pending_barriers,
            max_pending_epoch_lag,
            is_polling_epoch_data: self.is_polling_epoch_data,
            consume_upstream_row_count: self.consume_upstream_row_count,
            consumed_epoch,
            _phase: ConsumingLogStore {},
        };
        if buffer.is_finished() {
            None
        } else {
            Some(buffer)
        }
    }
}

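// Upstream polling shared by both phases: chunks and watermarks are only counted and
// then dropped (their data is re-read later from the table's change log), while
// barriers are recorded in `PendingBarriers`.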
impl<S> UpstreamBuffer<'_, S> {
    fn can_consume_upstream(&self) -> bool {
        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
    }

    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
        {
            loop {
                if let Err(e) = try {
                    if !self.can_consume_upstream() {
                        // Too much pending lag: park here so that only the foreground
                        // future in `run_future` makes progress.
                        return pending().await;
                    }
                    self.consume_until_next_checkpoint_barrier().await?;
                } {
                    break e;
                }
            }
        }
    }

    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
        // Poll the upstream until the next checkpoint barrier is received and recorded.
        loop {
            let msg: DispatcherMessage = self
                .upstream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("end of upstream"))?;
            match msg {
                DispatcherMessage::Chunk(chunk) => {
                    self.is_polling_epoch_data = true;
                    self.consume_upstream_row_count
                        .inc_by(chunk.cardinality() as _);
                }
                DispatcherMessage::Barrier(barrier) => {
                    let is_checkpoint = barrier.kind.is_checkpoint();
                    self.upstream_pending_barriers.add(barrier);
                    if is_checkpoint {
                        self.is_polling_epoch_data = false;
                        break;
                    } else {
                        self.is_polling_epoch_data = true;
                    }
                }
                DispatcherMessage::Watermark(_) => {
                    self.is_polling_epoch_data = true;
                }
            }
        }
        Ok(())
    }
}

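// While consuming the log store, `consumed_epoch` tracks how far this executor has
// caught up, and `max_pending_epoch_lag` shrinks on every consumed epoch:
//     max_pending_epoch_lag = min(pending_epoch_lag, max_pending_epoch_lag - elapsed_epoch / 2)
// so the buffered-but-unconsumed upstream progress must eventually converge and the
// executor can switch to consuming the upstream directly.
// Illustrative numbers (not from the original source): with a current limit of 100 and
// an epoch advance of 40, the new limit is at most 100 - 40 / 2 = 80, so background
// upstream polling pauses (unless an epoch is only partially polled) once the pending
// lag reaches the new limit.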
impl UpstreamBuffer<'_, ConsumingLogStore> {
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            // We have started polling data of a new epoch but have not received its
            // checkpoint barrier yet, so keep polling the upstream until we get one.
            assert!(self.is_polling_epoch_data);
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    // Opportunistically consume more upstream data without blocking,
                    // as long as the lag limit allows it.
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                // Shrink the allowed pending lag so that the log-store phase converges.
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }

    fn is_finished(&self) -> bool {
        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
            assert!(
                self.upstream_pending_barriers
                    .pending_non_checkpoint_barriers
                    .is_empty()
            )
        }
        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
    }
}

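// Helpers shared by both phases: `run_future` drives the given future while polling the
// upstream in the background (the `biased` select polls the upstream branch first on
// every wake-up, and that branch only completes when polling the upstream fails), and
// `pending_epoch_lag` measures how far the newest buffered barrier is ahead of the
// epoch consumed so far.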
impl<S> UpstreamBuffer<'_, S> {
    async fn run_future<T, E: Into<StreamExecutorError>>(
        &mut self,
        future: impl Future<Output = Result<T, E>>,
    ) -> StreamExecutorResult<T> {
        select! {
            biased;
            e = self.concurrently_consume_upstream() => {
                Err(e)
            }
            // `concurrently_consume_upstream` only resolves on error, so in the normal
            // path the result comes from `future`.
            result = future => {
                result.map_err(Into::into)
            }
        }
    }

    fn pending_epoch_lag(&self) -> u64 {
        self.upstream_pending_barriers
            .latest_epoch()
            .map(|epoch| {
                epoch
                    .prev
                    .checked_sub(self.consumed_epoch)
                    .expect("pending epoch must be later than consumed_epoch")
            })
            .unwrap_or(0)
    }
}

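/// Build a `VnodeStream` that reads the change log of `upstream_table` at `prev_epoch`
/// for every vnode currently owned by this actor, without rate limiting.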
async fn make_log_stream(
    upstream_table: &BatchTable<impl StateStore>,
    prev_epoch: u64,
    start_pk: Option<OwnedRow>,
    chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
    let data_types = upstream_table.schema().data_types();
    let start_pk = start_pk.as_ref();
    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
        upstream_table
            .batch_iter_vnode_log(
                prev_epoch,
                HummockReadEpoch::Committed(prev_epoch),
                start_pk,
                vnode,
            )
            .map_ok(move |stream| {
                let stream = stream.map_err(Into::into);
                (vnode, stream, 0)
            })
    }))
    .await?;
    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
    Ok(VnodeStream::new(
        vnode_streams,
        upstream_table.pk_in_output_indices().expect("should exist"),
        builder,
    ))
}

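/// Build a `VnodeStream` over the snapshot of `upstream_table` at `snapshot_epoch`,
/// resuming each vnode from the progress recorded in `backfill_state` and skipping
/// vnodes whose snapshot has already been fully consumed.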
async fn make_snapshot_stream(
    upstream_table: &BatchTable<impl StateStore>,
    snapshot_epoch: u64,
    backfill_state: &BackfillState<impl StateStore>,
    rate_limit: RateLimit,
    chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
    let data_types = upstream_table.schema().data_types();
    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
        move |(vnode, progress)| {
            let start_pk = match progress {
                None => Some((None, 0)),
                Some(VnodeBackfillProgress {
                    row_count,
                    progress: EpochBackfillProgress::Consuming { latest_pk },
                    ..
                }) => Some((Some(latest_pk), *row_count)),
                Some(VnodeBackfillProgress {
                    progress: EpochBackfillProgress::Consumed,
                    ..
                }) => None,
            };
            start_pk.map(|(start_pk, row_count)| {
                upstream_table
                    .batch_iter_vnode(
                        HummockReadEpoch::Committed(snapshot_epoch),
                        start_pk,
                        vnode,
                        PrefetchOptions::prefetch_for_large_range_scan(),
                    )
                    .map_ok(move |stream| {
                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
                        (vnode, stream, row_count)
                    })
            })
        },
    ))
    .await?;
    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
    Ok(VnodeStream::new(
        vnode_streams,
        upstream_table.pk_in_output_indices().expect("should exist"),
        builder,
    ))
}

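/// Stream that consumes the snapshot of `upstream_table` at `snapshot_epoch` while
/// handling locally injected barriers: progress is committed and reported on every
/// barrier, the snapshot read is throttled according to `rate_limit`, backfill may
/// start paused and later be resumed by a start-fragment-backfill mutation, and after
/// the snapshot is drained the stream keeps yielding barriers until `snapshot_epoch`
/// is reached.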
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    // Start consuming the upstream table snapshot.
    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        rate_limit,
        chunk_size,
    )
    .await?;

    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    let mut count = 0;
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;
                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    if backfill_paused {
                        backfill_paused = false;
                    } else {
                        tracing::error!(
                            "received start fragment backfill mutation, but backfill is not paused"
                        );
                    }
                }
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;
            }
            Either::Right(Some(chunk)) => {
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            Either::Right(None) => {
                break;
            }
        }
    }

    // All snapshot rows have been yielded: report finish on the next barrier.
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    info!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    loop {
        // Keep yielding and committing barriers until the snapshot epoch is reached.
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    info!(?barrier_epoch, "finish consuming snapshot");
}