1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::take;
18
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::catalog::CreateType;
23use risingwave_pb::ddl_service::DdlProgress;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::stream_service::PbBarrierCompleteResponse;
26use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
27
28use crate::MetaResult;
29use crate::barrier::backfill_order_control::BackfillOrderState;
30use crate::barrier::info::BarrierInfo;
31use crate::barrier::{
32 Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
33};
34use crate::manager::{MetadataManager, StreamingJobType};
35use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
36
/// Number of rows a backfill actor has reported as consumed so far.
type ConsumedRows = u64;
38
/// Latest backfill state of a single actor as known to the tracker.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// Initial state of the actors of a newly added job, before any report arrives.
    Init,
    /// The actor is still backfilling. Carries the reported epoch (currently unused) and the
    /// number of rows consumed so far.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    /// The actor has finished backfilling. Carries its final number of consumed rows.
    Done(ConsumedRows),
}
45
/// Backfill progress of one creating streaming job, aggregated over its backfill actors.
#[derive(Debug)]
pub(super) struct Progress {
    /// Latest backfill state of each tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Backfill ordering of the job's fragments. It is told when an actor finishes and returns
    /// the fragments that may start backfilling next.
    backfill_order_state: BackfillOrderState,
    /// Number of actors in `states` that have reported `BackfillState::Done`.
    done_count: usize,

    /// Kind of upstream each actor backfills from. It decides which consumed-row counter the
    /// actor's progress is added to.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Upstream table id -> count, as returned by `upstream_table_counts`. Each table's key
    /// count from the version stats is weighted by this value.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of the upstream tables computed from the version stats; refreshed on
    /// every update.
    upstream_mvs_total_key_count: u64,
    /// Sum of the rows consumed by actors backfilling from materialized views.
    mv_backfill_consumed_rows: u64,
    /// Sum of the rows consumed by actors backfilling from sources.
    source_backfill_consumed_rows: u64,

