1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_storage::StateStore;
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::ChangeLogRow;
35use risingwave_storage::table::batch_table::BatchTable;
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
39use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
40use crate::executor::backfill::snapshot_backfill::state::{
41 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
42};
43use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
44use crate::executor::backfill::utils::{create_builder, mapping_message};
45use crate::executor::monitor::StreamingMetrics;
46use crate::executor::prelude::{StateTable, StreamExt, try_stream};
47use crate::executor::{
48 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
49 MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, expect_first_barrier,
50};
51use crate::task::CreateMviewProgressReporter;
52
/// Stream executor that backfills from an upstream table and then switches to
/// forwarding the live upstream.
///
/// As driven by `execute_inner`, the executor may read a snapshot of
/// `upstream_table` at `snapshot_epoch`, then replay the table's per-epoch change
/// log, and finally forward messages from `upstream` directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Table that the snapshot rows and the per-epoch change logs are read from.
    upstream_table: BatchTable<S>,

    /// State table handed to `BackfillState::new` to track per-vnode backfill progress.
    progress_state_table: StateTable<S>,

    /// Upstream input. `new` asserts that its schema equals the schema of `upstream_table`.
    upstream: MergeExecutorInput,

    /// Column indices applied to every yielded message through `mapping_message` in `execute`.
    output_indices: Vec<usize>,

    /// Reporter used to publish snapshot and log-store progress and completion.
    progress: CreateMviewProgressReporter,

    /// Chunk size passed to the chunk builder of the snapshot and change-log streams.
    chunk_size: usize,
    /// Rate limit applied when reading the snapshot; the change-log stream is built with
    /// `RateLimit::Disabled`.
    rate_limit: RateLimit,

    /// Receiver of the barriers that pace this executor while it is backfilling.
    barrier_rx: UnboundedReceiver<Barrier>,

    /// Actor context; its `id` and `fragment_id` are used for metric labels and barrier mutations.
    actor_ctx: ActorContextRef,
    /// Streaming metrics; `snapshot_backfill_consume_row_count` is updated per backfill stage.
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to read. When `None`, backfill is skipped (and debug builds panic
    /// in `execute_inner`).
    snapshot_epoch: Option<u64>,
}
78
79impl<S: StateStore> SnapshotBackfillExecutor<S> {
80 #[expect(clippy::too_many_arguments)]
81 pub(crate) fn new(
82 upstream_table: BatchTable<S>,
83 progress_state_table: StateTable<S>,
84 upstream: MergeExecutorInput,
85 output_indices: Vec<usize>,
86 actor_ctx: ActorContextRef,
87 progress: CreateMviewProgressReporter,
88 chunk_size: usize,
89 rate_limit: RateLimit,
90 barrier_rx: UnboundedReceiver<Barrier>,
91 metrics: Arc<StreamingMetrics>,
92 snapshot_epoch: Option<u64>,
93 ) -> Self {
94 assert_eq!(&upstream.info.schema, upstream_table.schema());
95 if upstream_table.pk_in_output_indices().is_none() {
96 panic!(
97 "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
98 upstream_table.pk_indices(),
99 upstream_table.output_indices(),
100 upstream_table.schema()
101 )
102 };
103 if !matches!(rate_limit, RateLimit::Disabled) {
104 debug!(
105 ?rate_limit,
106 "create snapshot backfill executor with rate limit"
107 );
108 }
109 Self {
110 upstream_table,
111 progress_state_table,
112 upstream,
113 output_indices,
114 progress,
115 chunk_size,
116 rate_limit,
117 barrier_rx,
118 actor_ctx,
119 metrics,
120 snapshot_epoch,
121 }
122 }
123
    /// Core message stream of the executor.
    ///
    /// After forwarding the first received barrier, the stream goes through up to
    /// three stages:
    ///
    /// 1. consume the snapshot of the upstream table at `snapshot_epoch`;
    /// 2. consume the upstream table's change log epoch by epoch until the buffered
    ///    upstream barriers are caught up;
    /// 3. forward the upstream directly.
    ///
    /// Stages 1 and 2 are skipped when no snapshot backfill is required. Progress is
    /// recorded in `BackfillState` and committed on every barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        debug!("snapshot backfill executor start");
        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        // Backfill is required only when a snapshot epoch is configured and the first
        // upstream barrier differs from the first barrier received on `barrier_rx`.
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                Some(snapshot_epoch)
            } else {
                None
            }
        } else {
            // A missing snapshot epoch is treated as a bug in debug builds; release builds
            // only warn and require both first barriers to carry the same epoch.
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    first_upstream_barrier.epoch, first_recv_barrier.epoch
                );
            } else {
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        // `need_report_finish` is `true` when the log-store stage did not run, so that the
        // finish of log-store consumption is reported on the first barrier of stage 3.
        let (mut barrier_epoch, mut need_report_finish) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                // Buffers upstream barriers while the snapshot and the log store are consumed.
                let mut upstream_buffer = UpstreamBuffer::new(
                    &mut self.upstream,
                    first_upstream_barrier,
                    consume_upstream_row_count,
                );

                // Stage 1: consume the snapshot, unless the first received barrier is already
                // at (or past) the snapshot epoch.
                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                        );

                        pin_mut!(snapshot_stream);

                        // `run_future` polls the upstream concurrently with the snapshot stream.
                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    // NOTE(review): every other barrier receive in this file goes through
                    // `receive_next_barrier(..).await?`; this one panics on a closed channel
                    // instead of returning an error — confirm this is intended.
                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    info!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // Stage 2: consume the log store. `start_consuming_log_store` returns `None`
                // when there is nothing buffered to catch up with.
                if let Some(mut upstream_buffer) = upstream_buffer {
                    let initial_pending_lag = Duration::from_millis(
                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                    );
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        ?initial_pending_lag,
                        "start consuming log store"
                    );

                    let consuming_log_store_row_count = self
                        .metrics
                        .snapshot_backfill_consume_row_count
                        .with_guarded_label_values(&[
                            table_id_str.as_str(),
                            actor_id_str.as_str(),
                            "consuming_log_store",
                        ]);
                    loop {
                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                        {
                            // NOTE(review): `next_epoch` is awaited only after `consumed_epoch`;
                            // presumably the ordering matters for upstream progress — confirm
                            // before reordering.
                            let next_prev_epoch =
                                self.upstream_table.next_epoch(barrier_epoch.prev).await?;
                            assert_eq!(next_prev_epoch, barrier.epoch.prev);
                        }
                        barrier_epoch = barrier.epoch;
                        debug!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
                        // Both building and draining the change-log stream go through
                        // `run_future`, so the upstream keeps being polled meanwhile.
                        let mut stream = upstream_buffer
                            .run_future(make_log_stream(
                                &self.upstream_table,
                                barrier_epoch.prev,
                                None,
                                self.chunk_size,
                            ))
                            .await?;
                        while let Some(chunk) =
                            upstream_buffer.run_future(stream.try_next()).await?
                        {
                            trace!(
                                ?barrier_epoch,
                                size = chunk.cardinality(),
                                "consume change log yield chunk",
                            );
                            consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                            yield Message::Chunk(chunk);
                        }

                        trace!(?barrier_epoch, "after consume change log");

                        if is_finished {
                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                            self.progress.finish_consuming_log_store(barrier.epoch);
                        } else {
                            self.progress.update_create_mview_log_store_progress(
                                barrier.epoch,
                                upstream_buffer.pending_epoch_lag(),
                            );
                        }

                        // The change log of the epoch was read to the end, so every vnode is
                        // expected to have no remaining pk progress.
                        stream
                            .for_vnode_pk_progress(|vnode, row_count, progress| {
                                assert_eq!(progress, None);
                                backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
                            })
                            .await?;
                        let post_commit = backfill_state.commit(barrier.epoch).await?;
                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        // A vnode bitmap update is rejected at this stage; the error is raised
                        // only after the barrier has been yielded.
                        if update_vnode_bitmap.is_some() {
                            return Err(anyhow!(
                                "should not update vnode bitmap during consuming log store"
                            )
                            .into());
                        }

                        if is_finished {
                            break;
                        }
                    }
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "finish consuming log store"
                    );

                    (barrier_epoch, false)
                } else {
                    info!(
                        ?barrier_epoch,
                        table_id = self.upstream_table.table_id().table_id,
                        "skip consuming log store and start consuming upstream directly"
                    );

                    (barrier_epoch, true)
                }
            } else {
                // No backfill: the stored progress of every vnode must already be `Consumed`
                // at the epoch right before the first upstream barrier.
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                info!(
                    table_id = self.upstream_table.table_id().table_id,
                    "skip backfill"
                );
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true)
            }
        };
        // Stage 3: forward the upstream directly.
        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    // `epoch_row_count` is accumulated over all chunks of the epoch, and the
                    // same value is recorded for every vnode owned by this executor.
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap =
                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                        // After the bitmap changes, every vnode reported by the state must be
                        // fully consumed as of the previous epoch.
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
438}
439
440impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
441 fn execute(self: Box<Self>) -> BoxedMessageStream {
442 let output_indices = self.output_indices.clone();
443 self.execute_inner()
444 .filter_map(move |result| {
445 ready({
446 match result {
447 Ok(message) => mapping_message(message, &output_indices).map(Ok),
448 Err(e) => Some(Err(e)),
449 }
450 })
451 })
452 .boxed()
453 }
454}
455
/// Phase marker for an `UpstreamBuffer` as created by `UpstreamBuffer::new`, i.e. while
/// the snapshot is being consumed.
struct ConsumingSnapshot;
/// Phase marker for an `UpstreamBuffer` returned by `start_consuming_log_store`; only this
/// phase exposes `consumed_epoch` and `is_finished`.
struct ConsumingLogStore;
458
/// Upstream barriers that have been received but not yet consumed, grouped by the
/// checkpoint barrier that closes each group.
struct PendingBarriers {
    /// Epoch of the barrier passed to `new`.
    first_upstream_barrier_epoch: EpochPair,

