use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, ObjectId};
use risingwave_pb::catalog::PbCreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::BarrierInfo;
use crate::barrier::{
    Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

type ConsumedRows = u64;

/// The per-actor state of a backfill executor, as reported through barriers.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    Done(ConsumedRows),
}

/// Tracks the backfill progress of all actors for a single creating streaming job.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` together with `done_count` decides whether the job is done.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether each actor backfills from an upstream MV, a source, or values.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Number of backfill fragments consuming each upstream MV.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Estimated total key count of all upstream MVs, weighted by `upstream_mv_count`.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,

    /// DDL definition of the job.
    definition: String,
    /// Create type of the job: foreground or background.
    create_type: CreateType,
}

impl Progress {
    /// Creates a new [`Progress`] for a set of backfill actors, all starting
    /// from the `Init` state.
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

    /// Updates the progress of a single actor and returns the next backfill
    /// fragments that are unblocked by this actor finishing, if any.
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> Vec<FragmentId> {
        let mut next_backfill_nodes = vec![];
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
        tracing::debug!(?actor, states = ?self.states, "update progress for actor");

        let mut old = 0;
        let mut new = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
                old = old_consumed_rows;
            }
            BackfillState::Done(_) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
                new = *new_consumed_rows;
            }
            BackfillState::Done(new_consumed_rows) => {
                tracing::debug!("actor {} done", actor);
                new = *new_consumed_rows;
                self.done_count += 1;
                next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(new >= old, "backfill progress should not go backward");
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Values => {
                // Rows from values backfill are not counted towards progress.
            }
        }
        self.states.insert(actor, new_state);
        next_backfill_nodes
    }

    /// Returns whether all backfill actors of this job are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// Returns the ids of all actors tracked by this [`Progress`].
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

    /// Computes a human-readable progress string for the job.
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
            }
        }

        let mv_progress = (mv_count > 0).then(|| {
            if self.upstream_mvs_total_key_count == 0 {
                // The job is not done yet, so never report a full 100%.
                "99.99%".to_owned()
            } else {
                let mut progress = self.mv_backfill_consumed_rows as f64
                    / (self.upstream_mvs_total_key_count as f64);
                if progress > 1.0 {
                    // The total key count is only an estimate, so the ratio
                    // may exceed 1.0; cap it below 100%.
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    self.upstream_mvs_total_key_count
                )
            }
        });
        let source_progress = (source_count > 0).then(|| {
            format!(
                "{} rows consumed",
                self.source_backfill_consumed_rows
            )
        });
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}
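
// The test below is an added sketch, not part of the original module: it
// exercises the `Progress` state machine and the progress-string formula for
// a single MView backfill actor. It assumes `BackfillOrderState` implements
// `Default` (an empty backfill order graph); construct one explicitly if not.
#[cfg(test)]
mod progress_tests {
    use super::*;

    #[test]
    fn mv_backfill_progress_formula() {
        let actor: ActorId = 1;
        let mut progress = Progress::new(
            [(actor, BackfillUpstreamType::MView)],
            HashMap::from([(TableId::new(1), 1)]),
            200, // the upstream MV is estimated to hold 200 keys
            "CREATE MATERIALIZED VIEW mv AS SELECT 1".to_owned(),
            CreateType::Background,
            BackfillOrderState::default(), // assumption: `Default` is implemented
        );
        assert!(!progress.is_done());

        // Consuming 50 of 200 rows should report 25.00%.
        progress.update(actor, BackfillState::ConsumingUpstream(Epoch(0), 50), 200);
        assert_eq!(progress.calculate_progress(), "25.00% (50/200)");

        // Once the only actor reports `Done`, the job is complete.
        progress.update(actor, BackfillState::Done(200), 200);
        assert!(progress.is_done());
        assert_eq!(progress.calculate_progress(), "100%");
    }
}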

/// There are two kinds of `TrackingJob`s: jobs newly submitted in this epoch,
/// and jobs recovered from a previous meta node after a restart.
pub enum TrackingJob {
    New(TrackingCommand),
    Recovered(RecoveredTrackingJob),
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "{}", command.job_id),
            TrackingJob::Recovered(recovered) => write!(f, "{}<recovered>", recovered.id),
        }
    }
}

impl TrackingJob {
    /// Marks the job as finished in the catalog.
    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
        match self {
            TrackingJob::New(command) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(
                        command.job_id.table_id as i32,
                        command.replace_stream_job.clone(),
                    )
                    .await?;
                Ok(())
            }
            TrackingJob::Recovered(recovered) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(recovered.id, None)
                    .await?;
                Ok(())
            }
        }
    }

    /// Returns the id of the table (job) being created.
    pub(crate) fn table_to_create(&self) -> TableId {
        match self {
            TrackingJob::New(command) => command.job_id,
            TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
        }
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
            TrackingJob::Recovered(recovered) => {
                write!(f, "TrackingJob::Recovered({:?})", recovered.id)
            }
        }
    }
}

pub struct RecoveredTrackingJob {
    pub id: ObjectId,
}

/// The command tracked by the [`CreateMviewProgressTracker`].
pub(super) struct TrackingCommand {
    pub job_id: TableId,
    pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}

pub(super) enum UpdateProgressResult {
    None,
    Finished(TrackingJob),
    BackfillNodeFinished(Vec<FragmentId>),
}

/// Tracks the progress of all creating mviews. When a mview finishes
/// backfilling, it is removed from the map and can be notified as finished.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL, keyed by the id of the table being created.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    /// Maps each backfill actor to the job it reports progress for.
    actor_map: HashMap<ActorId, TableId>,

    /// Jobs that are already finished but whose completion will only be
    /// committed on the next checkpoint barrier.
    pending_finished_jobs: Vec<TrackingJob>,

    /// Backfill fragments unblocked by newly finished ones, waiting to be scheduled.
    pending_backfill_nodes: Vec<FragmentId>,
}

impl CreateMviewProgressTracker {
    /// Rebuilds the tracker from recovered background-creating jobs after a
    /// meta node restart. Since the actual backfill progress is unknown at
    /// this point, every actor starts from `ConsumingUpstream(Epoch(0), 0)`,
    /// so the initial progress may be underestimated.
    pub fn recover(
        jobs: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments, BackfillOrderState))>,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        for (creating_table_id, (definition, table_fragments, backfill_order_state)) in jobs {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();
            let actors = table_fragments.tracking_progress_actor_ids();
            for (actor, backfill_upstream_type) in actors {
                actor_map.insert(actor, creating_table_id);
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                states,
                backfill_upstream_types,
                table_fragments.upstream_table_counts(),
                definition,
                version_stats,
                backfill_order_state,
            );
            let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
                id: creating_table_id.table_id as i32,
            });
            progress_map.insert(creating_table_id, (progress, tracking_job));
        }
        Self {
            progress_map,
            actor_map,
            pending_finished_jobs: Vec::new(),
            pending_backfill_nodes: Vec::new(),
        }
    }

    /// Builds a [`Progress`] from recovered per-actor states. Recovered jobs
    /// are tracked as background jobs, so `create_type` is fixed to `Background`.
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            definition,
            create_type: CreateType::Background,
        }
    }

    /// Returns the DDL progress of all tracked jobs, keyed by table id.
    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(table_id, (x, _))| {
                let table_id = table_id.table_id;
                let ddl_progress = DdlProgress {
                    id: table_id as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (table_id, ddl_progress)
            })
            .collect()
    }

    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<(
            &CreateStreamingJobCommandInfo,
            Option<&ReplaceStreamJobPlan>,
        )>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        // Save `finished_commands` for newly created MVs.
        let finished_commands = {
            let mut commands = vec![];
            // Add the new command to the tracker, if any. Commands with no
            // actors to track can be finished immediately.
            if let Some((create_job_info, replace_stream_job)) = info
                && let Some(command) =
                    self.add(create_job_info, replace_stream_job, version_stats)
            {
                commands.push(command);
            }
            // Update the progress of all tracked commands.
            for progress in create_mview_progress {
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command) => {
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
                        tracing::trace!(
                            ?progress,
                            ?next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.stash_command_to_finish(command);
        }
    }

    /// Applies a collected barrier command and its per-worker responses,
    /// updating the tracked jobs accordingly. Returns the jobs that can be
    /// finished on this barrier, if it is a checkpoint barrier.
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Vec<TrackingJob> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal => Some((info, None)),
                    CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
                        Some((info, Some(replace_stream_job)))
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // The progress of snapshot backfill is not tracked here.
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
            // A cancelled command may also be stashed in `pending_finished_jobs`
            // waiting for a checkpoint, so clear it as well.
            self.cancel_command(table_id);
        }
        if barrier_info.kind.is_checkpoint() {
            self.take_finished_jobs()
        } else {
            vec![]
        }
    }

    /// Stashes a finished job until the next checkpoint barrier commits it.
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

    /// Takes all stashed jobs to finish on checkpoint.
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
        take(&mut self.pending_finished_jobs)
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }

    pub(super) fn cancel_command(&mut self, id: TableId) {
        let _ = self.progress_map.remove(&id);
        self.pending_finished_jobs
            .retain(|x| x.table_to_create() != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }

    /// Clears all tracked jobs, e.g. when an error triggers recovery.
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.pending_finished_jobs.clear();
        self.progress_map.clear();
    }

    /// Adds a new create-streaming-job command to track.
    ///
    /// Returns `Some(TrackingJob)` if the command can be finished immediately,
    /// i.e. when there are no actors whose backfill progress needs tracking.
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        replace_stream_job: Option<&ReplaceStreamJobPlan>,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors, replace_table_info) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The command can be finished immediately.
                return Some(TrackingJob::New(TrackingCommand {
                    job_id: info.stream_job_fragments.stream_job_id,
                    replace_stream_job: replace_stream_job.cloned(),
                }));
            }
            (info.clone(), actors, replace_stream_job.cloned())
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            streaming_job,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state =
            BackfillOrderState::new(fragment_backfill_ordering, &table_fragments);
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition.clone(),
            create_type.into(),
            backfill_order_state,
        );
        if create_type == PbCreateType::Background && streaming_job.is_sink_into_table() {
            // Return the tracking job immediately: a sink-into-table job is
            // decoupled from backfill progress, so there is no need to wait
            // for the sink to finish backfilling before finishing the job.
            Some(TrackingJob::New(TrackingCommand {
                job_id: creating_job_id,
                replace_stream_job: replace_table_info,
            }))
        } else {
            let old = self.progress_map.insert(
                creating_job_id,
                (
                    progress,
                    TrackingJob::New(TrackingCommand {
                        job_id: creating_job_id,
                        replace_stream_job: replace_table_info,
                    }),
                ),
            );
            assert!(old.is_none());
            None
        }
    }

    /// Updates the progress of one actor according to the Pb struct.
    ///
    /// If all actors of the job have finished, returns the finished [`TrackingJob`].
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // On restart, backfill executors and this tracker may not be aligned:
            // the job could already be finished while a stale progress report is
            // still in flight, so this is not necessarily an error.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows)
        } else {
            BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let next_backfill_nodes =
                    progress.update(actor, new_state, upstream_total_key_count);

                if progress.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean up the mapping from actors to the DDL table_id.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1)
                } else if !next_backfill_nodes.is_empty() {
                    tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

/// Computes the total key count of all upstream MVs, each weighted by the
/// number of backfill fragments consuming it. MVs that have no stats yet
/// contribute zero.
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}
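
// Added sketch, not part of the original module: checks the weighting logic
// of `calculate_total_key_count`. `TableStats` fields other than
// `total_key_count` are left at their defaults.
#[cfg(test)]
mod key_count_tests {
    use risingwave_pb::hummock::TableStats;

    use super::*;

    #[test]
    fn weighted_total_key_count() {
        let mut version_stats = HummockVersionStats::default();
        version_stats.table_stats.insert(
            1,
            TableStats {
                total_key_count: 100,
                ..Default::default()
            },
        );

        let table_count = HashMap::from([
            (TableId::new(1), 2), // consumed by two backfill fragments
            (TableId::new(2), 1), // no stats reported yet, counts as 0
        ]);

        assert_eq!(calculate_total_key_count(&table_count, &version_stats), 200);
    }
}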