1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::take;
18
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::catalog::CreateType;
23use risingwave_pb::ddl_service::DdlProgress;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::stream_service::PbBarrierCompleteResponse;
26use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
27
28use crate::MetaResult;
29use crate::barrier::info::BarrierInfo;
30use crate::barrier::{
31 Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
32};
33use crate::manager::{MetadataManager, StreamingJobType};
34use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};
35
/// Number of rows a backfill actor reports as consumed so far.
type ConsumedRows = u64;
37
/// Backfill state of a single tracked actor, as last recorded by the tracker.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// State given to every actor of a newly tracked job, before any progress report.
    Init,
    /// The actor is still backfilling. Carries the reported epoch (stored but never read
    /// back) and the number of rows consumed so far.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    /// The actor reported completion. Carries its final consumed row count.
    Done(ConsumedRows),
}
44
/// Backfill progress of one creating streaming job, aggregated over all of its
/// tracked actors.
#[derive(Debug)]
pub(super) struct Progress {
    /// Last recorded backfill state of every tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    /// Kind of upstream each tracked actor backfills from; decides which
    /// consumed-row counter the actor contributes to.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// Per upstream table, the multiplier applied to that table's key count when
    /// computing `upstream_mvs_total_key_count`.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of the upstream tables, refreshed on every progress update.
    upstream_mvs_total_key_count: u64,
    /// Rows consumed so far by actors whose upstream type is `MView`.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed so far by actors whose upstream type is `Source`.
    source_backfill_consumed_rows: u64,