    /// Definition of the creating job; reported as the `statement` of its DDL progress.
    definition: String,
}
70
71impl Progress {
72 fn new(
74 actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
75 upstream_mv_count: HashMap<TableId, usize>,
76 upstream_total_key_count: u64,
77 definition: String,
78 backfill_order_state: BackfillOrderState,
79 ) -> Self {
80 let mut states = HashMap::new();
81 let mut backfill_upstream_types = HashMap::new();
82 for (actor, backfill_upstream_type) in actors {
83 states.insert(actor, BackfillState::Init);
84 backfill_upstream_types.insert(actor, backfill_upstream_type);
85 }
86 assert!(!states.is_empty());
87
88 Self {
89 states,
90 backfill_upstream_types,
91 done_count: 0,
92 upstream_mv_count,
93 upstream_mvs_total_key_count: upstream_total_key_count,
94 mv_backfill_consumed_rows: 0,
95 source_backfill_consumed_rows: 0,
96 definition,
97 backfill_order_state,
98 }
99 }
100
101 fn update(
103 &mut self,
104 actor: ActorId,
105 new_state: BackfillState,
106 upstream_total_key_count: u64,
107 ) -> Vec<FragmentId> {
108 let mut next_backfill_nodes = vec![];
109 self.upstream_mvs_total_key_count = upstream_total_key_count;
110 let total_actors = self.states.len();
111 let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
112 tracing::debug!(?actor, states = ?self.states, "update progress for actor");
113
114 let mut old = 0;
115 let mut new = 0;
116 match self.states.remove(&actor).unwrap() {
117 BackfillState::Init => {}
118 BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
119 old = old_consumed_rows;
120 }
121 BackfillState::Done(_) => panic!("should not report done multiple times"),
122 };
123 match &new_state {
124 BackfillState::Init => {}
125 BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
126 new = *new_consumed_rows;
127 }
128 BackfillState::Done(new_consumed_rows) => {
129 tracing::debug!("actor {} done", actor);
130 new = *new_consumed_rows;
131 self.done_count += 1;
132 next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
133 tracing::debug!(
134 "{} actors out of {} complete",
135 self.done_count,
136 total_actors,
137 );
138 }
139 };
140 debug_assert!(new >= old, "backfill progress should not go backward");
141 match backfill_upstream_type {
142 BackfillUpstreamType::MView => {
143 self.mv_backfill_consumed_rows += new - old;
144 }
145 BackfillUpstreamType::Source => {
146 self.source_backfill_consumed_rows += new - old;
147 }
148 BackfillUpstreamType::Values => {
149 }
151 }
152 self.states.insert(actor, new_state);
153 next_backfill_nodes
154 }
155
156 fn is_done(&self) -> bool {
158 tracing::trace!(
159 "Progress::is_done? {}, {}, {:?}",
160 self.done_count,
161 self.states.len(),
162 self.states
163 );
164 self.done_count == self.states.len()
165 }
166
167 fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
170 self.states.keys().cloned()
171 }
172
173 fn calculate_progress(&self) -> String {
175 if self.is_done() || self.states.is_empty() {
176 return "100%".to_owned();
177 }
178 let mut mv_count = 0;
179 let mut source_count = 0;
180 for backfill_upstream_type in self.backfill_upstream_types.values() {
181 match backfill_upstream_type {
182 BackfillUpstreamType::MView => mv_count += 1,
183 BackfillUpstreamType::Source => source_count += 1,
184 BackfillUpstreamType::Values => (),
185 }
186 }
187
188 let mv_progress = (mv_count > 0).then_some({
189 if self.upstream_mvs_total_key_count == 0 {
190 "99.99%".to_owned()
191 } else {
192 let mut progress = self.mv_backfill_consumed_rows as f64
193 / (self.upstream_mvs_total_key_count as f64);
194 if progress > 1.0 {
195 progress = 0.9999;
196 }
197 format!(
198 "{:.2}% ({}/{})",
199 progress * 100.0,
200 self.mv_backfill_consumed_rows,
201 self.upstream_mvs_total_key_count
202 )
203 }
204 });
205 let source_progress = (source_count > 0).then_some(format!(
206 "{} rows consumed",
207 self.source_backfill_consumed_rows
208 ));
209 match (mv_progress, source_progress) {
210 (Some(mv_progress), Some(source_progress)) => {
211 format!(
212 "MView Backfill: {}, Source Backfill: {}",
213 mv_progress, source_progress
214 )
215 }
216 (Some(mv_progress), None) => mv_progress,
217 (None, Some(source_progress)) => source_progress,
218 (None, None) => "Unknown".to_owned(),
219 }
220 }
221}
222
/// A creating streaming job whose completion is tracked; passed to `TrackingJob::finish`
/// once the job is done.
pub enum TrackingJob {
    /// A job added through `CreateMviewProgressTracker::add`.
    New(TrackingCommand),
    /// A job restored by `CreateMviewProgressTracker::recover`.
    Recovered(RecoveredTrackingJob),
}
233
234impl std::fmt::Display for TrackingJob {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 match self {
237 TrackingJob::New(command) => write!(f, "{}", command.job_id),
238 TrackingJob::Recovered(recovered) => write!(f, "{}<recovered>", recovered.id),
239 }
240 }
241}
242
243impl TrackingJob {
244 pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
246 match self {
247 TrackingJob::New(command) => {
248 metadata_manager
249 .catalog_controller
250 .finish_streaming_job(
251 command.job_id.table_id as i32,
252 command.replace_stream_job.clone(),
253 )
254 .await?;
255 Ok(())
256 }
257 TrackingJob::Recovered(recovered) => {
258 metadata_manager
259 .catalog_controller
260 .finish_streaming_job(recovered.id, None)
261 .await?;
262 Ok(())
263 }
264 }
265 }
266
267 pub(crate) fn table_to_create(&self) -> TableId {
268 match self {
269 TrackingJob::New(command) => command.job_id,
270 TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
271 }
272 }
273}
274
275impl std::fmt::Debug for TrackingJob {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 match self {
278 TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
279 TrackingJob::Recovered(recovered) => {
280 write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id)
281 }
282 }
283 }
284}
285
/// Tracking info of a creating job restored during recovery.
pub struct RecoveredTrackingJob {
    /// Catalog object id of the creating job.
    pub id: ObjectId,
}
289
/// Tracking info of a creating job added through a create command.
pub(super) struct TrackingCommand {
    /// Id of the streaming job being created.
    pub job_id: TableId,
    /// Replace plan attached to the job (set for sink-into-table jobs in
    /// `apply_collected_command`); handed to `finish_streaming_job` when the job finishes.
    pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}
