use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

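/// The backfill state of a single actor, as reported through barrier collection.
/// An actor starts in `Init`, moves to `ConsumingUpstream` once it reports
/// progress, and ends in `Done`.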
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

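/// Work unblocked when an actor finishes backfilling: the next backfill nodes to
/// start (under ordered backfill) and the locality provider state tables whose
/// data can now be truncated.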
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    pub next_backfill_nodes: Vec<FragmentId>,
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

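/// Tracks the backfill progress of all actors of a single creating streaming job.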
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    /// The backfill state of each actor, keyed by actor id.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// The upstream type of each backfill actor, used to report MView and source
    /// backfill progress separately.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// How many times each upstream materialized view is scanned by this job;
    /// used as the weight when computing `upstream_mvs_total_key_count`.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all upstream materialized views, refreshed from the
    /// latest `HummockVersionStats` on every update.
    upstream_mvs_total_key_count: u64,
    /// Rows consumed so far by MView (and locality provider) backfill.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed so far by source backfill.
    source_backfill_consumed_rows: u64,
    /// Rows buffered by locality provider backfill, counted into the total
    /// number of rows to consume.
    mv_backfill_buffered_rows: u64,
}

impl Progress {
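    /// Creates a [`Progress`] for a new job. Every actor starts in
    /// [`BackfillState::Init`]; the actor set must be non-empty.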
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

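    /// Updates the state of a single actor and the aggregated row counters.
    /// Returns the backfill fragments unblocked by this update, which is only
    /// non-empty when the actor reports `Done`.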
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // Backfill nodes that were in progress before this update but no
                // longer are have just finished; their locality provider state
                // tables can be truncated.
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill consumed rows should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill buffered rows should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // Rows from values backfill are not counted towards progress.
            }
            BackfillUpstreamType::LocalityProvider => {
                // Locality provider backfill is reported together with MView backfill.
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

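    /// Returns whether all backfill actors of this job have reported `Done`.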
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

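    /// Renders the current progress as a human-readable string, e.g.
    /// `"2.21% (1100/49800)"` for MView backfill or `"1100 rows consumed"` for
    /// source backfill.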
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                // Locality provider backfill counts towards MView backfill progress.
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then(|| {
            // Rows buffered by locality providers must be consumed as well, so
            // they count towards the total. The estimate may drift from the
            // actual total, so the rendered percentage is capped below 100%
            // while the job is not yet done.
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then(|| {
            format!(
                "{} rows consumed",
                self.source_backfill_consumed_rows
            )
        });
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

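/// A streaming job whose creation is being tracked. When backfill completes,
/// [`TrackingJob::finish`] marks the job as created in the catalog and notifies
/// the source manager.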
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
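    /// Tracks a newly created streaming job.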
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

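    /// Rebuilds tracking state for a job restored during recovery. A source
    /// change is only emitted if the job contains source backfill fragments.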
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

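    /// Marks the job as finished in the catalog, then applies the pending source
    /// change, if any.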
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

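/// Side effects collected while committing a barrier: jobs that have just
/// finished and state tables that may be truncated.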
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    pub finished_jobs: Vec<TrackingJob>,
    pub table_ids_to_truncate: Vec<TableId>,
}

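/// The outcome of applying a single progress report to a tracked job.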
pub(super) enum UpdateProgressResult {
    None,
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    BackfillNodeFinished(PendingBackfillFragments),
}

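/// Tracks the creation progress of a single streaming job (e.g. a materialized
/// view), from backfilling through to finished.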
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    job_id: JobId,
    definition: String,
    create_type: CreateType,
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

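/// Whether the tracked job is still backfilling or has finished.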
#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        /// Aggregated backfill progress of all tracked actors.
        progress: Progress,

        /// Backfill nodes that are ready to start, drained by
        /// `take_pending_backfill_nodes`.
        pending_backfill_nodes: Vec<FragmentId>,

        /// State tables that can be truncated, drained by
        /// `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
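    /// Rebuilds a tracker for a background job during recovery. All tracked
    /// actors are assumed to still be consuming upstream, so the consumed-row
    /// counters restart from zero.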
    pub fn recover(
        creating_job_id: JobId,
        definition: String,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let create_type = CreateType::Background;
        let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
        let status = if actors.is_empty() {
            CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            }
        } else {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();

            for (actor, backfill_upstream_type) in actors {
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                creating_job_id,
                states,
                backfill_upstream_types,
                StreamJobFragments::upstream_table_counts_impl(
                    fragment_infos.values().map(|fragment| &fragment.nodes),
                ),
                version_stats,
                backfill_order_state,
            );
            let pending_backfill_nodes = progress
                .backfill_order_state
                .current_backfill_node_fragment_ids();
            CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            }
        };
        Self {
            job_id: creating_job_id,
            definition,
            create_type,
            tracking_job,
            status,
        }
    }

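    /// Restores a [`Progress`] from recovered per-actor states. `done_count` and
    /// all row counters restart from zero, so the reported percentage may
    /// temporarily drop right after recovery.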
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

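    /// Renders this tracker as a [`DdlProgress`] to be reported via the DDL
    /// service.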
    pub fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

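    /// Applies a progress report from a compute node. Depending on the result,
    /// this may mark the whole job as finished or queue the next backfill nodes.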
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "updating the progress of a streaming job that has finished backfill: {create_mview_progress:?}"
            );
            return;
        };
        match progress.apply(create_mview_progress, version_stats) {
            UpdateProgressResult::None => {
                tracing::trace!(?progress, "update progress");
            }
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            } => {
                let mut table_ids_to_truncate = take(table_ids_to_truncate);
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                tracing::trace!(?progress, "finish progress");
                self.status = CreateMviewStatus::Finished {
                    table_ids_to_truncate,
                };
            }
            UpdateProgressResult::BackfillNodeFinished(pending) => {
                table_ids_to_truncate.extend(pending.truncate_locality_provider_state_tables);
                tracing::trace!(
                    ?progress,
                    next_backfill_nodes = ?pending.next_backfill_nodes,
                    "start next backfill node"
                );
                pending_backfill_nodes.extend(pending.next_backfill_nodes);
            }
        }
    }

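    /// Drains the backfill nodes that are ready to start.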
    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

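    /// Returns whether the job has finished and drains the table ids to truncate
    /// in the upcoming commit.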
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, impl Iterator<Item = TableId> + '_) {
        let (is_finished, table_ids) = match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, table_ids_to_truncate),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, table_ids_to_truncate),
        };
        (is_finished, table_ids.drain(..))
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

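    /// Starts tracking a newly created streaming job. A job with no actors to
    /// track is regarded as finished immediately.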
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let definition = definition.clone();
        let create_type = (*create_type).into();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // No actor to track: the job is finished immediately.
            return Self {
                job_id,
                definition,
                create_type,
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            job_id,
            definition,
            create_type,
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
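    /// Translates a raw [`CreateMviewProgress`] report into a [`BackfillState`],
    /// refreshes the upstream total key count from the latest version stats, and
    /// applies the update.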
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        let upstream_total_key_count: u64 =
            calculate_total_key_count(&self.upstream_mv_count, version_stats);

        tracing::debug!(%job_id, "updating progress for table");
        let pending = self.update(actor, new_state, upstream_total_key_count);

        if self.is_done() {
            tracing::debug!(%job_id, "all actors done for creating mview!");

            let PendingBackfillFragments {
                next_backfill_nodes,
                truncate_locality_provider_state_tables,
            } = pending;

            assert!(next_backfill_nodes.is_empty());
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            }
        } else if !pending.next_backfill_nodes.is_empty()
            || !pending.truncate_locality_provider_state_tables.is_empty()
        {
            UpdateProgressResult::BackfillNodeFinished(pending)
        } else {
            UpdateProgressResult::None
        }
    }
}

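/// Estimates the total number of upstream keys to consume: each upstream
/// table's total key count, weighted by how many times this job scans it.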
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}