    /// Statement text reported as `DdlProgress::statement`.
    definition: String,
}
68
69impl Progress {
70 fn new(
72 actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
73 upstream_mv_count: HashMap<TableId, usize>,
74 upstream_total_key_count: u64,
75 definition: String,
76 ) -> Self {
77 let mut states = HashMap::new();
78 let mut backfill_upstream_types = HashMap::new();
79 for (actor, backfill_upstream_type) in actors {
80 states.insert(actor, BackfillState::Init);
81 backfill_upstream_types.insert(actor, backfill_upstream_type);
82 }
83 assert!(!states.is_empty());
84
85 Self {
86 states,
87 backfill_upstream_types,
88 done_count: 0,
89 upstream_mv_count,
90 upstream_mvs_total_key_count: upstream_total_key_count,
91 mv_backfill_consumed_rows: 0,
92 source_backfill_consumed_rows: 0,
93 definition,
94 }
95 }
96
97 fn update(&mut self, actor: ActorId, new_state: BackfillState, upstream_total_key_count: u64) {
99 self.upstream_mvs_total_key_count = upstream_total_key_count;
100 let total_actors = self.states.len();
101 let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
102 tracing::debug!(?actor, states = ?self.states, "update progress for actor");
103
104 let mut old = 0;
105 let mut new = 0;
106 match self.states.remove(&actor).unwrap() {
107 BackfillState::Init => {}
108 BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
109 old = old_consumed_rows;
110 }
111 BackfillState::Done(_) => panic!("should not report done multiple times"),
112 };
113 match &new_state {
114 BackfillState::Init => {}
115 BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
116 new = *new_consumed_rows;
117 }
118 BackfillState::Done(new_consumed_rows) => {
119 tracing::debug!("actor {} done", actor);
120 new = *new_consumed_rows;
121 self.done_count += 1;
122 tracing::debug!(
123 "{} actors out of {} complete",
124 self.done_count,
125 total_actors,
126 );
127 }
128 };
129 debug_assert!(new >= old, "backfill progress should not go backward");
130 match backfill_upstream_type {
131 BackfillUpstreamType::MView => {
132 self.mv_backfill_consumed_rows += new - old;
133 }
134 BackfillUpstreamType::Source => {
135 self.source_backfill_consumed_rows += new - old;
136 }
137 BackfillUpstreamType::Values => {
138 }
140 }
141 self.states.insert(actor, new_state);
142 }
143
144 fn is_done(&self) -> bool {
146 tracing::trace!(
147 "Progress::is_done? {}, {}, {:?}",
148 self.done_count,
149 self.states.len(),
150 self.states
151 );
152 self.done_count == self.states.len()
153 }
154
155 fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
158 self.states.keys().cloned()
159 }
160
161 fn calculate_progress(&self) -> String {
163 if self.is_done() || self.states.is_empty() {
164 return "100%".to_owned();
165 }
166 let mut mv_count = 0;
167 let mut source_count = 0;
168 for backfill_upstream_type in self.backfill_upstream_types.values() {
169 match backfill_upstream_type {
170 BackfillUpstreamType::MView => mv_count += 1,
171 BackfillUpstreamType::Source => source_count += 1,
172 BackfillUpstreamType::Values => (),
173 }
174 }
175
176 let mv_progress = (mv_count > 0).then_some({
177 if self.upstream_mvs_total_key_count == 0 {
178 "99.99%".to_owned()
179 } else {
180 let mut progress = self.mv_backfill_consumed_rows as f64
181 / (self.upstream_mvs_total_key_count as f64);
182 if progress > 1.0 {
183 progress = 0.9999;
184 }
185 format!(
186 "{:.2}% ({}/{})",
187 progress * 100.0,
188 self.mv_backfill_consumed_rows,
189 self.upstream_mvs_total_key_count
190 )
191 }
192 });
193 let source_progress = (source_count > 0).then_some(format!(
194 "{} rows consumed",
195 self.source_backfill_consumed_rows
196 ));
197 match (mv_progress, source_progress) {
198 (Some(mv_progress), Some(source_progress)) => {
199 format!(
200 "MView Backfill: {}, Source Backfill: {}",
201 mv_progress, source_progress
202 )
203 }
204 (Some(mv_progress), None) => mv_progress,
205 (None, Some(source_progress)) => source_progress,
206 (None, None) => "Unknown".to_owned(),
207 }
208 }
209}
210
/// A streaming job whose creation progress is being tracked.
pub enum TrackingJob {
    /// A job registered through `CreateMviewProgressTracker::add`; carries the job id
    /// and an optional replace plan that is passed on when the job is finished.
    New(TrackingCommand),
    /// A job rebuilt by `CreateMviewProgressTracker::recover`; only its id is known.
    Recovered(RecoveredTrackingJob),
}
221
222impl TrackingJob {
223 pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
225 match self {
226 TrackingJob::New(command) => {
227 metadata_manager
228 .catalog_controller
229 .finish_streaming_job(
230 command.job_id.table_id as i32,
231 command.replace_stream_job.clone(),
232 )
233 .await?;
234 Ok(())
235 }
236 TrackingJob::Recovered(recovered) => {
237 metadata_manager
238 .catalog_controller
239 .finish_streaming_job(recovered.id, None)
240 .await?;
241 Ok(())
242 }
243 }
244 }
245
246 pub(crate) fn table_to_create(&self) -> TableId {
247 match self {
248 TrackingJob::New(command) => command.job_id,
249 TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
250 }
251 }
252}
253
254impl std::fmt::Debug for TrackingJob {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 match self {
257 TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
258 TrackingJob::Recovered(recovered) => {
259 write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id)
260 }
261 }
262 }
263}
264
/// Payload of [`TrackingJob::Recovered`]: a tracked job known only by its id.
pub struct RecoveredTrackingJob {
    /// Id of the streaming job, passed to `finish_streaming_job` when the job finishes.
    pub id: ObjectId,
}
268
/// Payload of [`TrackingJob::New`].
pub(super) struct TrackingCommand {
    /// Id of the streaming job being created.
    pub job_id: TableId,
    /// Optional replace plan handed to `finish_streaming_job` together with the job id.
    pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}