    /// Barriers added since the last checkpoint barrier. Newer barriers are at the front.
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers. In the outer `VecDeque` newer groups are at the front.
    /// In each group built by `add`, newer barriers are at the front, with the checkpoint
    /// barrier that closed the group first. The initial group holds only the barrier
    /// passed to `new`. Groups are consumed from the back.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
471
472impl PendingBarriers {
473 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
474 Self {
475 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
476 pending_non_checkpoint_barriers: Default::default(),
477 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
478 first_upstream_barrier,
479 ])]),
480 }
481 }
482
483 fn add(&mut self, barrier: DispatcherBarrier) {
484 let is_checkpoint = barrier.kind.is_checkpoint();
485 self.pending_non_checkpoint_barriers.push_front(barrier);
486 if is_checkpoint {
487 self.checkpoint_barrier_groups
488 .push_front(take(&mut self.pending_non_checkpoint_barriers));
489 }
490 }
491
492 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
493 self.checkpoint_barrier_groups.pop_back()
494 }
495
496 fn consume_epoch(&mut self, epoch: EpochPair) {
497 let barriers = self
498 .checkpoint_barrier_groups
499 .back_mut()
500 .expect("non-empty");
501 let oldest_upstream_barrier = barriers.back().expect("non-empty");
502 assert!(
503 oldest_upstream_barrier.epoch.prev >= epoch.prev,
504 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
505 oldest_upstream_barrier.epoch,
506 epoch
507 );
508 if oldest_upstream_barrier.epoch.prev == epoch.prev {
509 assert_eq!(oldest_upstream_barrier.epoch, epoch);
510 barriers.pop_back();
511 if barriers.is_empty() {
512 self.checkpoint_barrier_groups.pop_back();
513 }
514 }
515 }
516
517 fn latest_epoch(&self) -> Option<EpochPair> {
518 self.pending_non_checkpoint_barriers
519 .front()
520 .or_else(|| {
521 self.checkpoint_barrier_groups
522 .front()
523 .and_then(|barriers| barriers.front())
524 })
525 .map(|barrier| barrier.epoch)
526 }
527
528 fn checkpoint_epoch_count(&self) -> usize {
529 self.checkpoint_barrier_groups.len()
530 }
531
532 fn has_checkpoint_epoch(&self) -> bool {
533 !self.checkpoint_barrier_groups.is_empty()
534 }
535}
536
/// Consumes the upstream while another future is running and buffers the barriers it
/// sees. `S` is a phase marker (`ConsumingSnapshot` or `ConsumingLogStore`).
struct UpstreamBuffer<'a, S> {
    /// Upstream input that is polled for messages.
    upstream: &'a mut MergeExecutorInput,
    /// The upstream may be consumed while `pending_epoch_lag()` is below this value.
    /// Starts at `u64::MAX` and is lowered in `consumed_epoch`.
    max_pending_epoch_lag: u64,
    /// Epoch up to which data has been consumed; `pending_epoch_lag` is measured against it.
    consumed_epoch: u64,
    /// Barriers received from upstream that have not been consumed yet.
    upstream_pending_barriers: PendingBarriers,
    /// Set to `true` once a chunk, a watermark or a non-checkpoint barrier has been received
    /// since the last checkpoint barrier, and reset to `false` by a checkpoint barrier.
    /// While `true`, `can_consume_upstream` allows consuming regardless of the lag.
    is_polling_epoch_data: bool,
    /// Counter increased by the cardinality of every chunk received from upstream.
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Phase marker value; not read by the code in this file.
    _phase: S,
}
551
552impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
553 fn new(
554 upstream: &'a mut MergeExecutorInput,
555 first_upstream_barrier: DispatcherBarrier,
556 consume_upstream_row_count: LabelGuardedIntCounter,
557 ) -> Self {
558 Self {
559 upstream,
560 is_polling_epoch_data: false,
561 consume_upstream_row_count,
562 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
563 max_pending_epoch_lag: u64::MAX,
565 consumed_epoch: 0,
566 _phase: ConsumingSnapshot {},
567 }
568 }
569
570 fn start_consuming_log_store(
571 mut self,
572 consumed_epoch: u64,
573 ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
574 if self
575 .upstream_pending_barriers
576 .first_upstream_barrier_epoch
577 .prev
578 == consumed_epoch
579 {
580 assert_eq!(
581 1,
582 self.upstream_pending_barriers
583 .pop()
584 .expect("non-empty")
585 .len()
586 );
587 }
588 let max_pending_epoch_lag = self.pending_epoch_lag();
589 let buffer = UpstreamBuffer {
590 upstream: self.upstream,
591 upstream_pending_barriers: self.upstream_pending_barriers,
592 max_pending_epoch_lag,
593 is_polling_epoch_data: self.is_polling_epoch_data,
594 consume_upstream_row_count: self.consume_upstream_row_count,
595 consumed_epoch,
596 _phase: ConsumingLogStore {},
597 };
598 if buffer.is_finished() {
599 None
600 } else {
601 Some(buffer)
602 }
603 }
604}
605
606impl<S> UpstreamBuffer<'_, S> {
607 fn can_consume_upstream(&self) -> bool {
608 self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
609 }
610
611 async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
612 {
613 loop {
614 if let Err(e) = try {
615 if !self.can_consume_upstream() {
616 return pending().await;
618 }
619 self.consume_until_next_checkpoint_barrier().await?;
620 } {
621 break e;
622 }
623 }
624 }
625 }
626
627 async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
629 loop {
630 let msg: DispatcherMessage = self
631 .upstream
632 .try_next()
633 .await?
634 .ok_or_else(|| anyhow!("end of upstream"))?;
635 match msg {
636 DispatcherMessage::Chunk(chunk) => {
637 self.is_polling_epoch_data = true;
638 self.consume_upstream_row_count
639 .inc_by(chunk.cardinality() as _);
640 }
641 DispatcherMessage::Barrier(barrier) => {
642 let is_checkpoint = barrier.kind.is_checkpoint();
643 self.upstream_pending_barriers.add(barrier);
644 if is_checkpoint {
645 self.is_polling_epoch_data = false;
646 break;
647 } else {
648 self.is_polling_epoch_data = true;
649 }
650 }
651 DispatcherMessage::Watermark(_) => {
652 self.is_polling_epoch_data = true;
653 }
654 }
655 }
656 Ok(())
657 }
658}
659
660impl UpstreamBuffer<'_, ConsumingLogStore> {
661 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
662 assert!(!self.is_finished());
663 if !self.upstream_pending_barriers.has_checkpoint_epoch() {
664 assert!(self.is_polling_epoch_data);
667 self.consume_until_next_checkpoint_barrier().await?;
668 assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
669 }
670 self.upstream_pending_barriers.consume_epoch(epoch);
671
672 {
673 {
674 let prev_epoch = epoch.prev;
675 assert!(self.consumed_epoch < prev_epoch);
676 let elapsed_epoch = prev_epoch - self.consumed_epoch;
677 self.consumed_epoch = prev_epoch;
678 if self.upstream_pending_barriers.has_checkpoint_epoch() {
679 while self.can_consume_upstream()
681 && let Some(result) =
682 self.consume_until_next_checkpoint_barrier().now_or_never()
683 {
684 result?;
685 }
686 }
687 self.max_pending_epoch_lag = min(
691 self.pending_epoch_lag(),
692 self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
693 );
694 }
695 }
696 Ok(self.is_finished())
697 }
698
699 fn is_finished(&self) -> bool {
700 if cfg!(debug_assertions) && !self.is_polling_epoch_data {
701 assert!(
702 self.upstream_pending_barriers
703 .pending_non_checkpoint_barriers
704 .is_empty()
705 )
706 }
707 !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
708 }
709}
710
711impl<S> UpstreamBuffer<'_, S> {
712 async fn run_future<T, E: Into<StreamExecutorError>>(
715 &mut self,
716 future: impl Future<Output = Result<T, E>>,
717 ) -> StreamExecutorResult<T> {
718 select! {
719 biased;
720 e = self.concurrently_consume_upstream() => {
721 Err(e)
722 }
723 result = future => {
725 result.map_err(Into::into)
726 }
727 }
728 }
729
730 fn pending_epoch_lag(&self) -> u64 {
731 self.upstream_pending_barriers
732 .latest_epoch()
733 .map(|epoch| {
734 epoch
735 .prev
736 .checked_sub(self.consumed_epoch)
737 .expect("pending epoch must be later than consumed_epoch")
738 })
739 .unwrap_or(0)
740 }
741}
742
743async fn make_log_stream(
744 upstream_table: &BatchTable<impl StateStore>,
745 prev_epoch: u64,
746 start_pk: Option<OwnedRow>,
747 chunk_size: usize,
748) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
749 let data_types = upstream_table.schema().data_types();
750 let start_pk = start_pk.as_ref();
751 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
753 upstream_table
754 .batch_iter_vnode_log(
755 prev_epoch,
756 HummockReadEpoch::Committed(prev_epoch),
757 start_pk,
758 vnode,
759 )
760 .map_ok(move |stream| {
761 let stream = stream.map_err(Into::into);
762 (vnode, stream, 0)
763 })
764 }))
765 .await?;
766 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
767 Ok(VnodeStream::new(
768 vnode_streams,
769 upstream_table.pk_in_output_indices().expect("should exist"),
770 builder,
771 ))
772}
773
774async fn make_snapshot_stream(
775 upstream_table: &BatchTable<impl StateStore>,
776 snapshot_epoch: u64,
777 backfill_state: &BackfillState<impl StateStore>,
778 rate_limit: RateLimit,
779 chunk_size: usize,
780) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
781 let data_types = upstream_table.schema().data_types();
782 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
783 move |(vnode, progress)| {
784 let start_pk = match progress {
785 None => Some((None, 0)),
786 Some(VnodeBackfillProgress {
787 row_count,
788 progress: EpochBackfillProgress::Consuming { latest_pk },
789 ..
790 }) => Some((Some(latest_pk), *row_count)),
791 Some(VnodeBackfillProgress {
792 progress: EpochBackfillProgress::Consumed,
793 ..
794 }) => None,
795 };
796 start_pk.map(|(start_pk, row_count)| {
797 upstream_table
798 .batch_iter_vnode(
799 HummockReadEpoch::Committed(snapshot_epoch),
800 start_pk,
801 vnode,
802 PrefetchOptions::prefetch_for_large_range_scan(),
803 )
804 .map_ok(move |stream| {
805 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
806 (vnode, stream, row_count)
807 })
808 })
809 },
810 ))
811 .await?;
812 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
813 Ok(VnodeStream::new(
814 vnode_streams,
815 upstream_table.pk_in_output_indices().expect("should exist"),
816 builder,
817 ))
818}
819
/// Stream of messages produced while consuming the snapshot at `snapshot_epoch`.
///
/// The stream interleaves snapshot chunks with the barriers received from `barrier_rx`.
/// On every barrier the per-vnode progress is written to `backfill_state`, committed, and
/// reported through `progress`. After the snapshot is exhausted, the next barrier reports
/// the finish, and barriers keep being forwarded until one whose `curr` epoch equals
/// `snapshot_epoch` has been yielded.
///
/// Fails when a barrier with `curr >= snapshot_epoch` arrives before the snapshot is
/// exhausted, or when a snapshot chunk is produced while backfill is paused.
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    // Resume from the progress currently recorded in `backfill_state`.
    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        rate_limit,
        chunk_size,
    )
    .await?;

    /// Waits for either the next barrier (`Either::Left`) or the next snapshot chunk
    /// (`Either::Right`, `None` when the snapshot is exhausted). The snapshot branch is
    /// disabled while throttled or paused, so only barriers are awaited then.
    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    // `count` is the total number of rows yielded; `epoch_row_count` is reset on every barrier.
    let mut count = 0;
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        // Stop reading the snapshot for the rest of the epoch once the rows yielded in this
        // epoch exceed the rate limit.
        let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;
                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    if backfill_paused {
                        backfill_paused = false;
                    } else {
                        tracing::error!(
                            "received start fragment backfill mutation, but backfill is not paused"
                        );
                    }
                }
                // Flush rows still held by the chunk builder before the barrier.
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                // Record, per vnode, either the latest pk reached or that the vnode is done.
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                debug!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;
            }
            Either::Right(Some(chunk)) => {
                // The snapshot branch is disabled while paused, so this is not expected.
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            Either::Right(None) => {
                break;
            }
        }
    }

    // The snapshot is exhausted: report the finish on the next barrier.
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    info!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    // Keep forwarding barriers until the one whose `curr` epoch is the snapshot epoch.
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    info!(?barrier_epoch, "finish consuming snapshot");
}