1use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::{FragmentTypeFlag, TableId};
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
23
24use crate::MetaResult;
25use crate::barrier::backfill_order_control::BackfillOrderState;
26use crate::barrier::info::InflightStreamingJobInfo;
27use crate::barrier::{CreateStreamingJobCommandInfo, FragmentBackfillProgress};
28use crate::controller::fragment::InflightFragmentInfo;
29use crate::manager::MetadataManager;
30use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
31use crate::stream::{SourceChange, SourceManagerRef};
32
/// Number of rows a backfill actor has consumed from its upstream so far.
type ConsumedRows = u64;
/// Number of rows a backfill actor reports as buffered (reported alongside
/// consumed rows; only folded into MView totals for locality providers).
type BufferedRows = u64;
35
/// Snapshot of a single backfill actor's progress, exposed for reporting.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ActorBackfillProgress {
    pub(crate) actor_id: ActorId,
    /// Kind of upstream this actor backfills from (MView / Source / ...).
    pub(crate) upstream_type: BackfillUpstreamType,
    /// Rows consumed so far by this actor.
    pub(crate) consumed_rows: u64,
    /// Whether this actor has finished backfilling.
    pub(crate) done: bool,
}
43
/// Backfill state of one actor, advanced by its progress reports.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// No progress reported yet.
    Init,
    /// Actively consuming upstream: (consumed epoch, consumed rows, buffered rows).
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    /// Backfill finished: (total consumed rows, buffered rows at completion).
    Done(ConsumedRows, BufferedRows),
}
50
/// Side effects of one progress update: fragments that become schedulable for
/// backfill next, and locality-provider state tables that can be truncated.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragments that should start backfilling next, per the backfill order.
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of fully-finished locality-provider fragments to truncate.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}
59
/// Aggregated backfill progress of one creating streaming job.
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    /// Per-actor backfill state; keys are all tracked backfill actors.
    states: HashMap<ActorId, BackfillState>,
    /// Drives which fragments backfill next according to the configured order.
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    /// Upstream kind of each tracked actor.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Occurrence count of each upstream MV table consumed by this job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of upstream MVs, weighted by `upstream_mv_count`.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Rows buffered by locality-provider backfill; added to the denominator
    /// when computing MView progress.
    mv_backfill_buffered_rows: u64,
}
85
86impl Progress {
87 fn new(
89 job_id: JobId,
90 actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
91 upstream_mv_count: HashMap<TableId, usize>,
92 upstream_total_key_count: u64,
93 backfill_order_state: BackfillOrderState,
94 ) -> Self {
95 let mut states = HashMap::new();
96 let mut backfill_upstream_types = HashMap::new();
97 for (actor, backfill_upstream_type) in actors {
98 states.insert(actor, BackfillState::Init);
99 backfill_upstream_types.insert(actor, backfill_upstream_type);
100 }
101 assert!(!states.is_empty());
102
103 Self {
104 job_id,
105 states,
106 backfill_upstream_types,
107 done_count: 0,
108 upstream_mv_count,
109 upstream_mvs_total_key_count: upstream_total_key_count,
110 mv_backfill_consumed_rows: 0,
111 source_backfill_consumed_rows: 0,
112 mv_backfill_buffered_rows: 0,
113 backfill_order_state,
114 }
115 }
116
117 fn update(
120 &mut self,
121 actor: ActorId,
122 new_state: BackfillState,
123 upstream_total_key_count: u64,
124 ) -> PendingBackfillFragments {
125 let mut result = PendingBackfillFragments::default();
126 self.upstream_mvs_total_key_count = upstream_total_key_count;
127 let total_actors = self.states.len();
128 let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
129 tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
130 return result;
131 };
132
133 let mut old_consumed_row = 0;
134 let mut new_consumed_row = 0;
135 let mut old_buffered_row = 0;
136 let mut new_buffered_row = 0;
137 let Some(prev_state) = self.states.remove(&actor) else {
138 tracing::warn!(%actor, "receive progress for actor not in state map");
139 return result;
140 };
141 match prev_state {
142 BackfillState::Init => {}
143 BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
144 old_consumed_row = consumed_rows;
145 old_buffered_row = buffered_rows;
146 }
147 BackfillState::Done(_, _) => panic!("should not report done multiple times"),
148 };
149 match &new_state {
150 BackfillState::Init => {}
151 BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
152 new_consumed_row = *consumed_rows;
153 new_buffered_row = *buffered_rows;
154 }
155 BackfillState::Done(consumed_rows, buffered_rows) => {
156 tracing::debug!("actor {} done", actor);
157 new_consumed_row = *consumed_rows;
158 new_buffered_row = *buffered_rows;
159 self.done_count += 1;
160 let before_backfill_nodes = self
161 .backfill_order_state
162 .current_backfill_node_fragment_ids();
163 result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
164 let after_backfill_nodes = self
165 .backfill_order_state
166 .current_backfill_node_fragment_ids();
167 let last_backfill_nodes_iter = before_backfill_nodes
169 .into_iter()
170 .filter(|x| !after_backfill_nodes.contains(x));
171 result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
172 .filter_map(|fragment_id| {
173 self.backfill_order_state
174 .get_locality_fragment_state_table_mapping()
175 .get(&fragment_id)
176 })
177 .flatten()
178 .copied()
179 .collect();
180 tracing::debug!(
181 "{} actors out of {} complete",
182 self.done_count,
183 total_actors,
184 );
185 }
186 };
187 debug_assert!(
188 new_consumed_row >= old_consumed_row,
189 "backfill progress should not go backward"
190 );
191 debug_assert!(
192 new_buffered_row >= old_buffered_row,
193 "backfill progress should not go backward"
194 );
195 match backfill_upstream_type {
196 BackfillUpstreamType::MView => {
197 self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
198 }
199 BackfillUpstreamType::Source => {
200 self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
201 }
202 BackfillUpstreamType::Values => {
203 }
205 BackfillUpstreamType::LocalityProvider => {
206 self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
209 self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
210 }
211 }
212 self.states.insert(actor, new_state);
213 result
214 }
215
216 fn iter_actor_progress(&self) -> impl Iterator<Item = ActorBackfillProgress> + '_ {
217 self.states.iter().filter_map(|(actor_id, state)| {
218 let upstream_type = *self.backfill_upstream_types.get(actor_id)?;
219 let (consumed_rows, done) = match *state {
220 BackfillState::Init => (0, false),
221 BackfillState::ConsumingUpstream(_, consumed_rows, _) => (consumed_rows, false),
222 BackfillState::Done(consumed_rows, _) => (consumed_rows, true),
223 };
224 Some(ActorBackfillProgress {
225 actor_id: *actor_id,
226 upstream_type,
227 consumed_rows,
228 done,
229 })
230 })
231 }
232
233 fn is_done(&self) -> bool {
235 tracing::trace!(
236 "Progress::is_done? {}, {}, {:?}",
237 self.done_count,
238 self.states.len(),
239 self.states
240 );
241 self.done_count == self.states.len()
242 }
243
244 fn calculate_progress(&self) -> String {
246 if self.is_done() || self.states.is_empty() {
247 return "100%".to_owned();
248 }
249 let mut mv_count = 0;
250 let mut source_count = 0;
251 for backfill_upstream_type in self.backfill_upstream_types.values() {
252 match backfill_upstream_type {
253 BackfillUpstreamType::MView => mv_count += 1,
254 BackfillUpstreamType::Source => source_count += 1,
255 BackfillUpstreamType::Values => (),
256 BackfillUpstreamType::LocalityProvider => mv_count += 1, }
258 }
259
260 let mv_progress = (mv_count > 0).then_some({
261 let total_rows_to_consume =
264 self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
265 if total_rows_to_consume == 0 {
266 "99.99%".to_owned()
267 } else {
268 let mut progress =
269 self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
270 if progress > 1.0 {
271 progress = 0.9999;
272 }
273 format!(
274 "{:.2}% ({}/{})",
275 progress * 100.0,
276 self.mv_backfill_consumed_rows,
277 total_rows_to_consume
278 )
279 }
280 });
281 let source_progress = (source_count > 0).then_some(format!(
282 "{} rows consumed",
283 self.source_backfill_consumed_rows
284 ));
285 match (mv_progress, source_progress) {
286 (Some(mv_progress), Some(source_progress)) => {
287 format!(
288 "MView Backfill: {}, Source Backfill: {}",
289 mv_progress, source_progress
290 )
291 }
292 (Some(mv_progress), None) => mv_progress,
293 (None, Some(source_progress)) => source_progress,
294 (None, None) => "Unknown".to_owned(),
295 }
296 }
297}
298
/// A streaming job tracked until its backfill completes; carries the source
/// change to apply once the job is finished.
pub struct TrackingJob {
    job_id: JobId,
    /// Whether this job was restored during recovery (vs. newly created).
    is_recovered: bool,
    /// Source-manager change applied on finish; `None` when the job has no
    /// source-backfill fragments.
    source_change: Option<SourceChange>,
}
310
311impl std::fmt::Display for TrackingJob {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 write!(
314 f,
315 "{}{}",
316 self.job_id,
317 if self.is_recovered { "<recovered>" } else { "" }
318 )
319 }
320}
321
impl TrackingJob {
    /// Creates tracking info for a newly created streaming job, recording the
    /// source-backfill fragments to notify the source manager about on finish.
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

