use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, ObjectId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::BarrierInfo;
use crate::barrier::{Command, CreateStreamingJobCommandInfo, CreateStreamingJobType};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

type ConsumedRows = u64;

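/// The backfill state of one actor, as reported via barrier-complete responses:
/// `Init` before the first report, `ConsumingUpstream` with the consumed epoch and
/// cumulative row count while backfilling, and `Done` with the final row count.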
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    Done(ConsumedRows),
}

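/// Progress of all the backfill actors of one creating streaming job.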
#[derive(Debug)]
pub(super) struct Progress {
    /// Per-actor backfill state.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    /// Upstream type (mview, source or values) of each backfill actor.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// How many times each upstream materialized view is consumed; its key count is
    /// weighted by this count when computing the total below.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all upstream materialized views, refreshed on every update.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,

    /// The SQL definition of the creating streaming job.
    definition: String,
    /// The `CreateType` of the creating streaming job.
    create_type: CreateType,
}

impl Progress {
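    /// Create a new [`Progress`] for a set of backfill actors, all starting in `Init` state.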
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

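    /// Update the state of one backfill actor and refresh the upstream total key count.
    ///
    /// Returns the backfill fragments unblocked in the backfill order when this update
    /// finishes a backfill node; otherwise an empty vec.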
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> Vec<FragmentId> {
        let mut next_backfill_nodes = vec![];
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
        tracing::debug!(?actor, states = ?self.states, "update progress for actor");

        let mut old = 0;
        let mut new = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
                old = old_consumed_rows;
            }
            BackfillState::Done(_) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
                new = *new_consumed_rows;
            }
            BackfillState::Done(new_consumed_rows) => {
                tracing::debug!("actor {} done", actor);
                new = *new_consumed_rows;
                self.done_count += 1;
                next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(new >= old, "backfill progress should not go backward");
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new - old;
            }
            // Values backfill does not contribute to the consumed-row metrics.
            BackfillUpstreamType::Values => {}
        }
        self.states.insert(actor, new_state);
        next_backfill_nodes
    }

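    /// Returns whether all backfill actors have reported [`BackfillState::Done`].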
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

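    /// Returns the ids of all backfill actors tracked by this [`Progress`].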
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

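    /// Renders the current progress as a human-readable string, e.g.
    /// `"56.78% (5678/10000)"` for mview backfill and `"1234 rows consumed"` for
    /// source backfill, joined when a job has both kinds of backfill.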
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            if self.upstream_mvs_total_key_count == 0 {
                // Avoid reporting 100% while the job is not done yet.
                "99.99%".to_owned()
            } else {
                let mut progress = self.mv_backfill_consumed_rows as f64
                    / (self.upstream_mvs_total_key_count as f64);
                // The total key count is an estimate, so the ratio may exceed 1; cap it.
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    self.upstream_mvs_total_key_count
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

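/// A streaming job tracked until its creation finishes: either newly submitted via a
/// create-streaming-job command, or recovered from the catalog during recovery.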
pub enum TrackingJob {
    New(TrackingCommand),
    Recovered(RecoveredTrackingJob),
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "{}", command.job_id),
            TrackingJob::Recovered(recovered) => write!(f, "{}<recovered>", recovered.id),
        }
    }
}

impl TrackingJob {
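    /// Persist the tracked job as finished in the catalog.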
    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
        match self {
            TrackingJob::New(command) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(command.job_id.table_id as i32)
                    .await?;
                Ok(())
            }
            TrackingJob::Recovered(recovered) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(recovered.id)
                    .await?;
                Ok(())
            }
        }
    }

    pub(crate) fn table_to_create(&self) -> TableId {
        match self {
            TrackingJob::New(command) => command.job_id,
            TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
        }
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
            TrackingJob::Recovered(recovered) => {
                write!(f, "TrackingJob::Recovered({:?})", recovered.id)
            }
        }
    }
}

pub struct RecoveredTrackingJob {
    pub id: ObjectId,
}

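/// The command for tracking a newly created streaming job.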
pub(super) struct TrackingCommand {
    pub job_id: TableId,
}

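/// The result of applying one progress report to the tracker.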
pub(super) enum UpdateProgressResult {
    None,
    Finished(TrackingJob),
    BackfillNodeFinished(Vec<FragmentId>),
}

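/// Tracks the progress of all creating streaming jobs.
///
/// A job is added on `CreateStreamingJob`, updated by the progress reports piggybacked
/// on barrier-complete responses, and finished at the first checkpoint barrier after
/// all of its backfill actors are done.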
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of each creating streaming job, keyed by its table id.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    /// Maps each backfill actor to the creating job it belongs to.
    actor_map: HashMap<ActorId, TableId>,

    /// Finished jobs waiting for the next checkpoint barrier to be persisted.
    pending_finished_jobs: Vec<TrackingJob>,

    /// Backfill fragments unblocked by finished backfill nodes, waiting to be scheduled.
    pending_backfill_nodes: Vec<FragmentId>,
}

impl CreateMviewProgressTracker {
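    /// Rebuild the tracker from the in-progress jobs that survived recovery. Every
    /// tracked actor restarts in `ConsumingUpstream` state with zero consumed rows.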
    pub fn recover(
        jobs: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments, BackfillOrderState))>,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        for (creating_table_id, (definition, table_fragments, backfill_order_state)) in jobs {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();
            let actors = table_fragments.tracking_progress_actor_ids();
            for (actor, backfill_upstream_type) in actors {
                actor_map.insert(actor, creating_table_id);
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                states,
                backfill_upstream_types,
                table_fragments.upstream_table_counts(),
                definition,
                version_stats,
                backfill_order_state,
            );
            let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
                id: creating_table_id.table_id as i32,
            });
            progress_map.insert(creating_table_id, (progress, tracking_job));
        }
        Self {
            progress_map,
            actor_map,
            pending_finished_jobs: Vec::new(),
            pending_backfill_nodes: Vec::new(),
        }
    }

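    /// Rebuild a [`Progress`] for a recovered job. All counters restart from zero, so
    /// previously reported progress is not restored; recovered jobs are always treated
    /// as `CreateType::Background`.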
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            definition,
            create_type: CreateType::Background,
        }
    }

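    /// Render the DDL progress of every tracked job, keyed by table id.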
    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(table_id, (x, _))| {
                let table_id = table_id.table_id;
                let ddl_progress = DdlProgress {
                    id: table_id as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (table_id, ddl_progress)
            })
            .collect()
    }

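    /// Add the job newly created by `info` (if any) to the tracker and apply the
    /// collected progress reports. Jobs that become finished are stashed until the
    /// next checkpoint barrier.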
    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<&CreateStreamingJobCommandInfo>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        let finished_commands = {
            let mut commands = vec![];
            // Add the newly created streaming job to the tracker.
            if let Some(create_job_info) = info
                && let Some(command) = self.add(create_job_info, version_stats)
            {
                // The job has no backfill actors to wait for and can finish immediately.
                commands.push(command);
            }
            // Apply the progress reports collected from the backfill actors.
            for progress in create_mview_progress {
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command) => {
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
                        tracing::trace!(
                            ?progress,
                            ?next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.stash_command_to_finish(command);
        }
    }

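    /// Apply a collected command together with the barrier-complete responses gathered
    /// for it. Stops tracking any tables the command drops, and returns the jobs ready
    /// to finish if the barrier is a checkpoint barrier (otherwise an empty vec).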
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Vec<TrackingJob> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                        Some(info)
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // Snapshot backfill progress is not tracked here.
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
            // The command may drop a streaming job that is still being created;
            // stop tracking its progress in that case.
            self.cancel_command(table_id);
        }
        if barrier_info.kind.is_checkpoint() {
            self.take_finished_jobs()
        } else {
            vec![]
        }
    }

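    /// Stash a finished job to be persisted at the next checkpoint barrier.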
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

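    /// Take all stashed finished jobs; called when a checkpoint barrier completes.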
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
        take(&mut self.pending_finished_jobs)
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }

    /// Stop tracking the job identified by `id`, e.g. when it is cancelled or dropped.
    pub(super) fn cancel_command(&mut self, id: TableId) {
        let _ = self.progress_map.remove(&id);
        self.pending_finished_jobs
            .retain(|x| x.table_to_create() != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }

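    /// Stop tracking everything: all in-progress jobs, pending finished jobs and actors.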
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.pending_finished_jobs.clear();
        self.progress_map.clear();
    }

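    /// Start tracking a newly created streaming job.
    ///
    /// Returns `Some(TrackingJob)` if the job has no backfill actors to track, so it
    /// can be finished immediately; otherwise registers the job and returns `None`.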
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The job has no actors to track and can be finished immediately.
                return Some(TrackingJob::New(TrackingCommand {
                    job_id: info.stream_job_fragments.stream_job_id,
                }));
            }
            (info.clone(), actors)
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state =
            BackfillOrderState::new(fragment_backfill_ordering, &table_fragments);
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition.clone(),
            create_type.into(),
            backfill_order_state,
        );
        let old = self.progress_map.insert(
            creating_job_id,
            (
                progress,
                TrackingJob::New(TrackingCommand {
                    job_id: creating_job_id,
                }),
            ),
        );
        assert!(old.is_none());
        None
    }

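    /// Apply a progress report from one backfill actor.
    ///
    /// Returns [`UpdateProgressResult::Finished`] when this report completes the whole
    /// job, [`UpdateProgressResult::BackfillNodeFinished`] when it unblocks further
    /// backfill fragments, and [`UpdateProgressResult::None`] otherwise.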
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // Reports from actors of an already-finished job may still arrive; they can
            // be safely ignored.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
            )
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let next_backfill_nodes =
                    progress.update(actor, new_state, upstream_total_key_count);

                if progress.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean up the tracked actors of the finished job.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1)
                } else if !next_backfill_nodes.is_empty() {
                    tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

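/// Estimate the total number of keys to backfill: each upstream materialized view's
/// key count from the version stats, weighted by how many times it is consumed.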
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}
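
#[cfg(test)]
mod tests {
    // A minimal test sketch of `calculate_total_key_count`, assuming the prost-generated
    // `risingwave_pb::hummock::TableStats` exposes a `total_key_count` field and
    // implements `Default`. It checks that each upstream's key count is weighted by the
    // number of times the upstream is consumed, and that missing stats count as zero.
    use std::collections::HashMap;

    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::{HummockVersionStats, TableStats};

    use super::calculate_total_key_count;

    #[test]
    fn test_total_key_count_weighted_by_consumer_count() {
        let version_stats = HummockVersionStats {
            table_stats: HashMap::from([
                (
                    1,
                    TableStats {
                        total_key_count: 100,
                        ..Default::default()
                    },
                ),
                (
                    2,
                    TableStats {
                        total_key_count: 50,
                        ..Default::default()
                    },
                ),
            ]),
            ..Default::default()
        };
        // Table 1 is consumed twice, table 2 once; table 3 has no stats yet.
        let table_count = HashMap::from([
            (TableId::new(1), 2usize),
            (TableId::new(2), 1usize),
            (TableId::new(3), 1usize),
        ]);
        assert_eq!(
            calculate_total_key_count(&table_count, &version_stats),
            2 * 100 + 50
        );
    }
}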