295
/// Outcome of applying one progress report in `CreateMviewProgressTracker::update`.
pub(super) enum UpdateProgressResult {
    /// Nothing to act on.
    None,
    /// All backfill actors of the job are done. The job has been removed from the tracker and
    /// should be finished.
    Finished(TrackingJob),
    /// An actor finished, and these fragments may now start backfilling.
    BackfillNodeFinished(Vec<FragmentId>),
}
301
/// Tracks the backfill progress of creating streaming jobs and collects the jobs (and the
/// backfill fragments) that are ready to proceed.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Creating job id -> (its progress, the job to finish once the progress is done).
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    /// Backfill actor -> creating job it belongs to.
    actor_map: HashMap<ActorId, TableId>,

    /// Finished jobs not yet handed out by `take_finished_jobs`.
    pending_finished_jobs: Vec<TrackingJob>,

    /// Fragments allowed to start backfilling, not yet handed out by
    /// `take_pending_backfill_nodes`.
    pending_backfill_nodes: Vec<FragmentId>,
}
320
321impl CreateMviewProgressTracker {
322 pub fn recover(
330 mviews: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments, BackfillOrderState))>,
331 version_stats: &HummockVersionStats,
332 ) -> Self {
333 let mut actor_map = HashMap::new();
334 let mut progress_map = HashMap::new();
335 for (creating_table_id, (definition, table_fragments, backfill_order_state)) in mviews {
336 let mut states = HashMap::new();
337 let mut backfill_upstream_types = HashMap::new();
338 let actors = table_fragments.tracking_progress_actor_ids();
339 for (actor, backfill_upstream_type) in actors {
340 actor_map.insert(actor, creating_table_id);
341 states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
342 backfill_upstream_types.insert(actor, backfill_upstream_type);
343 }
344
345 let progress = Self::recover_progress(
346 states,
347 backfill_upstream_types,
348 table_fragments.upstream_table_counts(),
349 definition,
350 version_stats,
351 backfill_order_state,
352 );
353 let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
354 id: creating_table_id.table_id as i32,
355 });
356 progress_map.insert(creating_table_id, (progress, tracking_job));
357 }
358 Self {
359 progress_map,
360 actor_map,
361 pending_finished_jobs: Vec::new(),
362 pending_backfill_nodes: Vec::new(),
363 }
364 }
365
366 fn recover_progress(
372 states: HashMap<ActorId, BackfillState>,
373 backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
374 upstream_mv_count: HashMap<TableId, usize>,
375 definition: String,
376 version_stats: &HummockVersionStats,
377 backfill_order_state: BackfillOrderState,
378 ) -> Progress {
379 let upstream_mvs_total_key_count =
380 calculate_total_key_count(&upstream_mv_count, version_stats);
381 Progress {
382 states,
383 backfill_order_state,
384 backfill_upstream_types,
385 done_count: 0, upstream_mv_count,
387 upstream_mvs_total_key_count,
388 mv_backfill_consumed_rows: 0, source_backfill_consumed_rows: 0, definition,
391 }
392 }
393
394 pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
395 self.progress_map
396 .iter()
397 .map(|(table_id, (x, _))| {
398 let table_id = table_id.table_id;
399 let ddl_progress = DdlProgress {
400 id: table_id as u64,
401 statement: x.definition.clone(),
402 progress: x.calculate_progress(),
403 };
404 (table_id, ddl_progress)
405 })
406 .collect()
407 }
408
409 pub(super) fn update_tracking_jobs<'a>(
410 &mut self,
411 info: Option<(
412 &CreateStreamingJobCommandInfo,
413 Option<&ReplaceStreamJobPlan>,
414 )>,
415 create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
416 version_stats: &HummockVersionStats,
417 ) {
418 {
419 {
420 let finished_commands = {
422 let mut commands = vec![];
423 if let Some((create_job_info, replace_stream_job)) = info
425 && let Some(command) =
426 self.add(create_job_info, replace_stream_job, version_stats)
427 {
428 commands.push(command);
430 }
431 for progress in create_mview_progress {
433 match self.update(progress, version_stats) {
435 UpdateProgressResult::None => {
436 tracing::trace!(?progress, "update progress");
437 }
438 UpdateProgressResult::Finished(command) => {
439 tracing::trace!(?progress, "finish progress");
440 commands.push(command);
441 }
442 UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
443 tracing::trace!(
444 ?progress,
445 ?next_backfill_nodes,
446 "start next backfill node"
447 );
448 self.queue_backfill(next_backfill_nodes);
449 }
450 }
451 }
452 commands
453 };
454
455 for command in finished_commands {
456 self.stash_command_to_finish(command);
457 }
458 }
459 }
460 }
461
462 pub(super) fn apply_collected_command(
465 &mut self,
466 command: Option<&Command>,
467 barrier_info: &BarrierInfo,
468 resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
469 version_stats: &HummockVersionStats,
470 ) -> Vec<TrackingJob> {
471 let new_tracking_job_info =
472 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
473 match job_type {
474 CreateStreamingJobType::Normal => Some((info, None)),
475 CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
476 Some((info, Some(replace_stream_job)))
477 }
478 CreateStreamingJobType::SnapshotBackfill(_) => {
479 None
481 }
482 }
483 } else {
484 None
485 };
486 self.update_tracking_jobs(
487 new_tracking_job_info,
488 resps
489 .into_iter()
490 .flat_map(|resp| resp.create_mview_progress.iter()),
491 version_stats,
492 );
493 for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
494 self.cancel_command(table_id);
497 }
498 if barrier_info.kind.is_checkpoint() {
499 self.take_finished_jobs()
500 } else {
501 vec![]
502 }
503 }
504
505 pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
507 self.pending_finished_jobs.push(finished_job);
508 }
509
510 fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
511 self.pending_backfill_nodes.extend(backfill_nodes);
512 }
513
514 pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
516 tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
517 take(&mut self.pending_finished_jobs)
518 }
519
520 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
521 take(&mut self.pending_backfill_nodes)
522 }
523
524 pub(super) fn has_pending_finished_jobs(&self) -> bool {
525 !self.pending_finished_jobs.is_empty()
526 }
527
528 pub(super) fn cancel_command(&mut self, id: TableId) {
529 let _ = self.progress_map.remove(&id);
530 self.pending_finished_jobs
531 .retain(|x| x.table_to_create() != id);
532 self.actor_map.retain(|_, table_id| *table_id != id);
533 }
534
535 pub fn abort_all(&mut self) {
537 self.actor_map.clear();
538 self.pending_finished_jobs.clear();
539 self.progress_map.clear();
540 }
541
542 pub fn add(
546 &mut self,
547 info: &CreateStreamingJobCommandInfo,
548 replace_stream_job: Option<&ReplaceStreamJobPlan>,
549 version_stats: &HummockVersionStats,
550 ) -> Option<TrackingJob> {
551 tracing::trace!(?info, "add job to track");
552 let (info, actors, replace_table_info) = {
553 let CreateStreamingJobCommandInfo {
554 stream_job_fragments,
555 ..
556 } = info;
557 let actors = stream_job_fragments.tracking_progress_actor_ids();
558 if actors.is_empty() {
559 return Some(TrackingJob::New(TrackingCommand {
561 job_id: info.stream_job_fragments.stream_job_id,
562 replace_stream_job: replace_stream_job.cloned(),
563 }));
564 }
565 (info.clone(), actors, replace_stream_job.cloned())
566 };
567
568 let CreateStreamingJobCommandInfo {
569 stream_job_fragments: table_fragments,
570 definition,
571 job_type,
572 create_type,
573 fragment_backfill_ordering,
574 ..
575 } = info;
576
577 let creating_mv_id = table_fragments.stream_job_id();
578 let upstream_mv_count = table_fragments.upstream_table_counts();
579 let upstream_total_key_count: u64 =
580 calculate_total_key_count(&upstream_mv_count, version_stats);
581
582 for (actor, _backfill_upstream_type) in &actors {
583 self.actor_map.insert(*actor, creating_mv_id);
584 }
585
586 let backfill_order_state =
587 BackfillOrderState::new(fragment_backfill_ordering, &table_fragments);
588 let progress = Progress::new(
589 actors,
590 upstream_mv_count,
591 upstream_total_key_count,
592 definition.clone(),
593 backfill_order_state,
594 );
595 if job_type == StreamingJobType::Sink && create_type == CreateType::Background {
596 Some(TrackingJob::New(TrackingCommand {
602 job_id: creating_mv_id,
603 replace_stream_job: replace_table_info,
604 }))
605 } else {
606 let old = self.progress_map.insert(
607 creating_mv_id,
608 (
609 progress,
610 TrackingJob::New(TrackingCommand {
611 job_id: creating_mv_id,
612 replace_stream_job: replace_table_info,
613 }),
614 ),
615 );
616 assert!(old.is_none());
617 None
618 }
619 }
620
621 pub fn update(
625 &mut self,
626 progress: &CreateMviewProgress,
627 version_stats: &HummockVersionStats,
628 ) -> UpdateProgressResult {
629 tracing::trace!(?progress, "update progress");
630 let actor = progress.backfill_actor_id;
631 let Some(table_id) = self.actor_map.get(&actor).copied() else {
632 tracing::info!(
639 "no tracked progress for actor {}, the stream job could already be finished",
640 actor
641 );
642 return UpdateProgressResult::None;
643 };
644
645 let new_state = if progress.done {
646 BackfillState::Done(progress.consumed_rows)
647 } else {
648 BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
649 };
650
651 match self.progress_map.entry(table_id) {
652 Entry::Occupied(mut o) => {
653 let progress = &mut o.get_mut().0;
654
655 let upstream_total_key_count: u64 =
656 calculate_total_key_count(&progress.upstream_mv_count, version_stats);
657
658 tracing::debug!(?table_id, "updating progress for table");
659 let next_backfill_nodes =
660 progress.update(actor, new_state, upstream_total_key_count);
661
662 if progress.is_done() {
663 tracing::debug!(
664 "all actors done for creating mview with table_id {}!",
665 table_id
666 );
667
668 for actor in o.get().0.actors() {
670 self.actor_map.remove(&actor);
671 }
672 assert!(next_backfill_nodes.is_empty());
673 UpdateProgressResult::Finished(o.remove().1)
674 } else if !next_backfill_nodes.is_empty() {
675 tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
676 UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
677 } else {
678 UpdateProgressResult::None
679 }
680 }
681 Entry::Vacant(_) => {
682 tracing::warn!(
683 "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
684 );
685 UpdateProgressResult::None
686 }
687 }
688 }
689}
690
691fn calculate_total_key_count(
692 table_count: &HashMap<TableId, usize>,
693 version_stats: &HummockVersionStats,
694) -> u64 {
695 table_count
696 .iter()
697 .map(|(table_id, count)| {
698 assert_ne!(*count, 0);
699 *count as u64
700 * version_stats
701 .table_stats
702 .get(&table_id.table_id)
703 .map_or(0, |stat| stat.total_key_count as u64)
704 })
705 .sum()
706}