1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::common::PbThrottleType;
33use risingwave_storage::StateStore;
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::ChangeLogRow;
36use risingwave_storage::table::batch_table::BatchTable;
37use tokio::select;
38use tokio::sync::mpsc::UnboundedReceiver;
39
40use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
41use crate::executor::backfill::snapshot_backfill::state::{
42 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
43};
44use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
45use crate::executor::backfill::utils::{create_builder, mapping_message};
46use crate::executor::monitor::StreamingMetrics;
47use crate::executor::prelude::{StateTable, StreamExt, try_stream};
48use crate::executor::{
49 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
50 MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
51 expect_first_barrier,
52};
53use crate::task::CreateMviewProgressReporter;
54
/// Executor that backfills a streaming job from a snapshot of `upstream_table`, then replays the
/// table's change log, and finally forwards the upstream stream directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table that the snapshot and the change log are read from.
    upstream_table: BatchTable<S>,

    /// State table persisting the per-vnode backfill progress (wrapped into a `BackfillState`).
    progress_state_table: StateTable<S>,

    /// Upstream input. While backfilling, its barriers are tracked by an `UpstreamBuffer` and its
    /// data is counted and dropped; afterwards it is forwarded directly.
    upstream: MergeExecutorInput,

    /// Column indices each emitted message is projected onto (applied in `execute`).
    output_indices: Vec<usize>,

    /// Reporter for snapshot and log-store consumption progress.
    progress: CreateMviewProgressReporter,

    /// Chunk size used when building snapshot / change-log chunks.
    chunk_size: usize,
    /// Rate limit for reading the snapshot; may be replaced by a backfill `Throttle` mutation.
    rate_limit: RateLimit,

    /// Barriers injected for this executor; they drive the executor while it is backfilling.
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch of the snapshot to backfill from. `None` means it was not set (a panic in debug
    /// builds, a skipped backfill in release builds; see `execute_inner`).
    snapshot_epoch: Option<u64>,
}
80
81impl<S: StateStore> SnapshotBackfillExecutor<S> {
82 #[expect(clippy::too_many_arguments)]
83 pub(crate) fn new(
84 upstream_table: BatchTable<S>,
85 progress_state_table: StateTable<S>,
86 upstream: MergeExecutorInput,
87 output_indices: Vec<usize>,
88 actor_ctx: ActorContextRef,
89 progress: CreateMviewProgressReporter,
90 chunk_size: usize,
91 rate_limit: RateLimit,
92 barrier_rx: UnboundedReceiver<Barrier>,
93 metrics: Arc<StreamingMetrics>,
94 snapshot_epoch: Option<u64>,
95 ) -> Self {
96 assert_eq!(&upstream.info.schema, upstream_table.schema());
97 if upstream_table.pk_in_output_indices().is_none() {
98 panic!(
99 "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
100 upstream_table.pk_indices(),
101 upstream_table.output_indices(),
102 upstream_table.schema()
103 )
104 };
105 if !matches!(rate_limit, RateLimit::Disabled) {
106 trace!(
107 ?rate_limit,
108 "create snapshot backfill executor with rate limit"
109 );
110 }
111 Self {
112 upstream_table,
113 progress_state_table,
114 upstream,
115 output_indices,
116 progress,
117 chunk_size,
118 rate_limit,
119 barrier_rx,
120 actor_ctx,
121 metrics,
122 snapshot_epoch,
123 }
124 }
125
126 #[try_stream(ok = Message, error = StreamExecutorError)]
127 async fn execute_inner(mut self) {
128 trace!("snapshot backfill executor start");
129 let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
130 trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
131 let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
132 trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
133 let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
134 self.snapshot_epoch
135 {
136 if first_upstream_barrier.epoch != first_recv_barrier.epoch {
137 assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
138 Some(snapshot_epoch)
139 } else {
140 None
141 }
142 } else {
143 if cfg!(debug_assertions) {
145 panic!(
146 "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
147 first_upstream_barrier.epoch, first_recv_barrier.epoch
148 );
149 } else {
150 warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
151 assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
152 None
153 }
154 };
155 let first_recv_barrier_epoch = first_recv_barrier.epoch;
156 let initial_backfill_paused =
157 first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
158 yield Message::Barrier(first_recv_barrier);
159 let mut backfill_state = BackfillState::new(
160 self.progress_state_table,
161 first_recv_barrier_epoch,
162 self.upstream_table.pk_serializer().clone(),
163 )
164 .await?;
165
166 let (mut barrier_epoch, mut need_report_finish) = {
167 if let Some(snapshot_epoch) = should_snapshot_backfill {
168 let table_id_str = format!("{}", self.upstream_table.table_id());
169 let actor_id_str = format!("{}", self.actor_ctx.id);
170
171 let consume_upstream_row_count = self
172 .metrics
173 .snapshot_backfill_consume_row_count
174 .with_guarded_label_values(&[
175 table_id_str.as_str(),
176 actor_id_str.as_str(),
177 "consume_upstream",
178 ]);
179
180 let mut upstream_buffer = UpstreamBuffer::new(
181 &mut self.upstream,
182 first_upstream_barrier,
183 consume_upstream_row_count,
184 );
185
186 let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
188 < snapshot_epoch
189 {
190 trace!(
191 table_id = %self.upstream_table.table_id(),
192 snapshot_epoch,
193 barrier_epoch = ?first_recv_barrier_epoch,
194 "start consuming snapshot"
195 );
196 {
197 let consuming_snapshot_row_count = self
198 .metrics
199 .snapshot_backfill_consume_row_count
200 .with_guarded_label_values(&[
201 table_id_str.as_str(),
202 actor_id_str.as_str(),
203 "consuming_snapshot",
204 ]);
205 let snapshot_stream = make_consume_snapshot_stream(
206 &self.upstream_table,
207 snapshot_epoch,
208 self.chunk_size,
209 &mut self.rate_limit,
210 &mut self.barrier_rx,
211 &mut self.progress,
212 &mut backfill_state,
213 first_recv_barrier_epoch,
214 initial_backfill_paused,
215 &self.actor_ctx,
216 );
217
218 pin_mut!(snapshot_stream);
219
220 while let Some(message) = upstream_buffer
221 .run_future(snapshot_stream.try_next())
222 .await?
223 {
224 if let Message::Chunk(chunk) = &message {
225 consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
226 }
227 yield message;
228 }
229 }
230
231 let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
232 let recv_barrier_epoch = recv_barrier.epoch;
233 assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
234 let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
235 yield Message::Barrier(recv_barrier);
236 post_commit.post_yield_barrier(None).await?;
237 (
238 recv_barrier_epoch,
239 upstream_buffer.start_consuming_log_store(snapshot_epoch),
240 )
241 } else {
242 trace!(
243 table_id = %self.upstream_table.table_id(),
244 snapshot_epoch,
245 barrier_epoch = ?first_recv_barrier_epoch,
246 "skip consuming snapshot"
247 );
248 (
249 first_recv_barrier_epoch,
250 upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
251 )
252 };
253
254 if let Some(mut upstream_buffer) = upstream_buffer {
256 let initial_pending_lag = Duration::from_millis(
257 Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
258 );
259 trace!(
260 ?barrier_epoch,
261 table_id = %self.upstream_table.table_id(),
262 ?initial_pending_lag,
263 "start consuming log store"
264 );
265
266 let consuming_log_store_row_count = self
267 .metrics
268 .snapshot_backfill_consume_row_count
269 .with_guarded_label_values(&[
270 table_id_str.as_str(),
271 actor_id_str.as_str(),
272 "consuming_log_store",
273 ]);
274 loop {
275 let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
276 assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
277 let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
278 {
279 let next_prev_epoch =
282 self.upstream_table.next_epoch(barrier_epoch.prev).await?;
283 assert_eq!(next_prev_epoch, barrier.epoch.prev);
284 }
285 barrier_epoch = barrier.epoch;
286 trace!(?barrier_epoch, kind = ?barrier.kind, "start consume epoch change log");
287 let mut stream = upstream_buffer
291 .run_future(make_log_stream(
292 &self.upstream_table,
293 barrier_epoch.prev,
294 None,
295 self.chunk_size,
296 ))
297 .await?;
298 while let Some(chunk) =
299 upstream_buffer.run_future(stream.try_next()).await?
300 {
301 trace!(
302 ?barrier_epoch,
303 size = chunk.cardinality(),
304 "consume change log yield chunk",
305 );
306 consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
307 yield Message::Chunk(chunk);
308 }
309
310 trace!(?barrier_epoch, "after consume change log");
311
312 if is_finished {
313 assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
314 self.progress.finish_consuming_log_store(barrier.epoch);
315 } else {
316 self.progress.update_create_mview_log_store_progress(
317 barrier.epoch,
318 upstream_buffer.pending_epoch_lag(),
319 );
320 }
321
322 stream
323 .for_vnode_pk_progress(|vnode, row_count, progress| {
324 assert_eq!(progress, None);
325 backfill_state.finish_epoch(vnode, barrier.epoch.prev, row_count);
326 })
327 .await?;
328 let post_commit = backfill_state.commit(barrier.epoch).await?;
329 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
330 yield Message::Barrier(barrier);
331 post_commit.post_yield_barrier(None).await?;
332 if update_vnode_bitmap.is_some() {
333 return Err(anyhow!(
334 "should not update vnode bitmap during consuming log store"
335 )
336 .into());
337 }
338
339 if is_finished {
340 break;
341 }
342 }
343 trace!(
344 ?barrier_epoch,
345 table_id = %self.upstream_table.table_id(),
346 "finish consuming log store"
347 );
348
349 (barrier_epoch, false)
350 } else {
351 trace!(
352 ?barrier_epoch,
353 table_id = %self.upstream_table.table_id(),
354 "skip consuming log store and start consuming upstream directly"
355 );
356
357 (barrier_epoch, true)
358 }
359 } else {
360 backfill_state
361 .latest_progress()
362 .for_each(|(vnode, progress)| {
363 let progress = progress.expect("should not be empty");
364 assert_eq!(
365 progress.epoch, first_upstream_barrier.epoch.prev,
366 "vnode: {:?}",
367 vnode
368 );
369 assert_eq!(
370 progress.progress,
371 EpochBackfillProgress::Consumed,
372 "vnode: {:?}",
373 vnode
374 );
375 });
376 trace!(
377 table_id = %self.upstream_table.table_id(),
378 "skip backfill"
379 );
380 assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
381 (first_upstream_barrier.epoch, true)
382 }
383 };
384 let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
385 let mut epoch_row_count = 0;
386 while let Some(msg) = upstream.try_next().await? {
388 match msg {
389 Message::Barrier(barrier) => {
390 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
391 self.upstream_table
392 .vnodes()
393 .iter_vnodes()
394 .for_each(|vnode| {
395 backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
398 });
399 epoch_row_count = 0;
400 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
401 barrier_epoch = barrier.epoch;
402 if need_report_finish {
403 need_report_finish = false;
404 self.progress.finish_consuming_log_store(barrier_epoch);
405 }
406 let post_commit = backfill_state.commit(barrier.epoch).await?;
407 yield Message::Barrier(barrier);
408 if let Some(new_vnode_bitmap) =
409 post_commit.post_yield_barrier(update_vnode_bitmap).await?
410 {
411 let _prev_vnode_bitmap =
412 self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
413 backfill_state
414 .latest_progress()
415 .for_each(|(vnode, progress)| {
416 let progress = progress.expect("should not be empty");
417 assert_eq!(
418 progress.epoch, barrier_epoch.prev,
419 "vnode {:?} has unexpected progress epoch",
420 vnode
421 );
422 assert_eq!(
423 progress.progress,
424 EpochBackfillProgress::Consumed,
425 "vnode {:?} has unexpected progress",
426 vnode
427 );
428 });
429 }
430 }
431 msg => {
432 if let Message::Chunk(chunk) = &msg {
433 epoch_row_count += chunk.cardinality();
434 }
435 yield msg;
436 }
437 }
438 }
439 }
440}
441
442impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
443 fn execute(self: Box<Self>) -> BoxedMessageStream {
444 let output_indices = self.output_indices.clone();
445 self.execute_inner()
446 .filter_map(move |result| {
447 ready({
448 match result {
449 Ok(message) => mapping_message(message, &output_indices).map(Ok),
450 Err(e) => Some(Err(e)),
451 }
452 })
453 })
454 .boxed()
455 }
456}
457
/// Type-state marker: the `UpstreamBuffer` is used while the snapshot is being consumed.
struct ConsumingSnapshot {}
/// Type-state marker: the `UpstreamBuffer` is used while the log store is being consumed.
struct ConsumingLogStore {}
460
/// Upstream barriers received but not yet consumed through the log store.
///
/// New barriers are pushed at the front and consumed from the back, so in every deque the
/// back holds the oldest entry.
struct PendingBarriers {
    /// Epoch of the first barrier received from upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Non-checkpoint barriers received since the last checkpoint barrier (newest at the front).
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers, newest group at the front. Each group added by `add` is closed by a
    /// checkpoint barrier, which sits at the group's front. Initially there is a single group
    /// holding only the first upstream barrier.
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
473
474impl PendingBarriers {
475 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
476 Self {
477 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
478 pending_non_checkpoint_barriers: Default::default(),
479 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
480 first_upstream_barrier,
481 ])]),
482 }
483 }
484
485 fn add(&mut self, barrier: DispatcherBarrier) {
486 let is_checkpoint = barrier.kind.is_checkpoint();
487 self.pending_non_checkpoint_barriers.push_front(barrier);
488 if is_checkpoint {
489 self.checkpoint_barrier_groups
490 .push_front(take(&mut self.pending_non_checkpoint_barriers));
491 }
492 }
493
494 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
495 self.checkpoint_barrier_groups.pop_back()
496 }
497
498 fn consume_epoch(&mut self, epoch: EpochPair) {
499 let barriers = self
500 .checkpoint_barrier_groups
501 .back_mut()
502 .expect("non-empty");
503 let oldest_upstream_barrier = barriers.back().expect("non-empty");
504 assert!(
505 oldest_upstream_barrier.epoch.prev >= epoch.prev,
506 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
507 oldest_upstream_barrier.epoch,
508 epoch
509 );
510 if oldest_upstream_barrier.epoch.prev == epoch.prev {
511 assert_eq!(oldest_upstream_barrier.epoch, epoch);
512 barriers.pop_back();
513 if barriers.is_empty() {
514 self.checkpoint_barrier_groups.pop_back();
515 }
516 }
517 }
518
519 fn latest_epoch(&self) -> Option<EpochPair> {
520 self.pending_non_checkpoint_barriers
521 .front()
522 .or_else(|| {
523 self.checkpoint_barrier_groups
524 .front()
525 .and_then(|barriers| barriers.front())
526 })
527 .map(|barrier| barrier.epoch)
528 }
529
530 fn checkpoint_epoch_count(&self) -> usize {
531 self.checkpoint_barrier_groups.len()
532 }
533
534 fn has_checkpoint_epoch(&self) -> bool {
535 !self.checkpoint_barrier_groups.is_empty()
536 }
537}
538
/// Consumes the upstream input in the background while the snapshot or the log store is read.
///
/// Upstream barriers are tracked in `upstream_pending_barriers`; upstream data is only counted
/// and then dropped. `S` is a type-state marker (`ConsumingSnapshot` or `ConsumingLogStore`).
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// Upstream is not pulled further once `pending_epoch_lag()` reaches this bound, unless it
    /// is in the middle of an epoch (`is_polling_epoch_data`).
    max_pending_epoch_lag: u64,
    /// Prev epoch of the latest epoch consumed from the log store (0 during the snapshot phase).
    consumed_epoch: u64,
    upstream_pending_barriers: PendingBarriers,
    /// Whether upstream has yielded data or a non-checkpoint barrier since the last checkpoint
    /// barrier, i.e. whether it is in the middle of an epoch.
    is_polling_epoch_data: bool,
    /// Counter of rows received (and dropped) from upstream while buffering.
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}
553
554impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
555 fn new(
556 upstream: &'a mut MergeExecutorInput,
557 first_upstream_barrier: DispatcherBarrier,
558 consume_upstream_row_count: LabelGuardedIntCounter,
559 ) -> Self {
560 Self {
561 upstream,
562 is_polling_epoch_data: false,
563 consume_upstream_row_count,
564 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
565 max_pending_epoch_lag: u64::MAX,
567 consumed_epoch: 0,
568 _phase: ConsumingSnapshot {},
569 }
570 }
571
572 fn start_consuming_log_store(
573 mut self,
574 consumed_epoch: u64,
575 ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
576 if self
577 .upstream_pending_barriers
578 .first_upstream_barrier_epoch
579 .prev
580 == consumed_epoch
581 {
582 assert_eq!(
583 1,
584 self.upstream_pending_barriers
585 .pop()
586 .expect("non-empty")
587 .len()
588 );
589 }
590 let max_pending_epoch_lag = self.pending_epoch_lag();
591 let buffer = UpstreamBuffer {
592 upstream: self.upstream,
593 upstream_pending_barriers: self.upstream_pending_barriers,
594 max_pending_epoch_lag,
595 is_polling_epoch_data: self.is_polling_epoch_data,
596 consume_upstream_row_count: self.consume_upstream_row_count,
597 consumed_epoch,
598 _phase: ConsumingLogStore {},
599 };
600 if buffer.is_finished() {
601 None
602 } else {
603 Some(buffer)
604 }
605 }
606}
607
608impl<S> UpstreamBuffer<'_, S> {
609 fn can_consume_upstream(&self) -> bool {
610 self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
611 }
612
613 async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
614 {
615 loop {
616 if let Err(e) = try {
617 if !self.can_consume_upstream() {
618 return pending().await;
620 }
621 self.consume_until_next_checkpoint_barrier().await?;
622 } {
623 break e;
624 }
625 }
626 }
627 }
628
629 async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
631 loop {
632 let msg: DispatcherMessage = self
633 .upstream
634 .try_next()
635 .await?
636 .ok_or_else(|| anyhow!("end of upstream"))?;
637 match msg {
638 DispatcherMessage::Chunk(chunk) => {
639 self.is_polling_epoch_data = true;
640 self.consume_upstream_row_count
641 .inc_by(chunk.cardinality() as _);
642 }
643 DispatcherMessage::Barrier(barrier) => {
644 let is_checkpoint = barrier.kind.is_checkpoint();
645 self.upstream_pending_barriers.add(barrier);
646 if is_checkpoint {
647 self.is_polling_epoch_data = false;
648 break;
649 } else {
650 self.is_polling_epoch_data = true;
651 }
652 }
653 DispatcherMessage::Watermark(_) => {
654 self.is_polling_epoch_data = true;
655 }
656 }
657 }
658 Ok(())
659 }
660}
661
662impl UpstreamBuffer<'_, ConsumingLogStore> {
663 async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
664 assert!(!self.is_finished());
665 if !self.upstream_pending_barriers.has_checkpoint_epoch() {
666 assert!(self.is_polling_epoch_data);
669 self.consume_until_next_checkpoint_barrier().await?;
670 assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
671 }
672 self.upstream_pending_barriers.consume_epoch(epoch);
673
674 {
675 {
676 let prev_epoch = epoch.prev;
677 assert!(self.consumed_epoch < prev_epoch);
678 let elapsed_epoch = prev_epoch - self.consumed_epoch;
679 self.consumed_epoch = prev_epoch;
680 if self.upstream_pending_barriers.has_checkpoint_epoch() {
681 while self.can_consume_upstream()
683 && let Some(result) =
684 self.consume_until_next_checkpoint_barrier().now_or_never()
685 {
686 result?;
687 }
688 }
689 self.max_pending_epoch_lag = min(
693 self.pending_epoch_lag(),
694 self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
695 );
696 }
697 }
698 Ok(self.is_finished())
699 }
700
701 fn is_finished(&self) -> bool {
702 if cfg!(debug_assertions) && !self.is_polling_epoch_data {
703 assert!(
704 self.upstream_pending_barriers
705 .pending_non_checkpoint_barriers
706 .is_empty()
707 )
708 }
709 !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
710 }
711}
712
713impl<S> UpstreamBuffer<'_, S> {
714 async fn run_future<T, E: Into<StreamExecutorError>>(
717 &mut self,
718 future: impl Future<Output = Result<T, E>>,
719 ) -> StreamExecutorResult<T> {
720 select! {
721 biased;
722 e = self.concurrently_consume_upstream() => {
723 Err(e)
724 }
725 result = future => {
727 result.map_err(Into::into)
728 }
729 }
730 }
731
732 fn pending_epoch_lag(&self) -> u64 {
733 self.upstream_pending_barriers
734 .latest_epoch()
735 .map(|epoch| {
736 epoch
737 .prev
738 .checked_sub(self.consumed_epoch)
739 .expect("pending epoch must be later than consumed_epoch")
740 })
741 .unwrap_or(0)
742 }
743}
744
745async fn make_log_stream(
746 upstream_table: &BatchTable<impl StateStore>,
747 prev_epoch: u64,
748 start_pk: Option<OwnedRow>,
749 chunk_size: usize,
750) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
751 let data_types = upstream_table.schema().data_types();
752 let start_pk = start_pk.as_ref();
753 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
755 upstream_table
756 .batch_iter_vnode_log(
757 prev_epoch,
758 HummockReadEpoch::Committed(prev_epoch),
759 start_pk,
760 vnode,
761 )
762 .map_ok(move |stream| {
763 let stream = stream.map_err(Into::into);
764 (vnode, stream, 0)
765 })
766 }))
767 .await?;
768 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
769 Ok(VnodeStream::new(
770 vnode_streams,
771 upstream_table.pk_in_output_indices().expect("should exist"),
772 builder,
773 ))
774}
775
776async fn make_snapshot_stream(
777 upstream_table: &BatchTable<impl StateStore>,
778 snapshot_epoch: u64,
779 backfill_state: &BackfillState<impl StateStore>,
780 rate_limit: RateLimit,
781 chunk_size: usize,
782) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
783 let data_types = upstream_table.schema().data_types();
784 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
785 move |(vnode, progress)| {
786 let start_pk = match progress {
787 None => Some((None, 0)),
788 Some(VnodeBackfillProgress {
789 row_count,
790 progress: EpochBackfillProgress::Consuming { latest_pk },
791 ..
792 }) => Some((Some(latest_pk), *row_count)),
793 Some(VnodeBackfillProgress {
794 progress: EpochBackfillProgress::Consumed,
795 ..
796 }) => None,
797 };
798 start_pk.map(|(start_pk, row_count)| {
799 upstream_table
800 .batch_iter_vnode(
801 HummockReadEpoch::Committed(snapshot_epoch),
802 start_pk,
803 vnode,
804 PrefetchOptions::prefetch_for_large_range_scan(),
805 )
806 .map_ok(move |stream| {
807 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
808 (vnode, stream, row_count)
809 })
810 })
811 },
812 ))
813 .await?;
814 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
815 Ok(VnodeStream::new(
816 vnode_streams,
817 upstream_table.pk_in_output_indices().expect("should exist"),
818 builder,
819 ))
820}
821
822#[expect(clippy::too_many_arguments)]
823#[try_stream(ok = Message, error = StreamExecutorError)]
824async fn make_consume_snapshot_stream<'a, S: StateStore>(
825 upstream_table: &'a BatchTable<S>,
826 snapshot_epoch: u64,
827 chunk_size: usize,
828 rate_limit: &'a mut RateLimit,
829 barrier_rx: &'a mut UnboundedReceiver<Barrier>,
830 progress: &'a mut CreateMviewProgressReporter,
831 backfill_state: &'a mut BackfillState<S>,
832 first_recv_barrier_epoch: EpochPair,
833 initial_backfill_paused: bool,
834 actor_ctx: &'a ActorContextRef,
835) {
836 let mut barrier_epoch = first_recv_barrier_epoch;
837
838 let mut snapshot_stream = make_snapshot_stream(
840 upstream_table,
841 snapshot_epoch,
842 &*backfill_state,
843 *rate_limit,
844 chunk_size,
845 )
846 .await?;
847
848 async fn select_barrier_and_snapshot_stream(
849 barrier_rx: &mut UnboundedReceiver<Barrier>,
850 snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
851 throttle_snapshot_stream: bool,
852 backfill_paused: bool,
853 ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
854 select!(
855 result = receive_next_barrier(barrier_rx) => {
856 Ok(Either::Left(result?))
857 },
858 result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
859 Ok(Either::Right(result?))
860 }
861 )
862 }
863
864 let mut count = 0;
865 let mut epoch_row_count = 0;
866 let mut backfill_paused = initial_backfill_paused;
867 loop {
868 let throttle_snapshot_stream = epoch_row_count as u64 > rate_limit.to_u64();
869 match select_barrier_and_snapshot_stream(
870 barrier_rx,
871 &mut snapshot_stream,
872 throttle_snapshot_stream,
873 backfill_paused,
874 )
875 .await?
876 {
877 Either::Left(barrier) => {
878 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
879 barrier_epoch = barrier.epoch;
880
881 if barrier_epoch.curr >= snapshot_epoch {
882 return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
883 }
884 if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
885 if backfill_paused {
886 backfill_paused = false;
887 } else {
888 tracing::error!(
889 "received start fragment backfill mutation, but backfill is not paused"
890 );
891 }
892 }
893 if let Some(chunk) = snapshot_stream.consume_builder() {
894 count += chunk.cardinality();
895 epoch_row_count += chunk.cardinality();
896 yield Message::Chunk(chunk);
897 }
898 snapshot_stream
899 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
900 if let Some(pk) = pk_progress {
901 backfill_state.update_epoch_progress(
902 vnode,
903 snapshot_epoch,
904 row_count,
905 pk,
906 );
907 } else {
908 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
909 }
910 })
911 .await?;
912 let post_commit = backfill_state.commit(barrier.epoch).await?;
913 trace!(?barrier_epoch, count, epoch_row_count, "update progress");
914 progress.update(barrier_epoch, barrier_epoch.prev, count as _);
915 epoch_row_count = 0;
916
917 let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
918 if let Mutation::Throttle(config) = &**m
919 && let Some(config) = config.get(&actor_ctx.fragment_id)
920 && config.throttle_type() == PbThrottleType::Backfill
921 {
922 Some(config.rate_limit)
923 } else {
924 None
925 }
926 });
927 yield Message::Barrier(barrier);
928 post_commit.post_yield_barrier(None).await?;
929
930 if let Some(new_rate_limit) = new_rate_limit {
931 let new_rate_limit = new_rate_limit.into();
932 *rate_limit = new_rate_limit;
933 snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
934 }
935 }
936 Either::Right(Some(chunk)) => {
937 if backfill_paused {
938 return Err(
939 anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
940 );
941 }
942 count += chunk.cardinality();
943 epoch_row_count += chunk.cardinality();
944 yield Message::Chunk(chunk);
945 }
946 Either::Right(None) => {
947 break;
948 }
949 }
950 }
951
952 let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
954 assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
955 barrier_epoch = barrier_to_report_finish.epoch;
956 trace!(?barrier_epoch, count, "report finish");
957 snapshot_stream
958 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
959 assert_eq!(pk_progress, None);
960 backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
961 })
962 .await?;
963 let post_commit = backfill_state.commit(barrier_epoch).await?;
964 progress.finish(barrier_epoch, count as _);
965 yield Message::Barrier(barrier_to_report_finish);
966 post_commit.post_yield_barrier(None).await?;
967
968 loop {
970 let barrier = receive_next_barrier(barrier_rx).await?;
971 assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
972 barrier_epoch = barrier.epoch;
973 let post_commit = backfill_state.commit(barrier.epoch).await?;
974 yield Message::Barrier(barrier);
975 post_commit.post_yield_barrier(None).await?;
976 if barrier_epoch.curr == snapshot_epoch {
977 break;
978 }
979 }
980 trace!(?barrier_epoch, "finish consuming snapshot");
981}