use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}
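// State machine per backfill actor: `Init` -> `ConsumingUpstream` -> `Done`.
// Reporting `Done` twice for the same actor is a bug; `Progress::update`
// panics on it below.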

/// Follow-up work produced by [`Progress::update`] when an actor finishes.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragments that should start backfilling next, per the backfill order.
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of finished `LocalityProvider` fragments, to be truncated.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Tracks the backfill progress of one creating streaming job.
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    states: HashMap<ActorId, BackfillState>,
    /// Controls which fragments may backfill, in the user-specified order.
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// The upstream type each backfill actor consumes from.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// How many times each upstream materialized view is scanned by this
    /// job's backfill, used to weight the total key count.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all upstream materialized views, weighted by
    /// `upstream_mv_count`.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Rows buffered by `LocalityProvider` fragments, which still need to be
    /// replayed before the job is done.
    mv_backfill_buffered_rows: u64,
}
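// Progress math at a glance, with hypothetical numbers (see
// `calculate_progress` below): given `upstream_mvs_total_key_count = 10_000`,
// `mv_backfill_buffered_rows = 2_500`, and `mv_backfill_consumed_rows =
// 5_000`, the job reports "40.00% (5000/12500)".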

impl Progress {
    /// Creates a new [`Progress`] for a creating streaming job, with the
    /// state of every backfill actor initialized to `Init`.
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

    /// Updates the progress of the given backfill `actor` and returns any
    /// follow-up work: fragments that may start backfilling next, and state
    /// tables that can now be truncated.
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // Fragments that were backfilling before but are not anymore
                // have just finished; their locality-provider state tables can
                // be truncated.
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // Do not track progress for `Values` backfill.
            }
            BackfillUpstreamType::LocalityProvider => {
                // `LocalityProvider` consumes from upstream MViews and also
                // buffers rows in its own state tables.
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all backfill actors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

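    /// Renders the current progress as a human-readable string.
    ///
    /// Illustrative output for a job with both MView and source backfill
    /// (hypothetical numbers, not from a real run):
    ///
    /// ```text
    /// MView Backfill: 40.00% (5000/12500), Source Backfill: 1234 rows consumed
    /// ```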
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then(|| {
            // Rows buffered by `LocalityProvider` fragments must be replayed
            // too, so they count towards the total amount of work.
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                // The total key count is not available yet; report an
                // almost-done placeholder instead of dividing by zero.
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    // The upstream key count is an estimate, so cap the ratio
                    // until all actors actually report done.
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then(|| {
            format!(
                "{} rows consumed",
                self.source_backfill_consumed_rows
            )
        });
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// A streaming job tracked until its creation finishes.
///
/// A `TrackingJob` is either newly created in this session or recovered from
/// a previous meta node. A recovered job only carries a source change if it
/// contains source backfill fragments.
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}
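// With a hypothetical job id of 1024, a fresh job displays as "1024" and a
// recovered one as "1024<recovered>".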

impl TrackingJob {
    /// Tracks a newly created streaming job.
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

    /// Tracks a streaming job recovered from persisted fragment infos.
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

    /// Marks the job as finished in the catalog and applies the pending
    /// source change, if any.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}
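// Typical lifecycle, as a sketch (the exact call sites live in the barrier
// worker, not in this file):
//
//     let job = TrackingJob::new(&stream_job_fragments);
//     // ...barriers are collected and progress is applied...
//     job.finish(&metadata_manager, &source_manager).await?;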

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Jobs that have finished backfilling and can be committed.
    pub finished_jobs: Vec<TrackingJob>,
    /// State tables to truncate at the next commit.
    pub table_ids_to_truncate: Vec<TableId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// All backfill actors of the job are done.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Some backfill fragments finished, but more remain.
    BackfillNodeFinished(PendingBackfillFragments),
}
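// How `CreateMviewProgressTracker::apply_progress` reacts to each variant:
// `None` leaves the status untouched, `BackfillNodeFinished` queues the next
// backfill nodes and stages truncations, and `Finished` flips the tracker
// into `CreateMviewStatus::Finished`.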

#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    job_id: JobId,
    definition: String,
    create_type: CreateType,
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        progress: Progress,

        /// Fragments ready to start backfilling that have not been started
        /// yet.
        pending_backfill_nodes: Vec<FragmentId>,

        /// State tables to truncate once the next barrier commits.
        table_ids_to_truncate: Vec<TableId>,
    },
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}
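// A tracker moves from `Backfilling` to `Finished` at most once, when
// `Progress::apply` returns `UpdateProgressResult::Finished`; jobs recovered
// with no actors to track start out directly in `Finished`.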

impl CreateMviewProgressTracker {
    pub fn recover(
        creating_job_id: JobId,
        definition: String,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let create_type = CreateType::Background;
        let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
        let status = if actors.is_empty() {
            CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            }
        } else {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();

            // The actual progress was lost on recovery, so conservatively
            // treat every actor as just having started consuming upstream.
            for (actor, backfill_upstream_type) in actors {
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                creating_job_id,
                states,
                backfill_upstream_types,
                StreamJobFragments::upstream_table_counts_impl(
                    fragment_infos.values().map(|fragment| &fragment.nodes),
                ),
                version_stats,
                backfill_order_state,
            );
            let pending_backfill_nodes = progress
                .backfill_order_state
                .current_backfill_node_fragment_ids();
            CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            }
        };
        Self {
            job_id: creating_job_id,
            definition,
            create_type,
            tracking_job,
            status,
        }
    }

    /// Rebuilds a [`Progress`] from recovered actor states.
    ///
    /// All row counters restart from zero, since the per-actor counts before
    /// recovery are unknown.
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    pub fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }
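    // Example of the resulting `DdlProgress`, with hypothetical values:
    //     id: 42, statement: "CREATE MATERIALIZED VIEW mv AS ...",
    //     create_type: "BACKGROUND", progress: "40.00% (5000/12500)"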

    /// Applies a progress report from a backfill actor. This may finish one
    /// or more backfill fragments, or the whole job.
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "update the progress of a streaming job that has finished backfilling: {create_mview_progress:?}"
            );
            return;
        };
        match progress.apply(create_mview_progress, version_stats) {
            UpdateProgressResult::None => {
                tracing::trace!(?progress, "update progress");
            }
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            } => {
                let mut table_ids_to_truncate = take(table_ids_to_truncate);
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                tracing::trace!(?progress, "finish progress");
                self.status = CreateMviewStatus::Finished {
                    table_ids_to_truncate,
                };
            }
            UpdateProgressResult::BackfillNodeFinished(pending) => {
                table_ids_to_truncate.extend(pending.truncate_locality_provider_state_tables);
                tracing::trace!(
                    ?progress,
                    next_backfill_nodes = ?pending.next_backfill_nodes,
                    "start next backfill node"
                );
                pending_backfill_nodes.extend(pending.next_backfill_nodes);
            }
        }
    }

    /// Takes the fragments that are ready to start backfilling, handing each
    /// batch out exactly once.
    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

    /// Returns whether the job has finished, and drains the state tables that
    /// should be truncated at the next commit.
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, impl Iterator<Item = TableId> + '_) {
        let (is_finished, table_ids) = match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, table_ids_to_truncate),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, table_ids_to_truncate),
        };
        (is_finished, table_ids.drain(..))
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }
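
    // Commit-time flow, as a sketch (the real call sequence lives in the
    // barrier worker; the names here are assumptions for illustration):
    //
    //     let (is_finished, truncates) = tracker.collect_staging_commit_info();
    //     // ...stage `truncates` into the commit...
    //     if is_finished {
    //         let job = tracker.into_tracking_job();
    //         job.finish(&metadata_manager, &source_manager).await?;
    //     }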

    /// Creates a tracker for a newly created streaming job.
    ///
    /// If the job has no actor to track, it is finished immediately.
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let definition = definition.clone();
        let create_type = (*create_type).into();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // The job does not need backfill; finish it immediately.
            return Self {
                job_id,
                definition,
                create_type,
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            job_id,
            definition,
            create_type,
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
    /// Applies a progress report from a backfill actor and translates the
    /// outcome into an [`UpdateProgressResult`].
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        let upstream_total_key_count: u64 =
            calculate_total_key_count(&self.upstream_mv_count, version_stats);

        tracing::trace!(%job_id, "updating progress for table");
        let pending = self.update(actor, new_state, upstream_total_key_count);

        if self.is_done() {
            tracing::debug!(%job_id, "all actors done for creating mview!");

            let PendingBackfillFragments {
                next_backfill_nodes,
                truncate_locality_provider_state_tables,
            } = pending;

            // No fragment can be left to backfill once every actor is done.
            assert!(next_backfill_nodes.is_empty());
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            }
        } else if !pending.next_backfill_nodes.is_empty()
            || !pending.truncate_locality_provider_state_tables.is_empty()
        {
            UpdateProgressResult::BackfillNodeFinished(pending)
        } else {
            UpdateProgressResult::None
        }
    }
}

/// Computes the total key count of all upstream tables, weighting each
/// table's key count by how many times it is scanned.
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}
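// Worked example, with hypothetical stats: if table 1 is scanned by two
// backfill paths (`count = 2`) and has `total_key_count = 300`, while table 2
// is scanned once with `total_key_count = 500`, the weighted total is
// 2 * 300 + 1 * 500 = 1100. Tables missing from `version_stats` contribute 0.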