274
/// Tracks the backfill progress of creating streaming jobs and collects the jobs
/// whose backfill has finished.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress and tracking job of every job still being tracked, keyed by job id.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    /// Maps each tracked actor to the job it belongs to.
    actor_map: HashMap<ActorId, TableId>,

    /// Jobs that have finished and wait to be handed out by `take_finished_jobs`.
    pending_finished_jobs: Vec<TrackingJob>,
}
290
291impl CreateMviewProgressTracker {
292 pub fn recover(
300 mviews: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments))>,
301 version_stats: &HummockVersionStats,
302 ) -> Self {
303 let mut actor_map = HashMap::new();
304 let mut progress_map = HashMap::new();
305 for (creating_table_id, (definition, table_fragments)) in mviews {
306 let mut states = HashMap::new();
307 let mut backfill_upstream_types = HashMap::new();
308 let actors = table_fragments.tracking_progress_actor_ids();
309 for (actor, backfill_upstream_type) in actors {
310 actor_map.insert(actor, creating_table_id);
311 states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
312 backfill_upstream_types.insert(actor, backfill_upstream_type);
313 }
314
315 let progress = Self::recover_progress(
316 states,
317 backfill_upstream_types,
318 table_fragments.upstream_table_counts(),
319 definition,
320 version_stats,
321 );
322 let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
323 id: creating_table_id.table_id as i32,
324 });
325 progress_map.insert(creating_table_id, (progress, tracking_job));
326 }
327 Self {
328 progress_map,
329 actor_map,
330 pending_finished_jobs: Vec::new(),
331 }
332 }
333
334 fn recover_progress(
340 states: HashMap<ActorId, BackfillState>,
341 backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
342 upstream_mv_count: HashMap<TableId, usize>,
343 definition: String,
344 version_stats: &HummockVersionStats,
345 ) -> Progress {
346 let upstream_mvs_total_key_count =
347 calculate_total_key_count(&upstream_mv_count, version_stats);
348 Progress {
349 states,
350 backfill_upstream_types,
351 done_count: 0, upstream_mv_count,
353 upstream_mvs_total_key_count,
354 mv_backfill_consumed_rows: 0, source_backfill_consumed_rows: 0, definition,
357 }
358 }
359
360 pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
361 self.progress_map
362 .iter()
363 .map(|(table_id, (x, _))| {
364 let table_id = table_id.table_id;
365 let ddl_progress = DdlProgress {
366 id: table_id as u64,
367 statement: x.definition.clone(),
368 progress: x.calculate_progress(),
369 };
370 (table_id, ddl_progress)
371 })
372 .collect()
373 }
374
375 pub(super) fn update_tracking_jobs<'a>(
376 &mut self,
377 info: Option<(
378 &CreateStreamingJobCommandInfo,
379 Option<&ReplaceStreamJobPlan>,
380 )>,
381 create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
382 version_stats: &HummockVersionStats,
383 ) {
384 {
385 {
386 let finished_commands = {
388 let mut commands = vec![];
389 if let Some((create_job_info, replace_stream_job)) = info
391 && let Some(command) =
392 self.add(create_job_info, replace_stream_job, version_stats)
393 {
394 commands.push(command);
396 }
397 for progress in create_mview_progress {
399 match self.update(progress, version_stats) {
401 Some(command) => {
402 tracing::trace!(?progress, "finish progress");
403 commands.push(command);
404 }
405 _ => {
406 tracing::trace!(?progress, "update progress");
407 }
408 }
409 }
410 commands
411 };
412
413 for command in finished_commands {
414 self.stash_command_to_finish(command);
415 }
416 }
417 }
418 }
419
420 pub(super) fn apply_collected_command(
423 &mut self,
424 command: Option<&Command>,
425 barrier_info: &BarrierInfo,
426 resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
427 version_stats: &HummockVersionStats,
428 ) -> Vec<TrackingJob> {
429 let new_tracking_job_info =
430 if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
431 match job_type {
432 CreateStreamingJobType::Normal => Some((info, None)),
433 CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
434 Some((info, Some(replace_stream_job)))
435 }
436 CreateStreamingJobType::SnapshotBackfill(_) => {
437 None
439 }
440 }
441 } else {
442 None
443 };
444 self.update_tracking_jobs(
445 new_tracking_job_info,
446 resps
447 .into_iter()
448 .flat_map(|resp| resp.create_mview_progress.iter()),
449 version_stats,
450 );
451 for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
452 self.cancel_command(table_id);
455 }
456 if barrier_info.kind.is_checkpoint() {
457 self.take_finished_jobs()
458 } else {
459 vec![]
460 }
461 }
462
    /// Queues a finished job until it is taken by `take_finished_jobs`.
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }
467
    /// Removes and returns all pending finished jobs, leaving the pending list empty.
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
        take(&mut self.pending_finished_jobs)
    }
473
    /// Returns `true` if at least one finished job is waiting to be taken.
    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }
477
478 pub(super) fn cancel_command(&mut self, id: TableId) {
479 let _ = self.progress_map.remove(&id);
480 self.pending_finished_jobs
481 .retain(|x| x.table_to_create() != id);
482 self.actor_map.retain(|_, table_id| *table_id != id);
483 }
484
    /// Drops all tracking state: tracked actors, pending finished jobs and per-job progress.
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.pending_finished_jobs.clear();
        self.progress_map.clear();
    }
