1use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
23
24use crate::MetaResult;
25use crate::barrier::CreateStreamingJobCommandInfo;
26use crate::barrier::backfill_order_control::BackfillOrderState;
27use crate::barrier::info::InflightStreamingJobInfo;
28use crate::controller::fragment::InflightFragmentInfo;
29use crate::manager::MetadataManager;
30use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
31use crate::stream::{SourceChange, SourceManagerRef};
32
/// Row count reported by a backfill actor as consumed.
type ConsumedRows = u64;
/// Row count reported by a backfill actor as buffered.
type BufferedRows = u64;
35
/// Backfill state tracked for a single actor.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// State assigned when tracking starts in `Progress::new` or restarts after a reschedule.
    Init,
    /// The actor is still backfilling; carries the reported epoch together with the consumed
    /// and buffered row counts. The epoch is stored but never read.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    /// The actor reported completion; carries its final consumed and buffered row counts.
    Done(ConsumedRows, BufferedRows),
}
42
/// Follow-up work produced by `Progress::update` when an actor reports completion.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment ids returned by `BackfillOrderState::finish_actor` for the finished actor.
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables mapped to fragments that were in the current backfill set before the
    /// actor finished but are no longer in it afterwards.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}
51
/// Backfill progress of one streaming job, aggregated over its tracked actors.
#[derive(Debug)]
pub(super) struct Progress {
    /// Id of the job being tracked; used for logging.
    job_id: JobId,
    /// Latest reported state of every tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Backfill ordering state, advanced whenever an actor reports completion.
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    /// Upstream type of every tracked actor; decides which row counter an update feeds.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Per upstream table, the factor its key count is multiplied by when estimating the
    /// total number of rows to backfill.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Estimated total key count of all upstream tables, refreshed on every update.
    upstream_mvs_total_key_count: u64,
    /// Rows consumed so far by `MView` and `LocalityProvider` backfill actors.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed so far by `Source` backfill actors.
    source_backfill_consumed_rows: u64,
    /// Rows buffered so far by `LocalityProvider` backfill actors; added to the denominator
    /// when the progress ratio is computed.
    mv_backfill_buffered_rows: u64,
}
77
78impl Progress {
79 fn new(
81 job_id: JobId,
82 actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
83 upstream_mv_count: HashMap<TableId, usize>,
84 upstream_total_key_count: u64,
85 backfill_order_state: BackfillOrderState,
86 ) -> Self {
87 let mut states = HashMap::new();
88 let mut backfill_upstream_types = HashMap::new();
89 for (actor, backfill_upstream_type) in actors {
90 states.insert(actor, BackfillState::Init);
91 backfill_upstream_types.insert(actor, backfill_upstream_type);
92 }
93 assert!(!states.is_empty());
94
95 Self {
96 job_id,
97 states,
98 backfill_upstream_types,
99 done_count: 0,
100 upstream_mv_count,
101 upstream_mvs_total_key_count: upstream_total_key_count,
102 mv_backfill_consumed_rows: 0,
103 source_backfill_consumed_rows: 0,
104 mv_backfill_buffered_rows: 0,
105 backfill_order_state,
106 }
107 }
108
109 fn update(
112 &mut self,
113 actor: ActorId,
114 new_state: BackfillState,
115 upstream_total_key_count: u64,
116 ) -> PendingBackfillFragments {
117 let mut result = PendingBackfillFragments::default();
118 self.upstream_mvs_total_key_count = upstream_total_key_count;
119 let total_actors = self.states.len();
120 let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
121 tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
122 return result;
123 };
124
125 let mut old_consumed_row = 0;
126 let mut new_consumed_row = 0;
127 let mut old_buffered_row = 0;
128 let mut new_buffered_row = 0;
129 let Some(prev_state) = self.states.remove(&actor) else {
130 tracing::warn!(%actor, "receive progress for actor not in state map");
131 return result;
132 };
133 match prev_state {
134 BackfillState::Init => {}
135 BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
136 old_consumed_row = consumed_rows;
137 old_buffered_row = buffered_rows;
138 }
139 BackfillState::Done(_, _) => panic!("should not report done multiple times"),
140 };
141 match &new_state {
142 BackfillState::Init => {}
143 BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
144 new_consumed_row = *consumed_rows;
145 new_buffered_row = *buffered_rows;
146 }
147 BackfillState::Done(consumed_rows, buffered_rows) => {
148 tracing::debug!("actor {} done", actor);
149 new_consumed_row = *consumed_rows;
150 new_buffered_row = *buffered_rows;
151 self.done_count += 1;
152 let before_backfill_nodes = self
153 .backfill_order_state
154 .current_backfill_node_fragment_ids();
155 result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
156 let after_backfill_nodes = self
157 .backfill_order_state
158 .current_backfill_node_fragment_ids();
159 let last_backfill_nodes_iter = before_backfill_nodes
161 .into_iter()
162 .filter(|x| !after_backfill_nodes.contains(x));
163 result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
164 .filter_map(|fragment_id| {
165 self.backfill_order_state
166 .get_locality_fragment_state_table_mapping()
167 .get(&fragment_id)
168 })
169 .flatten()
170 .copied()
171 .collect();
172 tracing::debug!(
173 "{} actors out of {} complete",
174 self.done_count,
175 total_actors,
176 );
177 }
178 };
179 debug_assert!(
180 new_consumed_row >= old_consumed_row,
181 "backfill progress should not go backward"
182 );
183 debug_assert!(
184 new_buffered_row >= old_buffered_row,
185 "backfill progress should not go backward"
186 );
187 match backfill_upstream_type {
188 BackfillUpstreamType::MView => {
189 self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
190 }
191 BackfillUpstreamType::Source => {
192 self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
193 }
194 BackfillUpstreamType::Values => {
195 }
197 BackfillUpstreamType::LocalityProvider => {
198 self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
201 self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
202 }
203 }
204 self.states.insert(actor, new_state);
205 result
206 }
207
208 fn is_done(&self) -> bool {
210 tracing::trace!(
211 "Progress::is_done? {}, {}, {:?}",
212 self.done_count,
213 self.states.len(),
214 self.states
215 );
216 self.done_count == self.states.len()
217 }
218
219 fn calculate_progress(&self) -> String {
221 if self.is_done() || self.states.is_empty() {
222 return "100%".to_owned();
223 }
224 let mut mv_count = 0;
225 let mut source_count = 0;
226 for backfill_upstream_type in self.backfill_upstream_types.values() {
227 match backfill_upstream_type {
228 BackfillUpstreamType::MView => mv_count += 1,
229 BackfillUpstreamType::Source => source_count += 1,
230 BackfillUpstreamType::Values => (),
231 BackfillUpstreamType::LocalityProvider => mv_count += 1, }
233 }
234
235 let mv_progress = (mv_count > 0).then_some({
236 let total_rows_to_consume =
239 self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
240 if total_rows_to_consume == 0 {
241 "99.99%".to_owned()
242 } else {
243 let mut progress =
244 self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
245 if progress > 1.0 {
246 progress = 0.9999;
247 }
248 format!(
249 "{:.2}% ({}/{})",
250 progress * 100.0,
251 self.mv_backfill_consumed_rows,
252 total_rows_to_consume
253 )
254 }
255 });
256 let source_progress = (source_count > 0).then_some(format!(
257 "{} rows consumed",
258 self.source_backfill_consumed_rows
259 ));
260 match (mv_progress, source_progress) {
261 (Some(mv_progress), Some(source_progress)) => {
262 format!(
263 "MView Backfill: {}, Source Backfill: {}",
264 mv_progress, source_progress
265 )
266 }
267 (Some(mv_progress), None) => mv_progress,
268 (None, Some(source_progress)) => source_progress,
269 (None, None) => "Unknown".to_owned(),
270 }
271 }
272}
273
/// A streaming job whose creation is being tracked until it can be finished.
pub struct TrackingJob {
    /// Id of the tracked job.
    job_id: JobId,
    /// Whether the job was built via `TrackingJob::recovered` rather than `TrackingJob::new`.
    is_recovered: bool,
    /// Source change applied through the source manager when the job finishes, if any.
    source_change: Option<SourceChange>,
}
285
286impl std::fmt::Display for TrackingJob {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 write!(
289 f,
290 "{}{}",
291 self.job_id,
292 if self.is_recovered { "<recovered>" } else { "" }
293 )
294 }
295}
296
297impl TrackingJob {
298 pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
300 Self {
301 job_id: stream_job_fragments.stream_job_id,
302 is_recovered: false,
303 source_change: Some(SourceChange::CreateJobFinished {
304 finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
305 }),
306 }
307 }
308
309 pub(crate) fn recovered(
311 job_id: JobId,
312 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
313 ) -> Self {
314 let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
315 fragment_infos
316 .iter()
317 .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
318 );
319 let source_change = if source_backfill_fragments.is_empty() {
320 None
321 } else {
322 Some(SourceChange::CreateJobFinished {
323 finished_backfill_fragments: source_backfill_fragments,
324 })
325 };
326 Self {
327 job_id,
328 is_recovered: true,
329 source_change,
330 }
331 }
332
333 pub(crate) fn job_id(&self) -> JobId {
334 self.job_id
335 }
336
337 pub(crate) async fn finish(
339 self,
340 metadata_manager: &MetadataManager,
341 source_manager: &SourceManagerRef,
342 ) -> MetaResult<()> {
343 metadata_manager
344 .catalog_controller
345 .finish_streaming_job(self.job_id)
346 .await?;
347 if let Some(source_change) = self.source_change {
348 source_manager.apply_source_change(source_change).await;
349 }
350 Ok(())
351 }
352}
353
354impl std::fmt::Debug for TrackingJob {
355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 if !self.is_recovered {
357 write!(f, "TrackingJob::New({})", self.job_id)
358 } else {
359 write!(f, "TrackingJob::Recovered({})", self.job_id)
360 }
361 }
362}
363
/// Information staged for a commit.
// NOTE(review): the consumers of this struct are outside this view; the field descriptions
// below are derived from the field names and types only — confirm against the callers.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Tracking jobs that have finished.
    pub finished_jobs: Vec<TrackingJob>,
    /// Ids of tables to truncate.
    pub table_ids_to_truncate: Vec<TableId>,
    /// Ids of jobs whose CDC table backfill has finished.
    pub finished_cdc_table_backfill: Vec<JobId>,
}
373
/// Outcome of applying one progress report in `Progress::apply`.
pub(super) enum UpdateProgressResult {
    /// The job is not done and the report produced no follow-up work.
    None,
    /// All tracked actors are done.
    Finished {
        /// State tables to truncate, produced by the final update.
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// The job is not done yet, but the report produced next backfill nodes and/or state
    /// tables to truncate.
    BackfillNodeFinished(PendingBackfillFragments),
}
383
/// Tracks the backfill progress of one creating streaming job.
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// The job being tracked; handed out by `into_tracking_job` once finished.
    tracking_job: TrackingJob,
    /// Whether the job is still backfilling or has finished.
    status: CreateMviewStatus,
}
389
/// Status of a tracked creating job.
#[derive(Debug)]
enum CreateMviewStatus {
    /// At least one tracked actor has not reported completion yet.
    Backfilling {
        /// Aggregated progress of the tracked actors.
        progress: Progress,

        /// Fragment ids queued until drained by `take_pending_backfill_nodes`.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table ids queued until drained by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
    /// All tracked actors are done, or there was nothing to track.
    Finished {
        /// Table ids queued until drained by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
}
406
407impl CreateMviewProgressTracker {
408 pub fn recover(
409 creating_job_id: JobId,
410 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
411 backfill_order_state: BackfillOrderState,
412 version_stats: &HummockVersionStats,
413 ) -> Self {
414 {
415 let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
416 let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
417 let status = if actors.is_empty() {
418 CreateMviewStatus::Finished {
419 table_ids_to_truncate: vec![],
420 }
421 } else {
422 let mut states = HashMap::new();
423 let mut backfill_upstream_types = HashMap::new();
424
425 for (actor, backfill_upstream_type) in actors {
426 states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
427 backfill_upstream_types.insert(actor, backfill_upstream_type);
428 }
429
430 let progress = Self::recover_progress(
431 creating_job_id,
432 states,
433 backfill_upstream_types,
434 StreamJobFragments::upstream_table_counts_impl(
435 fragment_infos.values().map(|fragment| &fragment.nodes),
436 ),
437 version_stats,
438 backfill_order_state,
439 );
440 let pending_backfill_nodes = progress
441 .backfill_order_state
442 .current_backfill_node_fragment_ids();
443 CreateMviewStatus::Backfilling {
444 progress,
445 pending_backfill_nodes,
446 table_ids_to_truncate: vec![],
447 }
448 };
449 Self {
450 tracking_job,
451 status,
452 }
453 }
454 }
455
456 fn recover_progress(
462 job_id: JobId,
463 states: HashMap<ActorId, BackfillState>,
464 backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
465 upstream_mv_count: HashMap<TableId, usize>,
466 version_stats: &HummockVersionStats,
467 backfill_order_state: BackfillOrderState,
468 ) -> Progress {
469 let upstream_mvs_total_key_count =
470 calculate_total_key_count(&upstream_mv_count, version_stats);
471 Progress {
472 job_id,
473 states,
474 backfill_order_state,
475 backfill_upstream_types,
476 done_count: 0, upstream_mv_count,
478 upstream_mvs_total_key_count,
479 mv_backfill_consumed_rows: 0, source_backfill_consumed_rows: 0, mv_backfill_buffered_rows: 0, }
483 }
484
485 pub fn gen_backfill_progress(&self) -> String {
486 match &self.status {
487 CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
488 CreateMviewStatus::Finished { .. } => "100%".to_owned(),
489 }
490 }
491
492 pub(super) fn apply_progress(
495 &mut self,
496 create_mview_progress: &CreateMviewProgress,
497 version_stats: &HummockVersionStats,
498 ) {
499 let CreateMviewStatus::Backfilling {
500 progress,
501 pending_backfill_nodes,
502 table_ids_to_truncate,
503 } = &mut self.status
504 else {
505 tracing::warn!(
506 "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
507 );
508 return;
509 };
510 {
511 {
513 match progress.apply(create_mview_progress, version_stats) {
515 UpdateProgressResult::None => {
516 tracing::trace!(?progress, "update progress");
517 }
518 UpdateProgressResult::Finished {
519 truncate_locality_provider_state_tables,
520 } => {
521 let mut table_ids_to_truncate = take(table_ids_to_truncate);
522 table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
523 tracing::trace!(?progress, "finish progress");
524 self.status = CreateMviewStatus::Finished {
525 table_ids_to_truncate,
526 };
527 }
528 UpdateProgressResult::BackfillNodeFinished(pending) => {
529 table_ids_to_truncate
530 .extend(pending.truncate_locality_provider_state_tables.clone());
531 tracing::trace!(
532 ?progress,
533 next_backfill_nodes = ?pending.next_backfill_nodes,
534 "start next backfill node"
535 );
536 pending_backfill_nodes.extend(pending.next_backfill_nodes);
537 }
538 }
539 }
540 }
541 }
542
543 pub fn refresh_after_reschedule(
545 &mut self,
546 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
547 version_stats: &HummockVersionStats,
548 ) {
549 let CreateMviewStatus::Backfilling {
550 progress,
551 pending_backfill_nodes,
552 ..
553 } = &mut self.status
554 else {
555 return;
556 };
557
558 let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
559 fragment_infos
560 .values()
561 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
562 );
563
564 #[cfg(debug_assertions)]
565 {
566 use std::collections::HashSet;
567 let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
568 let new_actor_ids: HashSet<_> = new_tracking_actors
569 .iter()
570 .map(|(actor_id, _)| *actor_id)
571 .collect();
572 debug_assert!(
573 old_actor_ids.is_disjoint(&new_actor_ids),
574 "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
575 );
576 }
577
578 let mut new_states = HashMap::new();
579 let mut new_backfill_types = HashMap::new();
580 for (actor_id, upstream_type) in new_tracking_actors {
581 new_states.insert(actor_id, BackfillState::Init);
582 new_backfill_types.insert(actor_id, upstream_type);
583 }
584
585 let fragment_actors: HashMap<_, _> = fragment_infos
586 .iter()
587 .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
588 .collect();
589
590 let newly_scheduled = progress
591 .backfill_order_state
592 .refresh_actors(&fragment_actors);
593
594 progress.backfill_upstream_types = new_backfill_types;
595 progress.states = new_states;
596 progress.done_count = 0;
597
598 progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
599 fragment_infos.values().map(|fragment| &fragment.nodes),
600 );
601 progress.upstream_mvs_total_key_count =
602 calculate_total_key_count(&progress.upstream_mv_count, version_stats);
603
604 progress.mv_backfill_consumed_rows = 0;
605 progress.source_backfill_consumed_rows = 0;
606 progress.mv_backfill_buffered_rows = 0;
607
608 let mut pending = progress
609 .backfill_order_state
610 .current_backfill_node_fragment_ids();
611 pending.extend(newly_scheduled);
612 pending.sort_unstable();
613 pending.dedup();
614 *pending_backfill_nodes = pending;
615 }
616
617 pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
618 match &mut self.status {
619 CreateMviewStatus::Backfilling {
620 pending_backfill_nodes,
621 ..
622 } => Some(pending_backfill_nodes.drain(..)),
623 CreateMviewStatus::Finished { .. } => None,
624 }
625 .into_iter()
626 .flatten()
627 }
628
629 pub(super) fn collect_staging_commit_info(
630 &mut self,
631 ) -> (bool, impl Iterator<Item = TableId> + '_) {
632 let (is_finished, table_ids) = match &mut self.status {
633 CreateMviewStatus::Backfilling {
634 table_ids_to_truncate,
635 ..
636 } => (false, table_ids_to_truncate),
637 CreateMviewStatus::Finished {
638 table_ids_to_truncate,
639 ..
640 } => (true, table_ids_to_truncate),
641 };
642 (is_finished, table_ids.drain(..))
643 }
644
645 pub(super) fn is_finished(&self) -> bool {
646 matches!(self.status, CreateMviewStatus::Finished { .. })
647 }
648
649 pub(super) fn into_tracking_job(self) -> TrackingJob {
650 let CreateMviewStatus::Finished { .. } = self.status else {
651 panic!("should be called when finished");
652 };
653 self.tracking_job
654 }
655
656 pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
660 tracing::trace!(?info, "add job to track");
661 let CreateStreamingJobCommandInfo {
662 stream_job_fragments,
663 fragment_backfill_ordering,
664 locality_fragment_state_table_mapping,
665 ..
666 } = info;
667 let job_id = stream_job_fragments.stream_job_id();
668 let actors = stream_job_fragments.tracking_progress_actor_ids();
669 let tracking_job = TrackingJob::new(&info.stream_job_fragments);
670 if actors.is_empty() {
671 return Self {
673 tracking_job,
674 status: CreateMviewStatus::Finished {
675 table_ids_to_truncate: vec![],
676 },
677 };
678 }
679
680 let upstream_mv_count = stream_job_fragments.upstream_table_counts();
681 let upstream_total_key_count: u64 =
682 calculate_total_key_count(&upstream_mv_count, version_stats);
683
684 let backfill_order_state = BackfillOrderState::new(
685 fragment_backfill_ordering,
686 stream_job_fragments,
687 locality_fragment_state_table_mapping.clone(),
688 );
689 let progress = Progress::new(
690 job_id,
691 actors,
692 upstream_mv_count,
693 upstream_total_key_count,
694 backfill_order_state,
695 );
696 let pending_backfill_nodes = progress
697 .backfill_order_state
698 .current_backfill_node_fragment_ids();
699 Self {
700 tracking_job,
701 status: CreateMviewStatus::Backfilling {
702 progress,
703 pending_backfill_nodes,
704 table_ids_to_truncate: vec![],
705 },
706 }
707 }
708}
709
710impl Progress {
711 fn apply(
715 &mut self,
716 progress: &CreateMviewProgress,
717 version_stats: &HummockVersionStats,
718 ) -> UpdateProgressResult {
719 tracing::trace!(?progress, "update progress");
720 let actor = progress.backfill_actor_id;
721 let job_id = self.job_id;
722
723 let new_state = if progress.done {
724 BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
725 } else {
726 BackfillState::ConsumingUpstream(
727 progress.consumed_epoch.into(),
728 progress.consumed_rows,
729 progress.buffered_rows,
730 )
731 };
732
733 {
734 {
735 let progress_state = self;
736
737 let upstream_total_key_count: u64 =
738 calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
739
740 tracing::trace!(%job_id, "updating progress for table");
741 let pending = progress_state.update(actor, new_state, upstream_total_key_count);
742
743 if progress_state.is_done() {
744 tracing::debug!(
745 %job_id,
746 "all actors done for creating mview!",
747 );
748
749 let PendingBackfillFragments {
750 next_backfill_nodes,
751 truncate_locality_provider_state_tables,
752 } = pending;
753
754 assert!(next_backfill_nodes.is_empty());
755 UpdateProgressResult::Finished {
756 truncate_locality_provider_state_tables,
757 }
758 } else if !pending.next_backfill_nodes.is_empty()
759 || !pending.truncate_locality_provider_state_tables.is_empty()
760 {
761 UpdateProgressResult::BackfillNodeFinished(pending)
762 } else {
763 UpdateProgressResult::None
764 }
765 }
766 }
767 }
768}
769
770fn calculate_total_key_count(
771 table_count: &HashMap<TableId, usize>,
772 version_stats: &HummockVersionStats,
773) -> u64 {
774 table_count
775 .iter()
776 .map(|(table_id, count)| {
777 assert_ne!(*count, 0);
778 *count as u64
779 * version_stats
780 .table_stats
781 .get(table_id)
782 .map_or(0, |stat| stat.total_key_count as u64)
783 })
784 .sum()
785}
786
787#[cfg(test)]
788mod tests {
789 use std::collections::HashSet;
790
791 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
792 use risingwave_common::id::WorkerId;
793 use risingwave_meta_model::fragment::DistributionType;
794 use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
795
796 use super::*;
797 use crate::controller::fragment::InflightActorInfo;
798
    /// Builds an [`InflightFragmentInfo`] carrying the single type `flag` and one default
    /// actor entry (worker 1, no vnode bitmap, no splits) per id in `actor_ids`.
    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }
828
    /// Builds a [`Progress`] tracking the single `MView` actor `actor_id` in the `Init`
    /// state, with all counters at zero.
    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }
843
    /// A report from an actor that is not tracked must leave the tracked states untouched
    /// and produce no next backfill nodes.
    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }
860
    /// After a reschedule, the tracker must track only the new actor and reset the done
    /// count and consumed-row counters.
    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        // Progress in which the old actor has already finished with 5 consumed rows.
        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        // The rescheduled job consists of one stream-scan fragment with only the new actor.
        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }
912}