    /// Rebuilds tracking info for a job restored during recovery, deriving its
    /// source-backfill fragments from the in-flight fragment infos.
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        // No source change is needed when the job has no source-backfill fragments.
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    /// Returns the id of the tracked streaming job.
    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

    /// Marks the job as finished in the catalog, then applies the pending
    /// source change (if any) to the source manager.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}
378
379impl std::fmt::Debug for TrackingJob {
380 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381 if !self.is_recovered {
382 write!(f, "TrackingJob::New({})", self.job_id)
383 } else {
384 write!(f, "TrackingJob::Recovered({})", self.job_id)
385 }
386 }
387}
388
/// Info staged between barriers, to be committed together at the next commit.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Jobs whose backfill has finished and that should be finalized.
    pub finished_jobs: Vec<TrackingJob>,
    /// State tables whose contents can be truncated.
    pub table_ids_to_truncate: Vec<TableId>,
    /// Jobs whose CDC table backfill has completed.
    pub finished_cdc_table_backfill: Vec<JobId>,
}
398
/// Outcome of applying one actor progress report to a [`Progress`].
pub(super) enum UpdateProgressResult {
    /// Nothing notable changed.
    None,
    /// All actors are done: the job's backfill is finished.
    Finished {
        /// Locality-provider state tables that can now be truncated.
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// A backfill node finished; more nodes may become schedulable.
    BackfillNodeFinished(PendingBackfillFragments),
}
408
/// Tracks the backfill progress of one creating streaming job, from creation
/// (or recovery) until it finishes.
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// The job being tracked; consumed via `into_tracking_job` once finished.
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}
414
/// Lifecycle of a creating streaming job as observed by the tracker.
#[derive(Debug)]
enum CreateMviewStatus {
    /// Actively backfilling.
    Backfilling {
        /// Aggregated per-actor progress.
        progress: Progress,
        /// Fragments ready to start backfilling, drained by the caller via
        /// `take_pending_backfill_nodes`.
        pending_backfill_nodes: Vec<FragmentId>,
        /// State tables to truncate, drained at the next staging commit.
        table_ids_to_truncate: Vec<TableId>,
    },
    /// A shared CDC source with no backfill actors, waiting to be marked
    /// finished via `mark_cdc_source_finished`.
    CdcSourceInit,
    /// Backfill finished (or the job had nothing to backfill).
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}
432
impl CreateMviewProgressTracker {
    /// Rebuilds a tracker for a creating job during recovery.
    ///
    /// All tracked actors restart in `ConsumingUpstream` with zeroed counters;
    /// a job with no tracking actors is considered already finished.
    pub fn recover(
        creating_job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        {
            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
            let status = if actors.is_empty() {
                CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                }
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    // Epoch/row counters are unknown after recovery; restart from zero.
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    creating_job_id,
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        fragment_infos.values().map(|fragment| &fragment.nodes),
                    ),
                    version_stats,
                    backfill_order_state,
                );
                let pending_backfill_nodes = progress
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                CreateMviewStatus::Backfilling {
                    progress,
                    pending_backfill_nodes,
                    table_ids_to_truncate: vec![],
                }
            };
            Self {
                tracking_job,
                status,
            }
        }
    }

    /// Builds a [`Progress`] from recovered actor states, recomputing the
    /// upstream total key count from the current version stats.
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            // Row counters restart from zero after recovery.
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    /// Returns a human-readable progress string for this job.
    pub fn gen_backfill_progress(&self) -> String {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        }
    }

    /// Returns per-actor progress snapshots; empty unless backfilling.
    pub(crate) fn actor_progresses(&self) -> Vec<ActorBackfillProgress> {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => {
                progress.iter_actor_progress().collect()
            }
            CreateMviewStatus::CdcSourceInit | CreateMviewStatus::Finished { .. } => vec![],
        }
    }

    /// Applies one actor progress report, possibly transitioning the tracker
    /// to `Finished` and accumulating pending backfill nodes / truncations.
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
            );
            return;
        };
        {
            {
                match progress.apply(create_mview_progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished {
                        truncate_locality_provider_state_tables,
                    } => {
                        // Carry over any truncations staged while backfilling.
                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                        tracing::trace!(?progress, "finish progress");
                        self.status = CreateMviewStatus::Finished {
                            table_ids_to_truncate,
                        };
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
                    }
                }
            }
        }
    }

    /// Rebuilds the tracking state after a reschedule replaced the backfill
    /// actors: new actors restart in `Init` and all row counters reset.
    pub fn refresh_after_reschedule(
        &mut self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            ..
        } = &mut self.status
        else {
            // Nothing to refresh unless the job is still backfilling.
            return;
        };

        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        );

        #[cfg(debug_assertions)]
        {
            use std::collections::HashSet;
            // A reschedule must replace actors entirely; old and new actor id
            // sets should never overlap.
            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
            let new_actor_ids: HashSet<_> = new_tracking_actors
                .iter()
                .map(|(actor_id, _)| *actor_id)
                .collect();
            debug_assert!(
                old_actor_ids.is_disjoint(&new_actor_ids),
                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
            );
        }

        let mut new_states = HashMap::new();
        let mut new_backfill_types = HashMap::new();
        for (actor_id, upstream_type) in new_tracking_actors {
            new_states.insert(actor_id, BackfillState::Init);
            new_backfill_types.insert(actor_id, upstream_type);
        }

        let fragment_actors: HashMap<_, _> = fragment_infos
            .iter()
            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
            .collect();

        let newly_scheduled = progress
            .backfill_order_state
            .refresh_actors(&fragment_actors);

        progress.backfill_upstream_types = new_backfill_types;
        progress.states = new_states;
        progress.done_count = 0;

        // Recompute upstream counts/stats since the fragment set may differ.
        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
            fragment_infos.values().map(|fragment| &fragment.nodes),
        );
        progress.upstream_mvs_total_key_count =
            calculate_total_key_count(&progress.upstream_mv_count, version_stats);

        progress.mv_backfill_consumed_rows = 0;
        progress.source_backfill_consumed_rows = 0;
        progress.mv_backfill_buffered_rows = 0;

        // Pending nodes = current backfill nodes plus anything the refresh
        // newly scheduled, deduplicated.
        let mut pending = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        pending.extend(newly_scheduled);
        pending.sort_unstable();
        pending.dedup();
        *pending_backfill_nodes = pending;
    }

    /// Drains and returns the fragments that are ready to start backfilling.
    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::CdcSourceInit => None,
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

    /// Returns `(is_finished, tables to truncate)`, draining the staged
    /// truncations in either state.
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, Box::new(table_ids_to_truncate.drain(..))),
            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, Box::new(table_ids_to_truncate.drain(..))),
        }
    }

    /// Whether the tracked job has finished backfilling.
    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    /// Transitions a `CdcSourceInit` tracker to `Finished`; no-op otherwise.
    pub(super) fn mark_cdc_source_finished(&mut self) {
        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
            self.status = CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            };
        }
    }

    /// Consumes the tracker and returns its job for finalization.
    ///
    /// # Panics
    /// Panics if the tracker is not in the `Finished` state.
    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

    /// Returns the id of the tracked job.
    pub(crate) fn job_id(&self) -> JobId {
        self.tracking_job.job_id
    }

    /// Aggregates actor progress into per-fragment progress. When there are no
    /// actor progresses and the job is finished, optionally reports all
    /// backfill fragments as done.
    pub(crate) fn collect_fragment_progress(
        &self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        mark_done_when_empty: bool,
    ) -> Vec<FragmentBackfillProgress> {
        let actor_progresses = self.actor_progresses();
        if actor_progresses.is_empty() {
            if mark_done_when_empty && self.is_finished() {
                return collect_done_fragments(self.job_id(), fragment_infos);
            }
            return vec![];
        }
        collect_fragment_progress_from_actors(self.job_id(), fragment_infos, &actor_progresses)
    }

    /// Creates a tracker for a newly created streaming job.
    ///
    /// Jobs without tracking actors are `Finished` immediately, except shared
    /// CDC sources (connector name contains "-cdc"), which start in
    /// `CdcSourceInit` and finish via `mark_cdc_source_finished`.
    pub fn new(
        info: &CreateStreamingJobCommandInfo,
        version_stats: &HummockVersionStats,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            streaming_job,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // Detect a shared CDC source: it has no backfill actors but must
            // wait for source initialization before being considered finished.
            let is_cdc_source = matches!(
                streaming_job,
                crate::manager::StreamingJob::Source(source)
                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
                        .get_with_properties()
                        .get("connector")
                        .map(|connector| connector.to_lowercase().contains("-cdc"))
                        .unwrap_or(false)
            );
            if is_cdc_source {
                return Self {
                    tracking_job,
                    status: CreateMviewStatus::CdcSourceInit,
                };
            }
            return Self {
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            fragment_infos,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}
798
799impl Progress {
800 fn apply(
804 &mut self,
805 progress: &CreateMviewProgress,
806 version_stats: &HummockVersionStats,
807 ) -> UpdateProgressResult {
808 tracing::trace!(?progress, "update progress");
809 let actor = progress.backfill_actor_id;
810 let job_id = self.job_id;
811
812 let new_state = if progress.done {
813 BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
814 } else {
815 BackfillState::ConsumingUpstream(
816 progress.consumed_epoch.into(),
817 progress.consumed_rows,
818 progress.buffered_rows,
819 )
820 };
821
822 {
823 {
824 let progress_state = self;
825
826 let upstream_total_key_count: u64 =
827 calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
828
829 tracing::trace!(%job_id, "updating progress for table");
830 let pending = progress_state.update(actor, new_state, upstream_total_key_count);
831
832 if progress_state.is_done() {
833 tracing::debug!(
834 %job_id,
835 "all actors done for creating mview!",
836 );
837
838 let PendingBackfillFragments {
839 next_backfill_nodes,
840 truncate_locality_provider_state_tables,
841 } = pending;
842
843 assert!(next_backfill_nodes.is_empty());
844 UpdateProgressResult::Finished {
845 truncate_locality_provider_state_tables,
846 }
847 } else if !pending.next_backfill_nodes.is_empty()
848 || !pending.truncate_locality_provider_state_tables.is_empty()
849 {
850 UpdateProgressResult::BackfillNodeFinished(pending)
851 } else {
852 UpdateProgressResult::None
853 }
854 }
855 }
856 }
857}
858
859fn calculate_total_key_count(
860 table_count: &HashMap<TableId, usize>,
861 version_stats: &HummockVersionStats,
862) -> u64 {
863 table_count
864 .iter()
865 .map(|(table_id, count)| {
866 assert_ne!(*count, 0);
867 *count as u64
868 * version_stats
869 .table_stats
870 .get(table_id)
871 .map_or(0, |stat| stat.total_key_count as u64)
872 })
873 .sum()
874}
875
876pub(crate) fn collect_fragment_progress_from_actors(
877 job_id: JobId,
878 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
879 actor_progresses: &[ActorBackfillProgress],
880) -> Vec<FragmentBackfillProgress> {
881 let mut actor_to_fragment = HashMap::new();
882 for (fragment_id, info) in fragment_infos {
883 for actor_id in info.actors.keys() {
884 actor_to_fragment.insert(*actor_id, *fragment_id);
885 }
886 }
887
888 let mut per_fragment: HashMap<FragmentId, (u64, usize, usize, BackfillUpstreamType)> =
889 HashMap::new();
890 for progress in actor_progresses {
891 let Some(fragment_id) = actor_to_fragment.get(&progress.actor_id) else {
892 continue;
893 };
894 let entry = per_fragment
895 .entry(*fragment_id)
896 .or_insert((0, 0, 0, progress.upstream_type));
897 entry.0 = entry.0.saturating_add(progress.consumed_rows);
898 entry.1 += progress.done as usize;
899 entry.2 += 1;
900 }
901
902 per_fragment
903 .into_iter()
904 .map(
905 |(fragment_id, (consumed_rows, done_cnt, total_cnt, upstream_type))| {
906 FragmentBackfillProgress {
907 job_id,
908 fragment_id,
909 consumed_rows,
910 done: total_cnt > 0 && done_cnt == total_cnt,
911 upstream_type,
912 }
913 },
914 )
915 .collect()
916}
917
918pub(crate) fn collect_done_fragments(
919 job_id: JobId,
920 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
921) -> Vec<FragmentBackfillProgress> {
922 fragment_infos
923 .iter()
924 .filter(|(_, fragment)| {
925 fragment.fragment_type_mask.contains_any([
926 FragmentTypeFlag::StreamScan,
927 FragmentTypeFlag::SourceScan,
928 FragmentTypeFlag::LocalityProvider,
929 ])
930 })
931 .map(|(fragment_id, fragment)| FragmentBackfillProgress {
932 job_id,
933 fragment_id: *fragment_id,
934 consumed_rows: 0,
935 done: true,
936 upstream_type: BackfillUpstreamType::from_fragment_type_mask(
937 fragment.fragment_type_mask,
938 ),
939 })
940 .collect()
941}
942
// Unit tests for backfill progress tracking.
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    /// Builds a minimal single-distribution fragment with the given actors and
    /// a single fragment-type flag.
    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    /// A minimal one-actor MView `Progress` in `Init` state with zeroed counters.
    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    /// Progress from an untracked actor must be ignored without touching the
    /// state of the known actors.
    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }

    /// After a reschedule, tracking must be rebuilt from the new actors and
    /// all counters reset to zero.
    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        // Start from a progress where the old actor had already finished.
        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }

    /// A shared CDC source with no backfill actors must start in
    /// `CdcSourceInit` rather than `Finished`.
    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_meta_model::streaming_job;
        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        // Connector name containing "-cdc" triggers the CDC-source detection.
        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        // Empty fragments: no tracking actors.
        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            database_resource_group: risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP
                .to_owned(),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
            streaming_job_model: streaming_job::Model {
                job_id: JobId::new(100),
                job_status: risingwave_meta_model::JobStatus::Creating,
                create_type: risingwave_meta_model::CreateType::Foreground,
                timezone: None,
                config_override: None,
                adaptive_parallelism_strategy: None,
                parallelism: risingwave_meta_model::StreamingParallelism::Adaptive,
                backfill_parallelism: None,
                backfill_orders: None,
                max_parallelism: 256,
                specific_resource_group: None,
                is_serverless_backfill: false,
            },
        };

        let tracker = CreateMviewProgressTracker::new(
            &info,
            &HummockVersionStats::default(),
            &HashMap::new(),
        );

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }

    /// `mark_cdc_source_finished` must transition `CdcSourceInit` to
    /// `Finished`.
    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        tracker.mark_cdc_source_finished();

        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
}