491
492 pub fn add(
496 &mut self,
497 info: &CreateStreamingJobCommandInfo,
498 replace_stream_job: Option<&ReplaceStreamJobPlan>,
499 version_stats: &HummockVersionStats,
500 ) -> Option<TrackingJob> {
501 tracing::trace!(?info, "add job to track");
502 let (info, actors, replace_table_info) = {
503 let CreateStreamingJobCommandInfo {
504 stream_job_fragments,
505 ..
506 } = info;
507 let actors = stream_job_fragments.tracking_progress_actor_ids();
508 if actors.is_empty() {
509 return Some(TrackingJob::New(TrackingCommand {
511 job_id: info.stream_job_fragments.stream_job_id,
512 replace_stream_job: replace_stream_job.cloned(),
513 }));
514 }
515 (info.clone(), actors, replace_stream_job.cloned())
516 };
517
518 let CreateStreamingJobCommandInfo {
519 stream_job_fragments: table_fragments,
520 definition,
521 job_type,
522 create_type,
523 ..
524 } = &info;
525
526 let creating_mv_id = table_fragments.stream_job_id();
527 let upstream_mv_count = table_fragments.upstream_table_counts();
528 let upstream_total_key_count: u64 =
529 calculate_total_key_count(&upstream_mv_count, version_stats);
530
531 for (actor, _backfill_upstream_type) in &actors {
532 self.actor_map.insert(*actor, creating_mv_id);
533 }
534
535 let progress = Progress::new(
536 actors,
537 upstream_mv_count,
538 upstream_total_key_count,
539 definition.clone(),
540 );
541 if *job_type == StreamingJobType::Sink && *create_type == CreateType::Background {
542 Some(TrackingJob::New(TrackingCommand {
548 job_id: creating_mv_id,
549 replace_stream_job: replace_table_info,
550 }))
551 } else {
552 let old = self.progress_map.insert(
553 creating_mv_id,
554 (
555 progress,
556 TrackingJob::New(TrackingCommand {
557 job_id: creating_mv_id,
558 replace_stream_job: replace_table_info,
559 }),
560 ),
561 );
562 assert!(old.is_none());
563 None
564 }
565 }
566
567 pub fn update(
571 &mut self,
572 progress: &CreateMviewProgress,
573 version_stats: &HummockVersionStats,
574 ) -> Option<TrackingJob> {
575 tracing::trace!(?progress, "update progress");
576 let actor = progress.backfill_actor_id;
577 let Some(table_id) = self.actor_map.get(&actor).copied() else {
578 tracing::info!(
585 "no tracked progress for actor {}, the stream job could already be finished",
586 actor
587 );
588 return None;
589 };
590
591 let new_state = if progress.done {
592 BackfillState::Done(progress.consumed_rows)
593 } else {
594 BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
595 };
596
597 match self.progress_map.entry(table_id) {
598 Entry::Occupied(mut o) => {
599 let progress = &mut o.get_mut().0;
600
601 let upstream_total_key_count: u64 =
602 calculate_total_key_count(&progress.upstream_mv_count, version_stats);
603
604 tracing::debug!(?table_id, "updating progress for table");
605 progress.update(actor, new_state, upstream_total_key_count);
606
607 if progress.is_done() {
608 tracing::debug!(
609 "all actors done for creating mview with table_id {}!",
610 table_id
611 );
612
613 for actor in o.get().0.actors() {
615 self.actor_map.remove(&actor);
616 }
617 Some(o.remove().1)
618 } else {
619 None
620 }
621 }
622 Entry::Vacant(_) => {
623 tracing::warn!(
624 "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
625 );
626 None
627 }
628 }
629 }
630}
631
632fn calculate_total_key_count(
633 table_count: &HashMap<TableId, usize>,
634 version_stats: &HummockVersionStats,
635) -> u64 {
636 table_count
637 .iter()
638 .map(|(table_id, count)| {
639 assert_ne!(*count, 0);
640 *count as u64
641 * version_stats
642 .table_stats
643 .get(&table_id.table_id)
644 .map_or(0, |stat| stat.total_key_count as u64)
645 })
646 .sum()
647}