1use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::{FragmentTypeFlag, TableId};
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
24
25use crate::MetaResult;
26use crate::barrier::backfill_order_control::BackfillOrderState;
27use crate::barrier::info::InflightStreamingJobInfo;
28use crate::barrier::{CreateStreamingJobCommandInfo, FragmentBackfillProgress};
29use crate::controller::fragment::InflightFragmentInfo;
30use crate::manager::MetadataManager;
31use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
32use crate::stream::{SourceChange, SourceManagerRef};
33
/// Row count carried in the "consumed rows" slot of a [`BackfillState`].
type ConsumedRows = u64;
/// Row count carried in the "buffered rows" slot of a [`BackfillState`].
type BufferedRows = u64;
36
/// Snapshot of the backfill progress of a single tracked actor, as produced by
/// `Progress::iter_actor_progress`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ActorBackfillProgress {
    /// The backfill actor this snapshot describes.
    pub(crate) actor_id: ActorId,
    /// Kind of upstream the actor backfills from.
    pub(crate) upstream_type: BackfillUpstreamType,
    /// Rows consumed so far; `0` while the actor has not reported anything yet.
    pub(crate) consumed_rows: u64,
    /// Whether the actor has reported that its backfill is done.
    pub(crate) done: bool,
}
44
/// Last known backfill state of one tracked actor.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// No progress has been reported by the actor yet.
    Init,
    /// The actor is still backfilling. Carries the reported epoch (stored but never read,
    /// hence the `dead_code` expectation), consumed rows and buffered rows.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    /// The actor finished backfilling. Carries its final consumed and buffered row counts.
    Done(ConsumedRows, BufferedRows),
}
51
/// Follow-up work produced by `Progress::update` when an actor reports `Done`.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment ids returned by `BackfillOrderState::finish_actor` for the finished actor.
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables mapped (via the locality fragment state table mapping) to the fragments
    /// that stopped being current backfill nodes because of this update.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}
60
/// Backfill progress of all tracked actors of one creating streaming job.
#[derive(Debug)]
pub(super) struct Progress {
    /// Id of the job being created.
    job_id: JobId,
    /// Last reported state of every tracked backfill actor.
    states: HashMap<ActorId, BackfillState>,
    /// Backfill ordering state; notified whenever an actor finishes.
    backfill_order_state: BackfillOrderState,
    /// Number of `Done` reports recorded since the actor set was last (re)built.
    done_count: usize,

