use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{CreateStreamingJobCommandInfo, FragmentBackfillProgress};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

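/// Per-actor snapshot of backfill progress, derived from the internal [`BackfillState`].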
#[derive(Debug, Clone, Copy)]
pub(crate) struct ActorBackfillProgress {
    pub(crate) actor_id: ActorId,
    pub(crate) upstream_type: BackfillUpstreamType,
    pub(crate) consumed_rows: u64,
    pub(crate) done: bool,
}

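/// The state of a single backfill actor, as reported through barrier completion.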
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

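/// Follow-up work produced when an actor finishes backfilling: the next fragments to
/// start (under ordered backfill) and any `LocalityProvider` state tables to truncate.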
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    pub next_backfill_nodes: Vec<FragmentId>,
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

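/// Tracks the backfill progress of all actors belonging to one creating streaming job.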
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// How many times each upstream materialized view is consumed by this job;
    /// used to weight its key count.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Estimated total number of upstream rows to consume, derived from version stats.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Rows buffered (not yet replayed) by `LocalityProvider` backfill.
    mv_backfill_buffered_rows: u64,
}

impl Progress {
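    /// Creates a new [`Progress`] with all actors in `Init` state.
    ///
    /// Panics if `actors` is empty.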
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

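    /// Updates the state of one actor and returns follow-up backfill work, if any.
    ///
    /// Panics if an actor reports `Done` more than once.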
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
            return result;
        };

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        let Some(prev_state) = self.states.remove(&actor) else {
            tracing::warn!(%actor, "receive progress for actor not in state map");
            return result;
        };
        match prev_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // Fragments that were backfilling before this update but no longer are.
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // Values does not report row counts; nothing to accumulate.
            }
            BackfillUpstreamType::LocalityProvider => {
                // LocalityProvider progress counts toward mview backfill; it also
                // tracks rows buffered in its state table.
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

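    /// Iterates over the current progress of every tracked actor.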
    fn iter_actor_progress(&self) -> impl Iterator<Item = ActorBackfillProgress> + '_ {
        self.states.iter().filter_map(|(actor_id, state)| {
            let upstream_type = *self.backfill_upstream_types.get(actor_id)?;
            let (consumed_rows, done) = match *state {
                BackfillState::Init => (0, false),
                BackfillState::ConsumingUpstream(_, consumed_rows, _) => (consumed_rows, false),
                BackfillState::Done(consumed_rows, _) => (consumed_rows, true),
            };
            Some(ActorBackfillProgress {
                actor_id: *actor_id,
                upstream_type,
                consumed_rows,
                done,
            })
        })
    }

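    /// Returns whether all tracked actors have finished backfilling.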
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

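    /// Renders a human-readable progress string, e.g. `"50.00% (5/10)"` for mview
    /// backfill and `"123 rows consumed"` for source backfill.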
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Buffered rows must eventually be replayed, so they also count as rows
            // still to be consumed.
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

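/// A streaming job whose creation is being tracked, either newly submitted or
/// recovered from the meta store after a restart.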
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    /// Source change to apply once the job finishes, if it has source backfill fragments.
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
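    /// Tracks a newly created streaming job.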
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

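    /// Rebuilds tracking for a recovered job; a source change is recorded only if the
    /// job contains source backfill fragments.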
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

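    /// Persists the job as finished in the catalog and applies any pending source change.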
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

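/// Info staged for the next commit: jobs that finished backfilling, state tables to
/// truncate, and CDC table backfills that completed.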
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    pub finished_jobs: Vec<TrackingJob>,
    pub table_ids_to_truncate: Vec<TableId>,
    pub finished_cdc_table_backfill: Vec<JobId>,
}

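/// Result of applying one progress report to a [`Progress`].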
pub(super) enum UpdateProgressResult {
    None,
    /// All actors reported done; backfill for the job is complete.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// A backfill node finished, possibly unblocking the next nodes in the order.
    BackfillNodeFinished(PendingBackfillFragments),
}

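/// Tracks the creation progress of one streaming job from backfilling to finished.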
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        progress: Progress,
        /// Backfill fragments that are ready to start but not yet started.
        pending_backfill_nodes: Vec<FragmentId>,
        /// State tables staged for truncation at the next commit.
        table_ids_to_truncate: Vec<TableId>,
    },
    /// A shared CDC source with no backfill actors, waiting for its initial offset.
    CdcSourceInit,
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
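    /// Rebuilds the tracker for a job recovered from the meta store. Jobs without
    /// tracking actors are immediately `Finished`; otherwise every actor restarts in
    /// `ConsumingUpstream` state at epoch 0 with zeroed row counters.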
    pub fn recover(
        creating_job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
        let status = if actors.is_empty() {
            CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            }
        } else {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();

            for (actor, backfill_upstream_type) in actors {
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                creating_job_id,
                states,
                backfill_upstream_types,
                StreamJobFragments::upstream_table_counts_impl(
                    fragment_infos.values().map(|fragment| &fragment.nodes),
                ),
                version_stats,
                backfill_order_state,
            );
            let pending_backfill_nodes = progress
                .backfill_order_state
                .current_backfill_node_fragment_ids();
            CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            }
        };
        Self {
            tracking_job,
            status,
        }
    }

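    /// Reconstructs a [`Progress`] from recovered per-actor states; all row counters
    /// restart from zero.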
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    pub fn gen_backfill_progress(&self) -> String {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        }
    }

    pub(crate) fn actor_progresses(&self) -> Vec<ActorBackfillProgress> {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => {
                progress.iter_actor_progress().collect()
            }
            CreateMviewStatus::CdcSourceInit | CreateMviewStatus::Finished { .. } => vec![],
        }
    }

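    /// Applies a progress report received via barrier completion, possibly
    /// transitioning the status to `Finished`.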
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "update the progress of a streaming job that has already finished backfilling: {create_mview_progress:?}"
            );
            return;
        };
        match progress.apply(create_mview_progress, version_stats) {
            UpdateProgressResult::None => {
                tracing::trace!(?progress, "update progress");
            }
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            } => {
                let mut table_ids_to_truncate = take(table_ids_to_truncate);
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                tracing::trace!(?progress, "finish progress");
                self.status = CreateMviewStatus::Finished {
                    table_ids_to_truncate,
                };
            }
            UpdateProgressResult::BackfillNodeFinished(pending) => {
                table_ids_to_truncate
                    .extend(pending.truncate_locality_provider_state_tables.clone());
                tracing::trace!(
                    ?progress,
                    next_backfill_nodes = ?pending.next_backfill_nodes,
                    "start next backfill node"
                );
                pending_backfill_nodes.extend(pending.next_backfill_nodes);
            }
        }
    }

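    /// Resets tracking after a reschedule: backfill actors are rebuilt from the new
    /// fragment infos, and all per-actor states and row counters start over.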
    pub fn refresh_after_reschedule(
        &mut self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            ..
        } = &mut self.status
        else {
            return;
        };

        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        );

        #[cfg(debug_assertions)]
        {
            use std::collections::HashSet;
            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
            let new_actor_ids: HashSet<_> = new_tracking_actors
                .iter()
                .map(|(actor_id, _)| *actor_id)
                .collect();
            debug_assert!(
                old_actor_ids.is_disjoint(&new_actor_ids),
                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
            );
        }

        let mut new_states = HashMap::new();
        let mut new_backfill_types = HashMap::new();
        for (actor_id, upstream_type) in new_tracking_actors {
            new_states.insert(actor_id, BackfillState::Init);
            new_backfill_types.insert(actor_id, upstream_type);
        }

        let fragment_actors: HashMap<_, _> = fragment_infos
            .iter()
            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
            .collect();

        let newly_scheduled = progress
            .backfill_order_state
            .refresh_actors(&fragment_actors);

        progress.backfill_upstream_types = new_backfill_types;
        progress.states = new_states;
        progress.done_count = 0;

        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
            fragment_infos.values().map(|fragment| &fragment.nodes),
        );
        progress.upstream_mvs_total_key_count =
            calculate_total_key_count(&progress.upstream_mv_count, version_stats);

        progress.mv_backfill_consumed_rows = 0;
        progress.source_backfill_consumed_rows = 0;
        progress.mv_backfill_buffered_rows = 0;

        let mut pending = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        pending.extend(newly_scheduled);
        pending.sort_unstable();
        pending.dedup();
        *pending_backfill_nodes = pending;
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::CdcSourceInit => None,
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

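    /// Drains the staged state tables to truncate, returning whether the job has finished.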
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, Box::new(table_ids_to_truncate.drain(..))),
            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, Box::new(table_ids_to_truncate.drain(..))),
        }
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

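    /// Marks a CDC source job as finished; a no-op unless the status is `CdcSourceInit`.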
    pub(super) fn mark_cdc_source_finished(&mut self) {
        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
            self.status = CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            };
        }
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.tracking_job.job_id
    }

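    /// Collects per-fragment backfill progress; for a finished job without tracked
    /// actors, optionally reports all backfill fragments as done.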
    pub(crate) fn collect_fragment_progress(
        &self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        mark_done_when_empty: bool,
    ) -> Vec<FragmentBackfillProgress> {
        let actor_progresses = self.actor_progresses();
        if actor_progresses.is_empty() {
            if mark_done_when_empty && self.is_finished() {
                return collect_done_fragments(self.job_id(), fragment_infos);
            }
            return vec![];
        }
        collect_fragment_progress_from_actors(self.job_id(), fragment_infos, &actor_progresses)
    }

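    /// Registers a new streaming job for tracking. Jobs without tracking actors are
    /// immediately `Finished`, except shared CDC sources, which start in
    /// `CdcSourceInit`.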
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            streaming_job,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // A shared CDC source has no backfill actors but must not be reported as
            // finished until its initial offset is ready.
            let is_cdc_source = matches!(
                streaming_job,
                crate::manager::StreamingJob::Source(source)
                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
                        .get_with_properties()
                        .get("connector")
                        .map(|connector| connector.to_lowercase().contains("-cdc"))
                        .unwrap_or(false)
            );
            if is_cdc_source {
                return Self {
                    tracking_job,
                    status: CreateMviewStatus::CdcSourceInit,
                };
            }
            // No progress to track: the job is finished as soon as it is created.
            return Self {
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
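    /// Translates a raw progress report from a worker into an [`UpdateProgressResult`].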
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        let upstream_total_key_count: u64 =
            calculate_total_key_count(&self.upstream_mv_count, version_stats);

        tracing::trace!(%job_id, "updating progress for table");
        let pending = self.update(actor, new_state, upstream_total_key_count);

        if self.is_done() {
            tracing::debug!(%job_id, "all actors done for creating mview!");

            let PendingBackfillFragments {
                next_backfill_nodes,
                truncate_locality_provider_state_tables,
            } = pending;

            assert!(next_backfill_nodes.is_empty());
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            }
        } else if !pending.next_backfill_nodes.is_empty()
            || !pending.truncate_locality_provider_state_tables.is_empty()
        {
            UpdateProgressResult::BackfillNodeFinished(pending)
        } else {
            UpdateProgressResult::None
        }
    }
}

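/// Sums the key counts of all upstream tables, weighting each table by how many times
/// it is consumed; tables missing from `version_stats` count as zero.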
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}

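/// Aggregates per-actor progress into per-fragment progress: consumed rows are summed,
/// and a fragment is `done` only when every reporting actor of that fragment is done.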
pub(crate) fn collect_fragment_progress_from_actors(
    job_id: JobId,
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    actor_progresses: &[ActorBackfillProgress],
) -> Vec<FragmentBackfillProgress> {
    let mut actor_to_fragment = HashMap::new();
    for (fragment_id, info) in fragment_infos {
        for actor_id in info.actors.keys() {
            actor_to_fragment.insert(*actor_id, *fragment_id);
        }
    }

    // Per fragment: (consumed_rows, done_actor_count, total_actor_count, upstream_type).
    let mut per_fragment: HashMap<FragmentId, (u64, usize, usize, BackfillUpstreamType)> =
        HashMap::new();
    for progress in actor_progresses {
        let Some(fragment_id) = actor_to_fragment.get(&progress.actor_id) else {
            continue;
        };
        let entry = per_fragment
            .entry(*fragment_id)
            .or_insert((0, 0, 0, progress.upstream_type));
        entry.0 = entry.0.saturating_add(progress.consumed_rows);
        entry.1 += progress.done as usize;
        entry.2 += 1;
    }

    per_fragment
        .into_iter()
        .map(
            |(fragment_id, (consumed_rows, done_cnt, total_cnt, upstream_type))| {
                FragmentBackfillProgress {
                    job_id,
                    fragment_id,
                    consumed_rows,
                    done: total_cnt > 0 && done_cnt == total_cnt,
                    upstream_type,
                }
            },
        )
        .collect()
}

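/// Builds `done` progress entries for every backfill-capable fragment of a finished job.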
pub(crate) fn collect_done_fragments(
    job_id: JobId,
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> Vec<FragmentBackfillProgress> {
    fragment_infos
        .iter()
        .filter(|(_, fragment)| {
            fragment.fragment_type_mask.contains_any([
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ])
        })
        .map(|(fragment_id, fragment)| FragmentBackfillProgress {
            job_id,
            fragment_id: *fragment_id,
            consumed_rows: 0,
            done: true,
            upstream_type: BackfillUpstreamType::from_fragment_type_mask(
                fragment.fragment_type_mask,
            ),
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }

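    // A minimal extra check of `calculate_progress`, assuming the `"{:.2}% ({}/{})"`
    // rendering used by the mview branch above.
    #[test]
    fn calculate_progress_reports_consumed_ratio() {
        let actor = ActorId::new(1);
        let mut progress = sample_progress(actor);

        // Report 5 of 10 upstream rows consumed.
        let _ = progress.update(actor, BackfillState::ConsumingUpstream(Epoch(0), 5, 0), 10);

        assert_eq!(progress.calculate_progress(), "50.00% (5/10)");
    }
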
    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }

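    // A minimal extra check that per-actor progress is aggregated per fragment:
    // consumed rows are summed, and `done` requires every actor to report done.
    #[test]
    fn fragment_progress_aggregates_actor_rows() {
        let fragment_id = FragmentId::new(10);
        let actor_a = ActorId::new(1);
        let actor_b = ActorId::new(2);
        let fragment_infos = HashMap::from([(
            fragment_id,
            sample_inflight_fragment(fragment_id, &[actor_a, actor_b], FragmentTypeFlag::StreamScan),
        )]);
        let actor_progresses = vec![
            ActorBackfillProgress {
                actor_id: actor_a,
                upstream_type: BackfillUpstreamType::MView,
                consumed_rows: 3,
                done: true,
            },
            ActorBackfillProgress {
                actor_id: actor_b,
                upstream_type: BackfillUpstreamType::MView,
                consumed_rows: 4,
                done: false,
            },
        ];

        let result =
            collect_fragment_progress_from_actors(JobId::new(1), &fragment_infos, &actor_progresses);

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].consumed_rows, 7);
        // One of the two actors is still backfilling, so the fragment is not done.
        assert!(!result[0].done);
    }
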
    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
        };

        let tracker = CreateMviewProgressTracker::new(&info, &HummockVersionStats::default());

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }

    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        tracker.mark_cdc_source_finished();

        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
}