use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

/// Backfill fragments unblocked by an actor finishing, plus the state tables of
/// locality-provider fragments that can now be truncated.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragments that may start backfilling next, per the configured backfill order.
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality-provider fragments that have finished backfilling.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Backfill progress of all tracked actors for one creating streaming job.
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    /// The backfill state of each tracked actor.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// The upstream type of each backfill actor.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Upstream materialized views and the number of backfill fragments consuming each one.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all upstream materialized views, weighted by consumer count.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    mv_backfill_buffered_rows: u64,
}

impl Progress {
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

    /// Update the state of one backfill actor and refresh the upstream total key count.
    /// Returns the fragments that become runnable, and the state tables that can be
    /// truncated, as a result of this update.
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
            return result;
        };

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        let Some(prev_state) = self.states.remove(&actor) else {
            tracing::warn!(%actor, "receive progress for actor not in state map");
            return result;
        };
        match prev_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // Fragments that were backfilling before this update but no longer are:
                // their locality-provider state tables can be truncated.
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            // Values backfill finishes immediately; its rows are not tracked.
            BackfillUpstreamType::Values => {}
            BackfillUpstreamType::LocalityProvider => {
                // LocalityProvider buffers upstream rows before consuming them,
                // so both counters are tracked.
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all tracked actors have reported `Done`.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// Render a human-readable progress string, e.g. `50.00% (50/100)` for
    /// materialized-view backfill or `123 rows consumed` for source backfill.
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Buffered rows must eventually be consumed as well, so they count
            // towards the total.
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                // Avoid division by zero; report "almost done" instead of 100%.
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    // The total key count is an estimate, so cap the ratio below 100%.
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// A streaming job whose creation is being tracked. When it finishes, the job is
/// marked as created in the catalog and the source manager is notified.
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

    /// Mark the job as created in the catalog and apply the pending source change, if any.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

/// Commit information staged while collecting a barrier.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Jobs that finished backfilling and can be marked as created.
    pub finished_jobs: Vec<TrackingJob>,
    /// State tables to truncate at commit time.
    pub table_ids_to_truncate: Vec<TableId>,
    /// Jobs whose CDC table backfill has finished.
    pub finished_cdc_table_backfill: Vec<JobId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// All actors of the job are done; backfill is finished.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// A backfill node finished, unblocking the contained pending fragments.
    BackfillNodeFinished(PendingBackfillFragments),
}

/// Tracks the creation progress of a single streaming job.
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        /// Backfill progress of all tracked actors.
        progress: Progress,

        /// Fragments ready to start backfilling, drained by `take_pending_backfill_nodes`.
        pending_backfill_nodes: Vec<FragmentId>,

        /// State tables to truncate, drained by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
    /// A shared CDC source job that is still initializing.
    CdcSourceInit,
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
    pub fn recover(
        creating_job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
        let status = if actors.is_empty() {
            CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            }
        } else {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();

            for (actor, backfill_upstream_type) in actors {
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                creating_job_id,
                states,
                backfill_upstream_types,
                StreamJobFragments::upstream_table_counts_impl(
                    fragment_infos.values().map(|fragment| &fragment.nodes),
                ),
                version_stats,
                backfill_order_state,
            );
            let pending_backfill_nodes = progress
                .backfill_order_state
                .current_backfill_node_fragment_ids();
            CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            }
        };
        Self {
            tracking_job,
            status,
        }
    }

    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    pub fn gen_backfill_progress(&self) -> String {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        }
    }

    /// Apply a progress report from a barrier-complete response.
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "update the progress of a streaming job that has already finished backfill: {create_mview_progress:?}"
            );
            return;
        };
        match progress.apply(create_mview_progress, version_stats) {
            UpdateProgressResult::None => {
                tracing::trace!(?progress, "update progress");
            }
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            } => {
                let mut table_ids_to_truncate = take(table_ids_to_truncate);
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                tracing::trace!(?progress, "finish progress");
                self.status = CreateMviewStatus::Finished {
                    table_ids_to_truncate,
                };
            }
            UpdateProgressResult::BackfillNodeFinished(pending) => {
                table_ids_to_truncate
                    .extend(pending.truncate_locality_provider_state_tables.clone());
                tracing::trace!(
                    ?progress,
                    next_backfill_nodes = ?pending.next_backfill_nodes,
                    "start next backfill node"
                );
                pending_backfill_nodes.extend(pending.next_backfill_nodes);
            }
        }
    }

    /// Rebuild the tracked actor set after a reschedule replaces the backfill actors.
    /// Row counters are reset, since the new actors restart backfill from scratch.
    pub fn refresh_after_reschedule(
        &mut self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            ..
        } = &mut self.status
        else {
            return;
        };

        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        );

        #[cfg(debug_assertions)]
        {
            use std::collections::HashSet;
            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
            let new_actor_ids: HashSet<_> = new_tracking_actors
                .iter()
                .map(|(actor_id, _)| *actor_id)
                .collect();
            debug_assert!(
                old_actor_ids.is_disjoint(&new_actor_ids),
                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
            );
        }

        let mut new_states = HashMap::new();
        let mut new_backfill_types = HashMap::new();
        for (actor_id, upstream_type) in new_tracking_actors {
            new_states.insert(actor_id, BackfillState::Init);
            new_backfill_types.insert(actor_id, upstream_type);
        }

        let fragment_actors: HashMap<_, _> = fragment_infos
            .iter()
            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
            .collect();

        let newly_scheduled = progress
            .backfill_order_state
            .refresh_actors(&fragment_actors);

        progress.backfill_upstream_types = new_backfill_types;
        progress.states = new_states;
        progress.done_count = 0;

        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
            fragment_infos.values().map(|fragment| &fragment.nodes),
        );
        progress.upstream_mvs_total_key_count =
            calculate_total_key_count(&progress.upstream_mv_count, version_stats);

        progress.mv_backfill_consumed_rows = 0;
        progress.source_backfill_consumed_rows = 0;
        progress.mv_backfill_buffered_rows = 0;

        let mut pending = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        pending.extend(newly_scheduled);
        pending.sort_unstable();
        pending.dedup();
        *pending_backfill_nodes = pending;
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::CdcSourceInit => None,
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

    /// Returns whether the job has finished backfilling, together with the state
    /// tables to truncate at the next commit (drained from the tracker).
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, Box::new(table_ids_to_truncate.drain(..))),
            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, Box::new(table_ids_to_truncate.drain(..))),
        }
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    /// Transition a CDC source job out of `CdcSourceInit` once it has initialized.
    pub(super) fn mark_cdc_source_finished(&mut self) {
        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
            self.status = CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            };
        }
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

    /// Start tracking a newly created streaming job.
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            streaming_job,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // A shared CDC source has no backfill actors, but must still finish
            // initializing before the job is considered created.
            let is_cdc_source = matches!(
                streaming_job,
                crate::manager::StreamingJob::Source(source)
                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
                        .get_with_properties()
                        .get("connector")
                        .map(|connector| connector.to_lowercase().contains("-cdc"))
                        .unwrap_or(false)
            );
            if is_cdc_source {
                return Self {
                    tracking_job,
                    status: CreateMviewStatus::CdcSourceInit,
                };
            }
            // No actor to wait for; the job finishes immediately.
            return Self {
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
    /// Apply a progress report to this job's `Progress` and translate the outcome
    /// into an [`UpdateProgressResult`].
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        let upstream_total_key_count: u64 =
            calculate_total_key_count(&self.upstream_mv_count, version_stats);

        tracing::trace!(%job_id, "updating progress for table");
        let pending = self.update(actor, new_state, upstream_total_key_count);

        if self.is_done() {
            tracing::debug!(%job_id, "all actors done for creating mview!");

            let PendingBackfillFragments {
                next_backfill_nodes,
                truncate_locality_provider_state_tables,
            } = pending;

            assert!(next_backfill_nodes.is_empty());
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            }
        } else if !pending.next_backfill_nodes.is_empty()
            || !pending.truncate_locality_provider_state_tables.is_empty()
        {
            UpdateProgressResult::BackfillNodeFinished(pending)
        } else {
            UpdateProgressResult::None
        }
    }
}

/// Sum the total key count of all upstream tables, each weighted by the number
/// of backfill fragments consuming it.
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }
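
    // Illustrative sketch (not from the original file): the `Display` impl appends
    // `<recovered>` for recovered jobs, while `Debug` reflects the tracking origin.
    // The job id 7 is arbitrary.
    #[test]
    fn tracking_job_display_marks_recovered() {
        let job = TrackingJob {
            job_id: JobId::new(7),
            is_recovered: true,
            source_change: None,
        };
        assert_eq!(format!("{}", job), format!("{}<recovered>", JobId::new(7)));
        assert_eq!(
            format!("{:?}", job),
            format!("TrackingJob::Recovered({})", JobId::new(7))
        );
    }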

    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }
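
    // Illustrative sketch (not from the original file): the progress string for an
    // MView backfill is rendered as `consumed/total` with two decimal places. The
    // row counts (50 of 100) are arbitrary.
    #[test]
    fn calculate_progress_reports_consumed_ratio() {
        let actor = ActorId::new(1);
        let mut progress = sample_progress(actor);

        // 50 of 100 upstream keys consumed, nothing buffered.
        let _ = progress.update(
            actor,
            BackfillState::ConsumingUpstream(Epoch(0), 50, 0),
            100,
        );

        assert_eq!(progress.calculate_progress(), "50.00% (50/100)");
    }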

    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }
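
    // Illustrative sketch (not from the original file): `take_pending_backfill_nodes`
    // drains the pending queue, so a second call yields nothing. The fragment ids
    // 10 and 11 are arbitrary.
    #[test]
    fn take_pending_backfill_nodes_drains_queue() {
        let actor = ActorId::new(1);
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress: sample_progress(actor),
                pending_backfill_nodes: vec![FragmentId::new(10), FragmentId::new(11)],
                table_ids_to_truncate: vec![],
            },
        };

        let taken: Vec<_> = tracker.take_pending_backfill_nodes().collect();
        assert_eq!(taken, vec![FragmentId::new(10), FragmentId::new(11)]);
        assert_eq!(tracker.take_pending_backfill_nodes().count(), 0);
    }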

    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        // `cdc_source_job: true` marks the source as shared.
        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        // No fragments, hence no tracking actors.
        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
        };

        let tracker = CreateMviewProgressTracker::new(&info, &HummockVersionStats::default());

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }
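
    // Illustrative sketch (not from the original file): a finished tracker reports
    // `true` and drains its staged truncations exactly once. Table id 42 is arbitrary.
    #[test]
    fn collect_staging_commit_info_drains_once() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![TableId::new(42)],
            },
        };

        let (finished, to_truncate) = tracker.collect_staging_commit_info();
        assert!(finished);
        assert_eq!(to_truncate.collect::<Vec<_>>(), vec![TableId::new(42)]);

        // The staged tables were drained by the first call.
        let (finished, to_truncate) = tracker.collect_staging_commit_info();
        assert!(finished);
        assert_eq!(to_truncate.count(), 0);
    }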

    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        tracker.mark_cdc_source_finished();

        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
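
    // Illustrative sketch (not from the original file): when the version stats carry
    // no entry for an upstream table, `calculate_total_key_count` falls back to zero
    // for that table. Table id 1 with two consumers is arbitrary.
    #[test]
    fn total_key_count_defaults_to_zero_without_stats() {
        let table_count = HashMap::from([(TableId::new(1), 2)]);
        let total = calculate_total_key_count(&table_count, &HummockVersionStats::default());
        assert_eq!(total, 0);
    }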
}