    /// Kind of upstream each tracked actor backfills from.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Per-table multiplier used by `calculate_total_key_count`
    /// (presumably how many times each upstream table is scanned — confirm at the producer).
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of the upstream tables, refreshed on every progress update.
    upstream_mvs_total_key_count: u64,
    /// Rows consumed by `MView` and `LocalityProvider` backfill actors.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed by `Source` backfill actors.
    source_backfill_consumed_rows: u64,
    /// Rows buffered by `LocalityProvider` backfill actors.
    mv_backfill_buffered_rows: u64,
}
86
87impl Progress {
88 fn new(
90 job_id: JobId,
91 actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
92 upstream_mv_count: HashMap<TableId, usize>,
93 upstream_total_key_count: u64,
94 backfill_order_state: BackfillOrderState,
95 ) -> Self {
96 let mut states = HashMap::new();
97 let mut backfill_upstream_types = HashMap::new();
98 for (actor, backfill_upstream_type) in actors {
99 states.insert(actor, BackfillState::Init);
100 backfill_upstream_types.insert(actor, backfill_upstream_type);
101 }
102 assert!(!states.is_empty());
103
104 Self {
105 job_id,
106 states,
107 backfill_upstream_types,
108 done_count: 0,
109 upstream_mv_count,
110 upstream_mvs_total_key_count: upstream_total_key_count,
111 mv_backfill_consumed_rows: 0,
112 source_backfill_consumed_rows: 0,
113 mv_backfill_buffered_rows: 0,
114 backfill_order_state,
115 }
116 }
117
118 fn update(
121 &mut self,
122 actor: ActorId,
123 new_state: BackfillState,
124 upstream_total_key_count: u64,
125 ) -> PendingBackfillFragments {
126 let mut result = PendingBackfillFragments::default();
127 self.upstream_mvs_total_key_count = upstream_total_key_count;
128 let total_actors = self.states.len();
129 let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
130 tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
131 return result;
132 };
133
134 let mut old_consumed_row = 0;
135 let mut new_consumed_row = 0;
136 let mut old_buffered_row = 0;
137 let mut new_buffered_row = 0;
138 let Some(prev_state) = self.states.remove(&actor) else {
139 tracing::warn!(%actor, "receive progress for actor not in state map");
140 return result;
141 };
142 match prev_state {
143 BackfillState::Init => {}
144 BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
145 old_consumed_row = consumed_rows;
146 old_buffered_row = buffered_rows;
147 }
148 BackfillState::Done(_, _) => panic!("should not report done multiple times"),
149 };
150 match &new_state {
151 BackfillState::Init => {}
152 BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
153 new_consumed_row = *consumed_rows;
154 new_buffered_row = *buffered_rows;
155 }
156 BackfillState::Done(consumed_rows, buffered_rows) => {
157 tracing::debug!("actor {} done", actor);
158 new_consumed_row = *consumed_rows;
159 new_buffered_row = *buffered_rows;
160 self.done_count += 1;
161 let before_backfill_nodes = self
162 .backfill_order_state
163 .current_backfill_node_fragment_ids();
164 result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
165 let after_backfill_nodes = self
166 .backfill_order_state
167 .current_backfill_node_fragment_ids();
168 let last_backfill_nodes_iter = before_backfill_nodes
170 .into_iter()
171 .filter(|x| !after_backfill_nodes.contains(x));
172 result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
173 .filter_map(|fragment_id| {
174 self.backfill_order_state
175 .get_locality_fragment_state_table_mapping()
176 .get(&fragment_id)
177 })
178 .flatten()
179 .copied()
180 .collect();
181 tracing::debug!(
182 "{} actors out of {} complete",
183 self.done_count,
184 total_actors,
185 );
186 }
187 };
188 debug_assert!(
189 new_consumed_row >= old_consumed_row,
190 "backfill progress should not go backward"
191 );
192 debug_assert!(
193 new_buffered_row >= old_buffered_row,
194 "backfill progress should not go backward"
195 );
196 match backfill_upstream_type {
197 BackfillUpstreamType::MView => {
198 self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
199 }
200 BackfillUpstreamType::Source => {
201 self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
202 }
203 BackfillUpstreamType::Values => {
204 }
206 BackfillUpstreamType::LocalityProvider => {
207 self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
210 self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
211 }
212 }
213 self.states.insert(actor, new_state);
214 result
215 }
216
217 fn iter_actor_progress(&self) -> impl Iterator<Item = ActorBackfillProgress> + '_ {
218 self.states.iter().filter_map(|(actor_id, state)| {
219 let upstream_type = *self.backfill_upstream_types.get(actor_id)?;
220 let (consumed_rows, done) = match *state {
221 BackfillState::Init => (0, false),
222 BackfillState::ConsumingUpstream(_, consumed_rows, _) => (consumed_rows, false),
223 BackfillState::Done(consumed_rows, _) => (consumed_rows, true),
224 };
225 Some(ActorBackfillProgress {
226 actor_id: *actor_id,
227 upstream_type,
228 consumed_rows,
229 done,
230 })
231 })
232 }
233
234 fn is_done(&self) -> bool {
236 tracing::trace!(
237 "Progress::is_done? {}, {}, {:?}",
238 self.done_count,
239 self.states.len(),
240 self.states
241 );
242 self.done_count == self.states.len()
243 }
244
245 fn calculate_progress(&self) -> String {
247 if self.is_done() || self.states.is_empty() {
248 return "100%".to_owned();
249 }
250 let mut mv_count = 0;
251 let mut source_count = 0;
252 for backfill_upstream_type in self.backfill_upstream_types.values() {
253 match backfill_upstream_type {
254 BackfillUpstreamType::MView => mv_count += 1,
255 BackfillUpstreamType::Source => source_count += 1,
256 BackfillUpstreamType::Values => (),
257 BackfillUpstreamType::LocalityProvider => mv_count += 1, }
259 }
260
261 let mv_progress = (mv_count > 0).then_some({
262 let total_rows_to_consume =
265 self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
266 if total_rows_to_consume == 0 {
267 "99.99%".to_owned()
268 } else {
269 let mut progress =
270 self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
271 if progress > 1.0 {
272 progress = 0.9999;
273 }
274 format!(
275 "{:.2}% ({}/{})",
276 progress * 100.0,
277 self.mv_backfill_consumed_rows,
278 total_rows_to_consume
279 )
280 }
281 });
282 let source_progress = (source_count > 0).then_some(format!(
283 "{} rows consumed",
284 self.source_backfill_consumed_rows
285 ));
286 match (mv_progress, source_progress) {
287 (Some(mv_progress), Some(source_progress)) => {
288 format!(
289 "MView Backfill: {}, Source Backfill: {}",
290 mv_progress, source_progress
291 )
292 }
293 (Some(mv_progress), None) => mv_progress,
294 (None, Some(source_progress)) => source_progress,
295 (None, None) => "Unknown".to_owned(),
296 }
297 }
298}
299
/// A creating streaming job tracked until its backfill finishes.
pub struct TrackingJob {
    /// Id of the tracked streaming job.
    job_id: JobId,
    /// Whether the job was rebuilt through one of the `recovered*` constructors.
    is_recovered: bool,
    /// Source change applied to the source manager when the job is finished, if any.
    source_change: Option<SourceChange>,
}
311
312impl std::fmt::Display for TrackingJob {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 write!(
315 f,
316 "{}{}",
317 self.job_id,
318 if self.is_recovered { "<recovered>" } else { "" }
319 )
320 }
321}
322
323impl TrackingJob {
324 pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
326 Self {
327 job_id: stream_job_fragments.stream_job_id,
328 is_recovered: false,
329 source_change: Some(SourceChange::CreateJobFinished {
330 finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
331 }),
332 }
333 }
334
335 pub(crate) fn recovered(
337 job_id: JobId,
338 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
339 ) -> Self {
340 Self::recovered_from_fragment_nodes(
341 job_id,
342 fragment_infos
343 .iter()
344 .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
345 )
346 }
347
348 pub(crate) fn recovered_from_fragment_nodes<'a>(
349 job_id: JobId,
350 fragment_nodes: impl Iterator<Item = (FragmentId, &'a StreamNode)>,
351 ) -> Self {
352 let source_backfill_fragments =
353 StreamJobFragments::source_backfill_fragments_impl(fragment_nodes);
354 let source_change = if source_backfill_fragments.is_empty() {
355 None
356 } else {
357 Some(SourceChange::CreateJobFinished {
358 finished_backfill_fragments: source_backfill_fragments,
359 })
360 };
361 Self {
362 job_id,
363 is_recovered: true,
364 source_change,
365 }
366 }
367
368 pub(crate) fn job_id(&self) -> JobId {
369 self.job_id
370 }
371
372 pub(crate) async fn finish(
374 self,
375 metadata_manager: &MetadataManager,
376 source_manager: &SourceManagerRef,
377 ) -> MetaResult<()> {
378 metadata_manager
379 .catalog_controller
380 .finish_streaming_job(self.job_id)
381 .await?;
382 if let Some(source_change) = self.source_change {
383 source_manager.apply_source_change(source_change).await;
384 }
385 Ok(())
386 }
387}
388
389impl std::fmt::Debug for TrackingJob {
390 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
391 if !self.is_recovered {
392 write!(f, "TrackingJob::New({})", self.job_id)
393 } else {
394 write!(f, "TrackingJob::Recovered({})", self.job_id)
395 }
396 }
397}
398
/// Information staged for a commit.
// NOTE(review): this type is not used within this file; the field descriptions below are
// inferred from the field names and types — confirm against the call sites.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Tracking jobs that have finished.
    pub finished_jobs: Vec<TrackingJob>,
    /// State tables to truncate.
    pub table_ids_to_truncate: Vec<TableId>,
    /// Jobs whose CDC table backfill has finished.
    pub finished_cdc_table_backfill: Vec<JobId>,
}
408
/// Outcome of applying one progress report through `Progress::apply`.
pub(super) enum UpdateProgressResult {
    /// The report triggered no follow-up work.
    None,
    /// Every tracked actor is done.
    Finished {
        /// State tables to truncate as a result of the final report.
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// The job is not done yet, but the report produced new backfill nodes to start and/or
    /// state tables to truncate.
    BackfillNodeFinished(PendingBackfillFragments),
}
418
/// Tracks the backfill progress of one creating streaming job.
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// The job being tracked; handed out by `into_tracking_job` once finished.
    tracking_job: TrackingJob,
    /// Current tracking status of the job.
    status: CreateMviewStatus,
}
424
/// Tracking status of a creating streaming job.
#[derive(Debug)]
enum CreateMviewStatus {
    /// At least one backfill actor is still being tracked.
    Backfilling {
        /// Per-actor backfill progress.
        progress: Progress,

        /// Backfill fragments waiting to be handed out by `take_pending_backfill_nodes`.
        pending_backfill_nodes: Vec<FragmentId>,

        /// State tables waiting to be handed out by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
    /// A shared CDC source without tracked backfill actors; it stays here until
    /// `mark_cdc_source_finished` is called.
    CdcSourceInit,
    /// Tracking is complete.
    Finished {
        /// State tables still waiting to be handed out by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
}
442
443impl CreateMviewProgressTracker {
444 pub fn recover(
445 creating_job_id: JobId,
446 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
447 backfill_order_state: BackfillOrderState,
448 version_stats: &HummockVersionStats,
449 ) -> Self {
450 {
451 let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
452 let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
453 let status = if actors.is_empty() {
454 CreateMviewStatus::Finished {
455 table_ids_to_truncate: vec![],
456 }
457 } else {
458 let mut states = HashMap::new();
459 let mut backfill_upstream_types = HashMap::new();
460
461 for (actor, backfill_upstream_type) in actors {
462 states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
463 backfill_upstream_types.insert(actor, backfill_upstream_type);
464 }
465
466 let progress = Self::recover_progress(
467 creating_job_id,
468 states,
469 backfill_upstream_types,
470 StreamJobFragments::upstream_table_counts_impl(
471 fragment_infos.values().map(|fragment| &fragment.nodes),
472 ),
473 version_stats,
474 backfill_order_state,
475 );
476 let pending_backfill_nodes = progress
477 .backfill_order_state
478 .current_backfill_node_fragment_ids();
479 CreateMviewStatus::Backfilling {
480 progress,
481 pending_backfill_nodes,
482 table_ids_to_truncate: vec![],
483 }
484 };
485 Self {
486 tracking_job,
487 status,
488 }
489 }
490 }
491
492 fn recover_progress(
498 job_id: JobId,
499 states: HashMap<ActorId, BackfillState>,
500 backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
501 upstream_mv_count: HashMap<TableId, usize>,
502 version_stats: &HummockVersionStats,
503 backfill_order_state: BackfillOrderState,
504 ) -> Progress {
505 let upstream_mvs_total_key_count =
506 calculate_total_key_count(&upstream_mv_count, version_stats);
507 Progress {
508 job_id,
509 states,
510 backfill_order_state,
511 backfill_upstream_types,
512 done_count: 0, upstream_mv_count,
514 upstream_mvs_total_key_count,
515 mv_backfill_consumed_rows: 0, source_backfill_consumed_rows: 0, mv_backfill_buffered_rows: 0, }
519 }
520
521 pub fn gen_backfill_progress(&self) -> String {
522 match &self.status {
523 CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
524 CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
525 CreateMviewStatus::Finished { .. } => "100%".to_owned(),
526 }
527 }
528
529 pub(crate) fn actor_progresses(&self) -> Vec<ActorBackfillProgress> {
530 match &self.status {
531 CreateMviewStatus::Backfilling { progress, .. } => {
532 progress.iter_actor_progress().collect()
533 }
534 CreateMviewStatus::CdcSourceInit | CreateMviewStatus::Finished { .. } => vec![],
535 }
536 }
537
538 pub(super) fn apply_progress(
541 &mut self,
542 create_mview_progress: &CreateMviewProgress,
543 version_stats: &HummockVersionStats,
544 ) {
545 let CreateMviewStatus::Backfilling {
546 progress,
547 pending_backfill_nodes,
548 table_ids_to_truncate,
549 } = &mut self.status
550 else {
551 tracing::warn!(
552 "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
553 );
554 return;
555 };
556 {
557 {
559 match progress.apply(create_mview_progress, version_stats) {
561 UpdateProgressResult::None => {
562 tracing::trace!(?progress, "update progress");
563 }
564 UpdateProgressResult::Finished {
565 truncate_locality_provider_state_tables,
566 } => {
567 let mut table_ids_to_truncate = take(table_ids_to_truncate);
568 table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
569 tracing::trace!(?progress, "finish progress");
570 self.status = CreateMviewStatus::Finished {
571 table_ids_to_truncate,
572 };
573 }
574 UpdateProgressResult::BackfillNodeFinished(pending) => {
575 table_ids_to_truncate
576 .extend(pending.truncate_locality_provider_state_tables.clone());
577 tracing::trace!(
578 ?progress,
579 next_backfill_nodes = ?pending.next_backfill_nodes,
580 "start next backfill node"
581 );
582 pending_backfill_nodes.extend(pending.next_backfill_nodes);
583 }
584 }
585 }
586 }
587 }
588
589 pub fn refresh_after_reschedule(
591 &mut self,
592 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
593 version_stats: &HummockVersionStats,
594 ) {
595 let CreateMviewStatus::Backfilling {
596 progress,
597 pending_backfill_nodes,
598 ..
599 } = &mut self.status
600 else {
601 return;
602 };
603
604 let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
605 fragment_infos
606 .values()
607 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
608 );
609
610 #[cfg(debug_assertions)]
611 {
612 use std::collections::HashSet;
613 let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
614 let new_actor_ids: HashSet<_> = new_tracking_actors
615 .iter()
616 .map(|(actor_id, _)| *actor_id)
617 .collect();
618 debug_assert!(
619 old_actor_ids.is_disjoint(&new_actor_ids),
620 "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
621 );
622 }
623
624 let mut new_states = HashMap::new();
625 let mut new_backfill_types = HashMap::new();
626 for (actor_id, upstream_type) in new_tracking_actors {
627 new_states.insert(actor_id, BackfillState::Init);
628 new_backfill_types.insert(actor_id, upstream_type);
629 }
630
631 let fragment_actors: HashMap<_, _> = fragment_infos
632 .iter()
633 .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
634 .collect();
635
636 let newly_scheduled = progress
637 .backfill_order_state
638 .refresh_actors(&fragment_actors);
639
640 progress.backfill_upstream_types = new_backfill_types;
641 progress.states = new_states;
642 progress.done_count = 0;
643
644 progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
645 fragment_infos.values().map(|fragment| &fragment.nodes),
646 );
647 progress.upstream_mvs_total_key_count =
648 calculate_total_key_count(&progress.upstream_mv_count, version_stats);
649
650 progress.mv_backfill_consumed_rows = 0;
651 progress.source_backfill_consumed_rows = 0;
652 progress.mv_backfill_buffered_rows = 0;
653
654 let mut pending = progress
655 .backfill_order_state
656 .current_backfill_node_fragment_ids();
657 pending.extend(newly_scheduled);
658 pending.sort_unstable();
659 pending.dedup();
660 *pending_backfill_nodes = pending;
661 }
662
663 pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
664 match &mut self.status {
665 CreateMviewStatus::Backfilling {
666 pending_backfill_nodes,
667 ..
668 } => Some(pending_backfill_nodes.drain(..)),
669 CreateMviewStatus::CdcSourceInit => None,
670 CreateMviewStatus::Finished { .. } => None,
671 }
672 .into_iter()
673 .flatten()
674 }
675
676 pub(super) fn collect_staging_commit_info(
677 &mut self,
678 ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
679 match &mut self.status {
680 CreateMviewStatus::Backfilling {
681 table_ids_to_truncate,
682 ..
683 } => (false, Box::new(table_ids_to_truncate.drain(..))),
684 CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
685 CreateMviewStatus::Finished {
686 table_ids_to_truncate,
687 ..
688 } => (true, Box::new(table_ids_to_truncate.drain(..))),
689 }
690 }
691
692 pub(super) fn is_finished(&self) -> bool {
693 matches!(self.status, CreateMviewStatus::Finished { .. })
694 }
695
696 pub(super) fn mark_cdc_source_finished(&mut self) {
698 if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
699 self.status = CreateMviewStatus::Finished {
700 table_ids_to_truncate: vec![],
701 };
702 }
703 }
704
705 pub(super) fn into_tracking_job(self) -> TrackingJob {
706 let CreateMviewStatus::Finished { .. } = self.status else {
707 panic!("should be called when finished");
708 };
709 self.tracking_job
710 }
711
712 pub(crate) fn job_id(&self) -> JobId {
713 self.tracking_job.job_id
714 }
715
716 pub(crate) fn collect_fragment_progress(
717 &self,
718 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
719 mark_done_when_empty: bool,
720 ) -> Vec<FragmentBackfillProgress> {
721 let actor_progresses = self.actor_progresses();
722 if actor_progresses.is_empty() {
723 if mark_done_when_empty && self.is_finished() {
724 return collect_done_fragments(self.job_id(), fragment_infos);
725 }
726 return vec![];
727 }
728 collect_fragment_progress_from_actors(self.job_id(), fragment_infos, &actor_progresses)
729 }
730
731 pub fn new(
736 info: &CreateStreamingJobCommandInfo,
737 version_stats: &HummockVersionStats,
738 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
739 ) -> Self {
740 tracing::trace!(?info, "add job to track");
741 let CreateStreamingJobCommandInfo {
742 stream_job_fragments,
743 fragment_backfill_ordering,
744 locality_fragment_state_table_mapping,
745 streaming_job,
746 ..
747 } = info;
748 let job_id = stream_job_fragments.stream_job_id();
749 let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
750 let tracking_job = TrackingJob::new(&info.stream_job_fragments);
751 if actors.is_empty() {
752 let is_cdc_source = matches!(
755 streaming_job,
756 crate::manager::StreamingJob::Source(source)
757 if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
758 .get_with_properties()
759 .get("connector")
760 .map(|connector| connector.to_lowercase().contains("-cdc"))
761 .unwrap_or(false)
762 );
763 if is_cdc_source {
764 return Self {
766 tracking_job,
767 status: CreateMviewStatus::CdcSourceInit,
768 };
769 }
770 return Self {
772 tracking_job,
773 status: CreateMviewStatus::Finished {
774 table_ids_to_truncate: vec![],
775 },
776 };
777 }
778
779 let upstream_mv_count = stream_job_fragments.upstream_table_counts();
780 let upstream_total_key_count: u64 =
781 calculate_total_key_count(&upstream_mv_count, version_stats);
782
783 let backfill_order_state = BackfillOrderState::new(
784 fragment_backfill_ordering,
785 fragment_infos,
786 locality_fragment_state_table_mapping.clone(),
787 );
788 let progress = Progress::new(
789 job_id,
790 actors,
791 upstream_mv_count,
792 upstream_total_key_count,
793 backfill_order_state,
794 );
795 let pending_backfill_nodes = progress
796 .backfill_order_state
797 .current_backfill_node_fragment_ids();
798 Self {
799 tracking_job,
800 status: CreateMviewStatus::Backfilling {
801 progress,
802 pending_backfill_nodes,
803 table_ids_to_truncate: vec![],
804 },
805 }
806 }
807}
808
809impl Progress {
810 fn apply(
814 &mut self,
815 progress: &CreateMviewProgress,
816 version_stats: &HummockVersionStats,
817 ) -> UpdateProgressResult {
818 tracing::trace!(?progress, "update progress");
819 let actor = progress.backfill_actor_id;
820 let job_id = self.job_id;
821
822 let new_state = if progress.done {
823 BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
824 } else {
825 BackfillState::ConsumingUpstream(
826 progress.consumed_epoch.into(),
827 progress.consumed_rows,
828 progress.buffered_rows,
829 )
830 };
831
832 {
833 {
834 let progress_state = self;
835
836 let upstream_total_key_count: u64 =
837 calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
838
839 tracing::trace!(%job_id, "updating progress for table");
840 let pending = progress_state.update(actor, new_state, upstream_total_key_count);
841
842 if progress_state.is_done() {
843 tracing::debug!(
844 %job_id,
845 "all actors done for creating mview!",
846 );
847
848 let PendingBackfillFragments {
849 next_backfill_nodes,
850 truncate_locality_provider_state_tables,
851 } = pending;
852
853 assert!(next_backfill_nodes.is_empty());
854 UpdateProgressResult::Finished {
855 truncate_locality_provider_state_tables,
856 }
857 } else if !pending.next_backfill_nodes.is_empty()
858 || !pending.truncate_locality_provider_state_tables.is_empty()
859 {
860 UpdateProgressResult::BackfillNodeFinished(pending)
861 } else {
862 UpdateProgressResult::None
863 }
864 }
865 }
866 }
867}
868
869fn calculate_total_key_count(
870 table_count: &HashMap<TableId, usize>,
871 version_stats: &HummockVersionStats,
872) -> u64 {
873 table_count
874 .iter()
875 .map(|(table_id, count)| {
876 assert_ne!(*count, 0);
877 *count as u64
878 * version_stats
879 .table_stats
880 .get(table_id)
881 .map_or(0, |stat| stat.total_key_count as u64)
882 })
883 .sum()
884}
885
886pub(crate) fn collect_fragment_progress_from_actors(
887 job_id: JobId,
888 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
889 actor_progresses: &[ActorBackfillProgress],
890) -> Vec<FragmentBackfillProgress> {
891 let mut actor_to_fragment = HashMap::new();
892 for (fragment_id, info) in fragment_infos {
893 for actor_id in info.actors.keys() {
894 actor_to_fragment.insert(*actor_id, *fragment_id);
895 }
896 }
897
898 let mut per_fragment: HashMap<FragmentId, (u64, usize, usize, BackfillUpstreamType)> =
899 HashMap::new();
900 for progress in actor_progresses {
901 let Some(fragment_id) = actor_to_fragment.get(&progress.actor_id) else {
902 continue;
903 };
904 let entry = per_fragment
905 .entry(*fragment_id)
906 .or_insert((0, 0, 0, progress.upstream_type));
907 entry.0 = entry.0.saturating_add(progress.consumed_rows);
908 entry.1 += progress.done as usize;
909 entry.2 += 1;
910 }
911
912 per_fragment
913 .into_iter()
914 .map(
915 |(fragment_id, (consumed_rows, done_cnt, total_cnt, upstream_type))| {
916 FragmentBackfillProgress {
917 job_id,
918 fragment_id,
919 consumed_rows,
920 done: total_cnt > 0 && done_cnt == total_cnt,
921 upstream_type,
922 }
923 },
924 )
925 .collect()
926}
927
928pub(crate) fn collect_done_fragments(
929 job_id: JobId,
930 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
931) -> Vec<FragmentBackfillProgress> {
932 fragment_infos
933 .iter()
934 .filter(|(_, fragment)| {
935 fragment.fragment_type_mask.contains_any([
936 FragmentTypeFlag::StreamScan,
937 FragmentTypeFlag::SourceScan,
938 FragmentTypeFlag::LocalityProvider,
939 ])
940 })
941 .map(|(fragment_id, fragment)| FragmentBackfillProgress {
942 job_id,
943 fragment_id: *fragment_id,
944 consumed_rows: 0,
945 done: true,
946 upstream_type: BackfillUpstreamType::from_fragment_type_mask(
947 fragment.fragment_type_mask,
948 ),
949 })
950 .collect()
951}
952
/// Unit tests for the progress tracker.
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    /// Builds an inflight fragment with the given actors and a type mask holding only `flag`.
    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    /// Builds a `Progress` tracking a single `MView` backfill actor in the `Init` state.
    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    /// A report from an untracked actor must leave the tracked states untouched.
    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }

    /// After a reschedule only the new actors are tracked and all counters are reset.
    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        // The old actor has already finished and contributed five consumed rows.
        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }

    /// A CDC source job without tracked actors must start in `CdcSourceInit`, not `Finished`.
    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_meta_model::streaming_job;
        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        // The connector name contains "-cdc", which is what the tracker looks for.
        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        // No fragments at all, hence no actor whose progress would be tracked.
        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            database_resource_group: risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP
                .to_owned(),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
            refresh_interval_sec: None,
            streaming_job_model: streaming_job::Model {
                job_id: JobId::new(100),
                job_status: risingwave_meta_model::JobStatus::Creating,
                create_type: risingwave_meta_model::CreateType::Foreground,
                timezone: None,
                config_override: None,
                adaptive_parallelism_strategy: None,
                parallelism: risingwave_meta_model::StreamingParallelism::Adaptive,
                backfill_parallelism: None,
                backfill_adaptive_parallelism_strategy: None,
                backfill_orders: None,
                max_parallelism: 256,
                specific_resource_group: None,
                is_serverless_backfill: false,
                refresh_interval_sec: None,
            },
        };

        let tracker = CreateMviewProgressTracker::new(
            &info,
            &HummockVersionStats::default(),
            &HashMap::new(),
        );

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }

    /// `mark_cdc_source_finished` must move a tracker from `CdcSourceInit` to `Finished`.
    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        tracker.mark_cdc_source_finished();

        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
}