1use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::common::PbThrottleType;
33use risingwave_storage::StateStore;
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::ChangeLogRow;
36use risingwave_storage::table::batch_table::BatchTable;
37use tokio::select;
38use tokio::sync::mpsc::UnboundedReceiver;
39use tokio::time::sleep;
40
41use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
42use crate::executor::backfill::snapshot_backfill::state::{
43 BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
44};
45use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
46use crate::executor::backfill::utils::{create_builder, mapping_message};
47use crate::executor::monitor::StreamingMetrics;
48use crate::executor::prelude::{StateTable, StreamExt, try_stream};
49use crate::executor::{
50 ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
51 MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
52 expect_first_barrier,
53};
54use crate::task::CreateMviewProgressReporter;
55
/// Executor that creates a materialized view by first consuming a snapshot of the
/// upstream table at a fixed epoch, then replaying the buffered change log, and
/// finally forwarding upstream input directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table to read the snapshot and change log from (batch-read side).
    upstream_table: BatchTable<S>,

    /// State table persisting per-vnode backfill progress across recovery.
    progress_state_table: StateTable<S>,

    /// Streaming input from the upstream fragment.
    upstream: MergeExecutorInput,

    /// Column projection applied to every output message (see `Execute::execute`).
    output_indices: Vec<usize>,

    /// Reporter used to publish create-mview progress to the meta service.
    progress: CreateMviewProgressReporter,

    // Max rows per emitted chunk.
    chunk_size: usize,
    // Rate limit applied to snapshot consumption; may be updated by throttle mutations.
    rate_limit: RateLimit,

    // Locally injected barriers (as opposed to barriers arriving via `upstream`).
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch at which the snapshot is taken. `None` is unexpected and panics in
    /// debug builds (see `execute_inner`).
    snapshot_epoch: Option<u64>,
}
81
impl<S: StateStore> SnapshotBackfillExecutor<S> {
    /// Creates the executor.
    ///
    /// # Panics
    /// - if the upstream schema does not match the upstream table schema;
    /// - if the storage table's output does not include all pk columns.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        upstream_table: BatchTable<S>,
        progress_state_table: StateTable<S>,
        upstream: MergeExecutorInput,
        output_indices: Vec<usize>,
        actor_ctx: ActorContextRef,
        progress: CreateMviewProgressReporter,
        chunk_size: usize,
        rate_limit: RateLimit,
        barrier_rx: UnboundedReceiver<Barrier>,
        metrics: Arc<StreamingMetrics>,
        snapshot_epoch: Option<u64>,
    ) -> Self {
        assert_eq!(&upstream.info.schema, upstream_table.schema());
        if upstream_table.pk_in_output_indices().is_none() {
            panic!(
                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
                upstream_table.pk_indices(),
                upstream_table.output_indices(),
                upstream_table.schema()
            )
        };
        if !matches!(rate_limit, RateLimit::Disabled) {
            trace!(
                ?rate_limit,
                "create snapshot backfill executor with rate limit"
            );
        }
        Self {
            upstream_table,
            progress_state_table,
            upstream,
            output_indices,
            progress,
            chunk_size,
            rate_limit,
            barrier_rx,
            actor_ctx,
            metrics,
            snapshot_epoch,
        }
    }

    /// Main loop, in up to three phases:
    /// 1. (optional) consume the snapshot of `upstream_table` at `snapshot_epoch`,
    ///    buffering upstream input in an [`UpstreamBuffer`] meanwhile;
    /// 2. (optional) replay the buffered epochs from the table's change log until
    ///    the buffer is drained;
    /// 3. forward upstream input directly, committing per-epoch progress.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        trace!("snapshot backfill executor start");
        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
        trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        // Decide whether to run the snapshot phase: only when the upstream barrier
        // and the locally injected barrier carry different epochs (i.e. upstream
        // has run ahead of this newly created actor).
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                Some(snapshot_epoch)
            } else {
                None
            }
        } else {
            // Missing snapshot epoch is a bug: panic in debug builds, otherwise
            // fall back to no snapshot backfill (only sound when epochs match).
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    first_upstream_barrier.epoch, first_recv_barrier.epoch
                );
            } else {
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        // Phases 1 and 2; yields (epoch after catch-up, whether we still need to
        // report `finish_consuming_log_store` on the next barrier).
        let (mut barrier_epoch, mut need_report_finish) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id());
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                let mut upstream_buffer = UpstreamBuffer::new(
                    &mut self.upstream,
                    first_upstream_barrier,
                    consume_upstream_row_count,
                );

                // Phase 1: consume the snapshot, unless progress shows it was
                // already finished before a restart.
                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            &mut self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                        );

                        pin_mut!(snapshot_stream);

                        // Drive the snapshot stream while concurrently buffering
                        // upstream input inside `run_future`.
                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    // The barrier right after the snapshot epoch: commit progress
                    // before yielding it downstream.
                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // Phase 2: replay buffered epochs from the change log, if any.
                if let Some(mut upstream_buffer) = upstream_buffer {
                    let initial_pending_lag = Duration::from_millis(
                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                    );
                    trace!(
                        ?barrier_epoch,
                        table_id = %self.upstream_table.table_id(),
                        ?initial_pending_lag,
                        "start consuming log store"
                    );

                    let consuming_log_store_row_count = self
                        .metrics
                        .snapshot_backfill_consume_row_count
                        .with_guarded_label_values(&[
                            table_id_str.as_str(),
                            actor_id_str.as_str(),
                            "consuming_log_store",
                        ]);
                    // Non-checkpoint epochs are accumulated and replayed together
                    // with the next checkpoint epoch in one change-log scan.
                    let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
                    loop {
                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                        barrier_epoch = barrier.epoch;
                        if barrier.kind.is_checkpoint() {
                            let pending_non_checkpoint_barrier =
                                take(&mut pending_non_checkpoint_barrier);
                            let end_epoch = barrier_epoch.prev;
                            let start_epoch = pending_non_checkpoint_barrier
                                .first()
                                .map(|epoch| epoch.prev)
                                .unwrap_or(end_epoch);
                            trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
                            let mut stream = upstream_buffer
                                .run_future(make_log_stream(
                                    &self.upstream_table,
                                    start_epoch,
                                    end_epoch,
                                    None,
                                    self.chunk_size,
                                ))
                                .await?;
                            while let Some(chunk) =
                                upstream_buffer.run_future(stream.try_next()).await?
                            {
                                trace!(
                                    ?barrier_epoch,
                                    size = chunk.cardinality(),
                                    "consume change log yield chunk",
                                );
                                consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                                yield Message::Chunk(chunk);
                            }

                            trace!(?barrier_epoch, "after consume change log");

                            // Record completion of this epoch for every vnode.
                            stream
                                .for_vnode_pk_progress(|vnode, row_count, progress| {
                                    assert_eq!(progress, None);
                                    backfill_state.finish_epoch(
                                        vnode,
                                        barrier.epoch.prev,
                                        row_count,
                                    );
                                })
                                .await?;
                        } else {
                            pending_non_checkpoint_barrier.push(barrier.epoch);
                        }

                        if is_finished {
                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                            self.progress.finish_consuming_log_store(barrier.epoch);
                        } else {
                            self.progress.update_create_mview_log_store_progress(
                                barrier.epoch,
                                upstream_buffer.pending_epoch_lag(),
                            );
                        }

                        let post_commit = backfill_state.commit(barrier.epoch).await?;
                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        // Scaling must not happen while catching up from the log store.
                        if update_vnode_bitmap.is_some() {
                            return Err(anyhow!(
                                "should not update vnode bitmap during consuming log store"
                            )
                            .into());
                        }

                        if is_finished {
                            assert!(
                                pending_non_checkpoint_barrier.is_empty(),
                                "{pending_non_checkpoint_barrier:?}"
                            );
                            break;
                        }
                    }
                    trace!(
                        ?barrier_epoch,
                        table_id = %self.upstream_table.table_id(),
                        "finish consuming log store"
                    );

                    // Finish was already reported inside the loop.
                    (barrier_epoch, false)
                } else {
                    trace!(
                        ?barrier_epoch,
                        table_id = %self.upstream_table.table_id(),
                        "skip consuming log store and start consuming upstream directly"
                    );

                    (barrier_epoch, true)
                }
            } else {
                // No snapshot backfill: sanity-check that persisted progress shows
                // every vnode fully consumed at the first upstream epoch.
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                trace!(
                    table_id = %self.upstream_table.table_id(),
                    "skip backfill"
                );
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true)
            }
        };
        // Phase 3: forward upstream directly, tracking per-epoch row counts.
        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    // Mark the closed epoch as fully consumed for every owned vnode.
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    // Apply scaling (vnode bitmap update) after the barrier is yielded.
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap =
                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
}
471
472impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
473 fn execute(self: Box<Self>) -> BoxedMessageStream {
474 let output_indices = self.output_indices.clone();
475 self.execute_inner()
476 .filter_map(move |result| {
477 ready({
478 match result {
479 Ok(message) => mapping_message(message, &output_indices).map(Ok),
480 Err(e) => Some(Err(e)),
481 }
482 })
483 })
484 .boxed()
485 }
486}
487
/// Phase marker for [`UpstreamBuffer`]: buffering upstream while the snapshot is consumed.
struct ConsumingSnapshot;
/// Phase marker for [`UpstreamBuffer`]: draining the buffer while the log store is consumed.
struct ConsumingLogStore;
490
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the very first upstream barrier ever buffered.
    first_upstream_barrier_epoch: EpochPair,

    /// Barriers received since the last checkpoint barrier.
    /// Newest at the front (see `add`).
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// Groups of barriers, each ending with a checkpoint barrier.
    /// Newest group at the front; consumption happens from the back
    /// (see `pop` / `consume_epoch`).
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
504
505impl PendingBarriers {
506 fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
507 Self {
508 first_upstream_barrier_epoch: first_upstream_barrier.epoch,
509 pending_non_checkpoint_barriers: Default::default(),
510 checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
511 first_upstream_barrier,
512 ])]),
513 }
514 }
515
516 fn add(&mut self, barrier: DispatcherBarrier) {
517 let is_checkpoint = barrier.kind.is_checkpoint();
518 self.pending_non_checkpoint_barriers.push_front(barrier);
519 if is_checkpoint {
520 self.checkpoint_barrier_groups
521 .push_front(take(&mut self.pending_non_checkpoint_barriers));
522 }
523 }
524
525 fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
526 self.checkpoint_barrier_groups.pop_back()
527 }
528
529 fn consume_epoch(&mut self, epoch: EpochPair) {
530 let barriers = self
531 .checkpoint_barrier_groups
532 .back_mut()
533 .expect("non-empty");
534 let oldest_upstream_barrier = barriers.back().expect("non-empty");
535 assert!(
536 oldest_upstream_barrier.epoch.prev >= epoch.prev,
537 "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
538 oldest_upstream_barrier.epoch,
539 epoch
540 );
541 if oldest_upstream_barrier.epoch.prev == epoch.prev {
542 assert_eq!(oldest_upstream_barrier.epoch, epoch);
543 barriers.pop_back();
544 if barriers.is_empty() {
545 self.checkpoint_barrier_groups.pop_back();
546 }
547 }
548 }
549
550 fn latest_epoch(&self) -> Option<EpochPair> {
551 self.pending_non_checkpoint_barriers
552 .front()
553 .or_else(|| {
554 self.checkpoint_barrier_groups
555 .front()
556 .and_then(|barriers| barriers.front())
557 })
558 .map(|barrier| barrier.epoch)
559 }
560
561 fn checkpoint_epoch_count(&self) -> usize {
562 self.checkpoint_barrier_groups.len()
563 }
564
565 fn has_checkpoint_epoch(&self) -> bool {
566 !self.checkpoint_barrier_groups.is_empty()
567 }
568}
569
/// Buffers upstream barriers while the executor is busy consuming the snapshot
/// or replaying the change log. `S` is a phase marker: [`ConsumingSnapshot`] or
/// [`ConsumingLogStore`].
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// Cap on `pending_epoch_lag()` above which upstream polling is paused
    /// (`u64::MAX`, i.e. unlimited, while consuming the snapshot).
    max_pending_epoch_lag: u64,
    /// `prev` epoch up to which the downstream side has been consumed.
    consumed_epoch: u64,
    upstream_pending_barriers: PendingBarriers,
    /// True while mid-epoch: set on chunks, watermarks, and non-checkpoint
    /// barriers; cleared when a checkpoint barrier is buffered.
    is_polling_epoch_data: bool,
    consume_upstream_row_count: LabelGuardedIntCounter,
    _phase: S,
}
584
585impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
586 fn new(
587 upstream: &'a mut MergeExecutorInput,
588 first_upstream_barrier: DispatcherBarrier,
589 consume_upstream_row_count: LabelGuardedIntCounter,
590 ) -> Self {
591 Self {
592 upstream,
593 is_polling_epoch_data: false,
594 consume_upstream_row_count,
595 upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
596 max_pending_epoch_lag: u64::MAX,
598 consumed_epoch: 0,
599 _phase: ConsumingSnapshot {},
600 }
601 }
602
603 fn start_consuming_log_store(
604 mut self,
605 consumed_epoch: u64,
606 ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
607 if self
608 .upstream_pending_barriers
609 .first_upstream_barrier_epoch
610 .prev
611 == consumed_epoch
612 {
613 assert_eq!(
614 1,
615 self.upstream_pending_barriers
616 .pop()
617 .expect("non-empty")
618 .len()
619 );
620 }
621 let max_pending_epoch_lag = self.pending_epoch_lag();
622 let buffer = UpstreamBuffer {
623 upstream: self.upstream,
624 upstream_pending_barriers: self.upstream_pending_barriers,
625 max_pending_epoch_lag,
626 is_polling_epoch_data: self.is_polling_epoch_data,
627 consume_upstream_row_count: self.consume_upstream_row_count,
628 consumed_epoch,
629 _phase: ConsumingLogStore {},
630 };
631 if buffer.is_finished() {
632 None
633 } else {
634 Some(buffer)
635 }
636 }
637}
638
impl<S> UpstreamBuffer<'_, S> {
    /// Whether upstream may still be polled: always while mid-epoch, otherwise
    /// only while the pending lag stays under `max_pending_epoch_lag`.
    fn can_consume_upstream(&self) -> bool {
        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
    }

    /// Keeps buffering upstream input and only resolves with an error.
    /// When back-pressured (`!can_consume_upstream`), sleeps 30s, warns, and then
    /// suspends forever so that the sibling `select!` arm in `run_future` wins.
    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
        {
            loop {
                if let Err(e) = try {
                    if !self.can_consume_upstream() {
                        sleep(Duration::from_secs(30)).await;
                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
                        // `pending()` never resolves: stop polling upstream for
                        // the remainder of this call.
                        return pending().await;
                    }
                    self.consume_until_next_checkpoint_barrier().await?;
                } {
                    break e;
                }
            }
        }
    }

    /// Buffers upstream messages up to and including the next checkpoint barrier.
    /// Chunks and watermarks only bump the row counter and the mid-epoch flag;
    /// barriers are stored in `upstream_pending_barriers`.
    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
        loop {
            let msg: DispatcherMessage = self
                .upstream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("end of upstream"))?;
            match msg {
                DispatcherMessage::Chunk(chunk) => {
                    self.is_polling_epoch_data = true;
                    self.consume_upstream_row_count
                        .inc_by(chunk.cardinality() as _);
                }
                DispatcherMessage::Barrier(barrier) => {
                    let is_checkpoint = barrier.kind.is_checkpoint();
                    self.upstream_pending_barriers.add(barrier);
                    if is_checkpoint {
                        self.is_polling_epoch_data = false;
                        break;
                    } else {
                        self.is_polling_epoch_data = true;
                    }
                }
                DispatcherMessage::Watermark(_) => {
                    self.is_polling_epoch_data = true;
                }
            }
        }
        Ok(())
    }
}
694
impl UpstreamBuffer<'_, ConsumingLogStore> {
    /// Called when the downstream has fully consumed `epoch`. Discards the
    /// matching buffered barrier, opportunistically buffers more upstream input,
    /// tightens the allowed pending lag, and returns whether the buffer is now
    /// fully drained.
    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            // No buffered checkpoint group yet: we must be mid-epoch, so poll
            // upstream until the next checkpoint barrier arrives.
            assert!(self.is_polling_epoch_data);
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    // Buffer any upstream input that is already ready, without
                    // awaiting (`now_or_never`).
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                // Tighten the lag cap: at most the current lag, and shrink by half
                // of the epoch progress, so the replay eventually catches up.
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }

    /// Fully drained: no buffered checkpoint group and not mid-epoch.
    fn is_finished(&self) -> bool {
        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
            assert!(
                self.upstream_pending_barriers
                    .pending_non_checkpoint_barriers
                    .is_empty()
            )
        }
        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
    }
}
746
747impl<S> UpstreamBuffer<'_, S> {
748 async fn run_future<T, E: Into<StreamExecutorError>>(
751 &mut self,
752 future: impl Future<Output = Result<T, E>>,
753 ) -> StreamExecutorResult<T> {
754 select! {
755 biased;
756 e = self.concurrently_consume_upstream() => {
757 Err(e)
758 }
759 result = future => {
761 result.map_err(Into::into)
762 }
763 }
764 }
765
766 fn pending_epoch_lag(&self) -> u64 {
767 self.upstream_pending_barriers
768 .latest_epoch()
769 .map(|epoch| {
770 epoch
771 .prev
772 .checked_sub(self.consumed_epoch)
773 .expect("pending epoch must be later than consumed_epoch")
774 })
775 .unwrap_or(0)
776 }
777}
778
779#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
780async fn make_log_stream(
781 upstream_table: &BatchTable<impl StateStore>,
782 start_epoch: u64,
783 end_epoch: u64,
784 start_pk: Option<OwnedRow>,
785 chunk_size: usize,
786) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
787 let data_types = upstream_table.schema().data_types();
788 let start_pk = start_pk.as_ref();
789 let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
791 upstream_table
792 .batch_iter_vnode_log(
793 start_epoch,
794 HummockReadEpoch::Committed(end_epoch),
795 start_pk,
796 vnode,
797 )
798 .map_ok(move |stream| {
799 let stream = stream.map_err(Into::into);
800 (vnode, stream, 0)
801 })
802 }))
803 .await?;
804 let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
805 Ok(VnodeStream::new(
806 vnode_streams,
807 upstream_table.pk_in_output_indices().expect("should exist"),
808 builder,
809 ))
810}
811
812async fn make_snapshot_stream(
813 upstream_table: &BatchTable<impl StateStore>,
814 snapshot_epoch: u64,
815 backfill_state: &BackfillState<impl StateStore>,
816 rate_limit: RateLimit,
817 chunk_size: usize,
818 snapshot_rebuild_interval: Duration,
819) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
820 let data_types = upstream_table.schema().data_types();
821 let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
822 move |(vnode, progress)| {
823 let start_pk = match progress {
824 None => Some((None, 0)),
825 Some(VnodeBackfillProgress {
826 row_count,
827 progress: EpochBackfillProgress::Consuming { latest_pk },
828 ..
829 }) => Some((Some(latest_pk), *row_count)),
830 Some(VnodeBackfillProgress {
831 progress: EpochBackfillProgress::Consumed,
832 ..
833 }) => None,
834 };
835 start_pk.map(|(start_pk, row_count)| {
836 upstream_table
837 .batch_iter_vnode(
838 HummockReadEpoch::Committed(snapshot_epoch),
839 start_pk,
840 vnode,
841 PrefetchOptions::prefetch_for_large_range_scan(),
842 snapshot_rebuild_interval,
843 )
844 .map_ok(move |stream| {
845 let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
846 (vnode, stream, row_count)
847 })
848 })
849 },
850 ))
851 .await?;
852 let builder = create_builder(rate_limit, chunk_size, data_types.clone());
853 Ok(VnodeStream::new(
854 vnode_streams,
855 upstream_table.pk_in_output_indices().expect("should exist"),
856 builder,
857 ))
858}
859
/// Consumes the snapshot at `snapshot_epoch`, yielding chunks interleaved with
/// the barriers received on `barrier_rx` (all of which must carry epochs earlier
/// than `snapshot_epoch`). Persists per-vnode progress into `backfill_state` at
/// every barrier, reports progress, and finally keeps yielding barriers until
/// the barrier whose `curr` equals `snapshot_epoch`.
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: &'a mut RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        *rate_limit,
        chunk_size,
        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
    )
    .await?;

    /// Awaits either the next injected barrier or the next snapshot chunk.
    /// The snapshot arm is disabled while throttled or while backfill is paused.
    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    // `count`: total rows yielded; `epoch_row_count`: rows within the current
    // barrier interval (used to enforce the per-epoch rate limit).
    let mut count = 0;
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;

                // All barriers handled here must precede the snapshot epoch.
                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    backfill_paused = false;
                }
                // Flush any partially built chunk before persisting progress.
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                // Persist per-vnode progress (pk reached, or finished) at this barrier.
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                epoch_row_count = 0;

                // Pick up a backfill rate-limit change carried by this barrier, if any.
                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
                    if let Mutation::Throttle(config) = &**m
                        && let Some(config) = config.get(&actor_ctx.fragment_id)
                        && config.throttle_type() == PbThrottleType::Backfill
                    {
                        Some(config.rate_limit)
                    } else {
                        None
                    }
                });
                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;

                if let Some(new_rate_limit) = new_rate_limit {
                    let new_rate_limit = new_rate_limit.into();
                    *rate_limit = new_rate_limit;
                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
                }
            }
            Either::Right(Some(chunk)) => {
                // The snapshot arm is disabled while paused, so a chunk here is a bug.
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            // Snapshot exhausted.
            Either::Right(None) => {
                break;
            }
        }
    }

    // Snapshot fully consumed: finish all vnodes, commit, and report completion.
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    trace!(?barrier_epoch, count, "report finish");
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    // Forward remaining barriers until reaching the snapshot epoch itself.
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    trace!(?barrier_epoch, "finish consuming snapshot");
}
1014}