use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

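/// The backfill state of one actor, derived from the `CreateMviewProgress` it reports
/// in barrier-complete responses: not started yet, consuming the upstream (with the
/// consumed epoch and row counts), or done.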
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

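/// Backfill work unlocked when an actor finishes: the fragments that may start
/// backfilling next, and the locality-provider state tables whose data can now be
/// truncated.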
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    pub next_backfill_nodes: Vec<FragmentId>,
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

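/// Backfill progress of all actors of a single creating streaming job.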
#[derive(Debug, Clone)]
pub(super) struct Progress {
    job_id: JobId,
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// How many times each upstream materialized view is consumed by this job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all upstream materialized views, weighted by the counts above.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    mv_backfill_buffered_rows: u64,
}

impl Progress {
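    /// Create a [`Progress`] for a creating streaming job, with all backfill actors in
    /// the `Init` state. There must be at least one actor to track.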
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

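    /// Update the state of `actor` and refresh the upstream total key count.
    /// Returns the backfill fragments that become schedulable (and the
    /// locality-provider state tables that become truncatable) because of this update.
    ///
    /// Panics if `actor` is untracked or reports `Done` more than once.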
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // Fragments that were in progress before this update but not after it
                // have just finished backfilling.
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // Rows from `VALUES` do not count towards backfill progress.
            }
            BackfillUpstreamType::LocalityProvider => {
                // Locality providers contribute to the mview totals, and additionally
                // track the rows they buffer before yielding.
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

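    /// Returns whether all tracked actors have reported `Done`.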
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

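    /// Returns the approximate backfill progress for display, e.g.
    /// `12.34% (1234/10000)` for mview backfill or `1234 rows consumed` for source
    /// backfill. The value is capped below 100% until the job is actually done.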
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then(|| {
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                // Avoid dividing by zero and showing a premature 100%.
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    // The total key count is an estimate, so consumed rows may exceed it.
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then(|| {
            format!(
                "{} rows consumed",
                self.source_backfill_consumed_rows
            )
        });
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

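/// A streaming job whose creation is being tracked until backfill completes, at which
/// point it is finished in the catalog and any source changes are applied.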
#[derive(Clone)]
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
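    /// Track a newly created streaming job from its fragments.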
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

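    /// Track a streaming job restored during recovery, rebuilding its source backfill
    /// fragments from the in-flight fragment infos.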
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

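    /// Mark the job as successfully created in the catalog and notify the source
    /// manager of any finished source backfill fragments.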
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

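/// Side effects to apply when committing a completed barrier: jobs whose creation has
/// finished, and state tables to truncate.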
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    pub finished_jobs: Vec<TrackingJob>,
    pub table_ids_to_truncate: Vec<TableId>,
}

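/// Outcome of applying one progress report to a tracked job.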
pub(super) enum UpdateProgressResult {
    None,
    /// All actors are done; the job's backfill has finished.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Some backfill fragments finished and the next ones can be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}

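/// Tracks the backfill progress of one creating streaming job, turning per-actor
/// progress reports into DDL progress and commit-time side effects.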
#[derive(Debug, Clone)]
pub(super) struct CreateMviewProgressTracker {
    job_id: JobId,
    definition: String,
    create_type: CreateType,
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

/// Whether a tracked job is still backfilling or has already finished.
#[derive(Debug, Clone)]
enum CreateMviewStatus {
    Backfilling {
        progress: Progress,
        /// Fragments ready to start backfilling that have not been scheduled yet.
        pending_backfill_nodes: Vec<FragmentId>,
        /// State tables to truncate at the next commit.
        table_ids_to_truncate: Vec<TableId>,
    },
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
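    /// Recover the tracker for a creating streaming job during failover. If no actors
    /// report progress, the job is immediately `Finished`; otherwise all actors start
    /// in `ConsumingUpstream` at epoch 0 with zeroed counters.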
    pub fn recover(
        creating_job_id: JobId,
        definition: String,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        // Recovered creating jobs are always tracked as background jobs.
        let create_type = CreateType::Background;
        let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
        let status = if actors.is_empty() {
            CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            }
        } else {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();

            for (actor, backfill_upstream_type) in actors {
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                creating_job_id,
                states,
                backfill_upstream_types,
                StreamJobFragments::upstream_table_counts_impl(
                    fragment_infos.values().map(|fragment| &fragment.nodes),
                ),
                version_stats,
                backfill_order_state,
            );
            CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            }
        };
        Self {
            job_id: creating_job_id,
            definition,
            create_type,
            tracking_job,
            status,
        }
    }

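    /// Rebuild a [`Progress`] for a recovered job. All row counters restart from zero
    /// and will be refreshed by subsequent progress reports.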
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    pub fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

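    /// Apply a progress report from a backfill actor, transitioning to `Finished` and
    /// accumulating truncation and scheduling work as needed.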
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "received a progress report for a streaming job that has already finished backfilling: {create_mview_progress:?}"
            );
            return;
        };
        match progress.apply(create_mview_progress, version_stats) {
            UpdateProgressResult::None => {
                tracing::trace!(?progress, "update progress");
            }
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            } => {
                let mut table_ids_to_truncate = take(table_ids_to_truncate);
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                tracing::trace!(?progress, "finish progress");
                self.status = CreateMviewStatus::Finished {
                    table_ids_to_truncate,
                };
            }
            UpdateProgressResult::BackfillNodeFinished(pending) => {
                tracing::trace!(
                    ?progress,
                    next_backfill_nodes = ?pending.next_backfill_nodes,
                    "start next backfill node"
                );
                let PendingBackfillFragments {
                    next_backfill_nodes,
                    truncate_locality_provider_state_tables,
                } = pending;
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                pending_backfill_nodes.extend(next_backfill_nodes);
            }
        }
    }

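    /// Drain the backfill fragments that are ready to be scheduled. Yields nothing
    /// once the job has finished.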
    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

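    /// Drain the state tables to truncate at the next commit, along with whether the
    /// job has finished backfilling.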
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, impl Iterator<Item = TableId> + '_) {
        let (is_finished, table_ids) = match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, table_ids_to_truncate),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, table_ids_to_truncate),
        };
        (is_finished, table_ids.drain(..))
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

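    /// Start tracking a new create-streaming-job command. Jobs with no backfill actors
    /// to track are marked `Finished` immediately.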
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let definition = definition.clone();
        let create_type = (*create_type).into();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(stream_job_fragments);
        if actors.is_empty() {
            // No actors to wait for; the job finishes immediately.
            return Self {
                job_id,
                definition,
                create_type,
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        Self {
            job_id,
            definition,
            create_type,
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
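    /// Translate a raw `CreateMviewProgress` report into a [`BackfillState`], apply it,
    /// and classify the outcome as an [`UpdateProgressResult`].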
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        let upstream_total_key_count: u64 =
            calculate_total_key_count(&self.upstream_mv_count, version_stats);

        tracing::debug!(%job_id, "updating progress for table");
        let pending = self.update(actor, new_state, upstream_total_key_count);

        if self.is_done() {
            tracing::debug!(%job_id, "all actors done for creating mview!");

            let PendingBackfillFragments {
                next_backfill_nodes,
                truncate_locality_provider_state_tables,
            } = pending;
            // Once every actor is done, there is nothing left to schedule.
            assert!(next_backfill_nodes.is_empty());
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            }
        } else if !pending.next_backfill_nodes.is_empty()
            || !pending.truncate_locality_provider_state_tables.is_empty()
        {
            UpdateProgressResult::BackfillNodeFinished(pending)
        } else {
            UpdateProgressResult::None
        }
    }
}

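/// Sum the total key counts of all upstream tables, weighting each table by the number
/// of times it is consumed. Tables missing from the version stats count as zero.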
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}