risingwave_meta/barrier/
progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::take;
18
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::catalog::CreateType;
23use risingwave_pb::ddl_service::DdlProgress;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::stream_service::PbBarrierCompleteResponse;
26use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
27
28use crate::MetaResult;
29use crate::barrier::backfill_order_control::BackfillOrderState;
30use crate::barrier::info::BarrierInfo;
31use crate::barrier::{
32    Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
33};
34use crate::manager::{MetadataManager, StreamingJobType};
35use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
36
/// Row count reported by a backfill actor; carried inside [`BackfillState`].
type ConsumedRows = u64;

/// Backfill state of a single tracked actor.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// Initial state assigned to every actor of a newly tracked job.
    Init,
    /// The actor reported progress without being done. Carries the reported epoch
    /// (never read back, hence the `dead_code` expectation) and the consumed row count.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    /// The actor reported that it is done, together with its final consumed row count.
    Done(ConsumedRows),
}
45
/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    /// Latest known backfill state of each tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Consulted whenever an actor reports done, to obtain the fragments returned by
    /// `finish_actor`.
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported done so far.
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views.
    /// Overwritten on every call to `update`.
    upstream_mvs_total_key_count: u64,
    /// Rows consumed so far by actors whose upstream type is `MView`.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed so far by actors whose upstream type is `Source`.
    source_backfill_consumed_rows: u64,

    /// DDL definition
    definition: String,
}
70
71impl Progress {
72    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
73    fn new(
74        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
75        upstream_mv_count: HashMap<TableId, usize>,
76        upstream_total_key_count: u64,
77        definition: String,
78        backfill_order_state: BackfillOrderState,
79    ) -> Self {
80        let mut states = HashMap::new();
81        let mut backfill_upstream_types = HashMap::new();
82        for (actor, backfill_upstream_type) in actors {
83            states.insert(actor, BackfillState::Init);
84            backfill_upstream_types.insert(actor, backfill_upstream_type);
85        }
86        assert!(!states.is_empty());
87
88        Self {
89            states,
90            backfill_upstream_types,
91            done_count: 0,
92            upstream_mv_count,
93            upstream_mvs_total_key_count: upstream_total_key_count,
94            mv_backfill_consumed_rows: 0,
95            source_backfill_consumed_rows: 0,
96            definition,
97            backfill_order_state,
98        }
99    }
100
101    /// Update the progress of `actor`.
102    fn update(
103        &mut self,
104        actor: ActorId,
105        new_state: BackfillState,
106        upstream_total_key_count: u64,
107    ) -> Vec<FragmentId> {
108        let mut next_backfill_nodes = vec![];
109        self.upstream_mvs_total_key_count = upstream_total_key_count;
110        let total_actors = self.states.len();
111        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
112        tracing::debug!(?actor, states = ?self.states, "update progress for actor");
113
114        let mut old = 0;
115        let mut new = 0;
116        match self.states.remove(&actor).unwrap() {
117            BackfillState::Init => {}
118            BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
119                old = old_consumed_rows;
120            }
121            BackfillState::Done(_) => panic!("should not report done multiple times"),
122        };
123        match &new_state {
124            BackfillState::Init => {}
125            BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
126                new = *new_consumed_rows;
127            }
128            BackfillState::Done(new_consumed_rows) => {
129                tracing::debug!("actor {} done", actor);
130                new = *new_consumed_rows;
131                self.done_count += 1;
132                next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
133                tracing::debug!(
134                    "{} actors out of {} complete",
135                    self.done_count,
136                    total_actors,
137                );
138            }
139        };
140        debug_assert!(new >= old, "backfill progress should not go backward");
141        match backfill_upstream_type {
142            BackfillUpstreamType::MView => {
143                self.mv_backfill_consumed_rows += new - old;
144            }
145            BackfillUpstreamType::Source => {
146                self.source_backfill_consumed_rows += new - old;
147            }
148            BackfillUpstreamType::Values => {
149                // do not consider progress for values
150            }
151        }
152        self.states.insert(actor, new_state);
153        next_backfill_nodes
154    }
155
156    /// Returns whether all backfill executors are done.
157    fn is_done(&self) -> bool {
158        tracing::trace!(
159            "Progress::is_done? {}, {}, {:?}",
160            self.done_count,
161            self.states.len(),
162            self.states
163        );
164        self.done_count == self.states.len()
165    }
166
167    /// Returns the ids of all actors containing the backfill executors for the mview tracked by this
168    /// [`Progress`].
169    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
170        self.states.keys().cloned()
171    }
172
173    /// `progress` = `consumed_rows` / `upstream_total_key_count`
174    fn calculate_progress(&self) -> String {
175        if self.is_done() || self.states.is_empty() {
176            return "100%".to_owned();
177        }
178        let mut mv_count = 0;
179        let mut source_count = 0;
180        for backfill_upstream_type in self.backfill_upstream_types.values() {
181            match backfill_upstream_type {
182                BackfillUpstreamType::MView => mv_count += 1,
183                BackfillUpstreamType::Source => source_count += 1,
184                BackfillUpstreamType::Values => (),
185            }
186        }
187
188        let mv_progress = (mv_count > 0).then_some({
189            if self.upstream_mvs_total_key_count == 0 {
190                "99.99%".to_owned()
191            } else {
192                let mut progress = self.mv_backfill_consumed_rows as f64
193                    / (self.upstream_mvs_total_key_count as f64);
194                if progress > 1.0 {
195                    progress = 0.9999;
196                }
197                format!(
198                    "{:.2}% ({}/{})",
199                    progress * 100.0,
200                    self.mv_backfill_consumed_rows,
201                    self.upstream_mvs_total_key_count
202                )
203            }
204        });
205        let source_progress = (source_count > 0).then_some(format!(
206            "{} rows consumed",
207            self.source_backfill_consumed_rows
208        ));
209        match (mv_progress, source_progress) {
210            (Some(mv_progress), Some(source_progress)) => {
211                format!(
212                    "MView Backfill: {}, Source Backfill: {}",
213                    mv_progress, source_progress
214                )
215            }
216            (Some(mv_progress), None) => mv_progress,
217            (None, Some(source_progress)) => source_progress,
218            (None, None) => "Unknown".to_owned(),
219        }
220    }
221}
222
/// There are 2 kinds of `TrackingJobs`:
/// 1. `New`. This refers to the "New" type of tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. `Recovered`. This refers to the "Recovered" type of tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub enum TrackingJob {
    /// Job tracked from a newly issued command.
    New(TrackingCommand),
    /// Job rebuilt by `CreateMviewProgressTracker::recover`.
    Recovered(RecoveredTrackingJob),
}
233
234impl std::fmt::Display for TrackingJob {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        match self {
237            TrackingJob::New(command) => write!(f, "{}", command.job_id),
238            TrackingJob::Recovered(recovered) => write!(f, "{}<recovered>", recovered.id),
239        }
240    }
241}
242
243impl TrackingJob {
244    /// Notify metadata manager that the job is finished.
245    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
246        match self {
247            TrackingJob::New(command) => {
248                metadata_manager
249                    .catalog_controller
250                    .finish_streaming_job(
251                        command.job_id.table_id as i32,
252                        command.replace_stream_job.clone(),
253                    )
254                    .await?;
255                Ok(())
256            }
257            TrackingJob::Recovered(recovered) => {
258                metadata_manager
259                    .catalog_controller
260                    .finish_streaming_job(recovered.id, None)
261                    .await?;
262                Ok(())
263            }
264        }
265    }
266
267    pub(crate) fn table_to_create(&self) -> TableId {
268        match self {
269            TrackingJob::New(command) => command.job_id,
270            TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
271        }
272    }
273}
274
275impl std::fmt::Debug for TrackingJob {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        match self {
278            TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
279            TrackingJob::Recovered(recovered) => {
280                write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id)
281            }
282        }
283    }
284}
285
/// Payload of [`TrackingJob::Recovered`].
pub struct RecoveredTrackingJob {
    /// Object id of the job; `recover` derives it from the creating table id.
    pub id: ObjectId,
}
289
/// The command tracked by the [`CreateMviewProgressTracker`].
pub(super) struct TrackingCommand {
    /// Id of the streaming job being created.
    pub job_id: TableId,
    /// Optional replace plan, handed to `finish_streaming_job` when the job finishes.
    pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}
295
/// Outcome of applying one actor progress report to the tracker.
pub(super) enum UpdateProgressResult {
    /// Nothing to act on.
    None,
    /// All tracked actors of the job are done; carries the job removed from the tracker.
    Finished(TrackingJob),
    /// The report unblocked these fragments, which are queued to start backfilling.
    BackfillNodeFinished(Vec<FragmentId>),
}
301
/// Tracking is done as follows:
/// 1. We identify a `StreamJob` by its `TableId` of its `Materialized` table.
/// 2. For each stream job, there are several actors which run its tasks.
/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL indicated by the `TableId`.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    /// Maps each tracked actor to the `TableId` of the job it belongs to.
    actor_map: HashMap<ActorId, TableId>,

    /// Stash of finished jobs. They will be finally finished on checkpoint.
    pending_finished_jobs: Vec<TrackingJob>,

    /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
    pending_backfill_nodes: Vec<FragmentId>,
}
320
321impl CreateMviewProgressTracker {
322    /// This step recovers state from the meta side:
323    /// 1. `Tables`.
324    /// 2. `TableFragments`.
325    ///
326    /// Other state are persisted by the `BackfillExecutor`, such as:
327    /// 1. `CreateMviewProgress`.
328    /// 2. `Backfill` position.
329    pub fn recover(
330        mviews: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments, BackfillOrderState))>,
331        version_stats: &HummockVersionStats,
332    ) -> Self {
333        let mut actor_map = HashMap::new();
334        let mut progress_map = HashMap::new();
335        for (creating_table_id, (definition, table_fragments, backfill_order_state)) in mviews {
336            let mut states = HashMap::new();
337            let mut backfill_upstream_types = HashMap::new();
338            let actors = table_fragments.tracking_progress_actor_ids();
339            for (actor, backfill_upstream_type) in actors {
340                actor_map.insert(actor, creating_table_id);
341                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
342                backfill_upstream_types.insert(actor, backfill_upstream_type);
343            }
344
345            let progress = Self::recover_progress(
346                states,
347                backfill_upstream_types,
348                table_fragments.upstream_table_counts(),
349                definition,
350                version_stats,
351                backfill_order_state,
352            );
353            let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
354                id: creating_table_id.table_id as i32,
355            });
356            progress_map.insert(creating_table_id, (progress, tracking_job));
357        }
358        Self {
359            progress_map,
360            actor_map,
361            pending_finished_jobs: Vec::new(),
362            pending_backfill_nodes: Vec::new(),
363        }
364    }
365
366    /// ## How recovery works
367    ///
368    /// The progress (number of rows consumed) is persisted in state tables.
369    /// During recovery, the backfill executor will restore the number of rows consumed,
370    /// and then it will just report progress like newly created executors.
371    fn recover_progress(
372        states: HashMap<ActorId, BackfillState>,
373        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
374        upstream_mv_count: HashMap<TableId, usize>,
375        definition: String,
376        version_stats: &HummockVersionStats,
377        backfill_order_state: BackfillOrderState,
378    ) -> Progress {
379        let upstream_mvs_total_key_count =
380            calculate_total_key_count(&upstream_mv_count, version_stats);
381        Progress {
382            states,
383            backfill_order_state,
384            backfill_upstream_types,
385            done_count: 0, // Fill only after first barrier pass
386            upstream_mv_count,
387            upstream_mvs_total_key_count,
388            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
389            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
390            definition,
391        }
392    }
393
394    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
395        self.progress_map
396            .iter()
397            .map(|(table_id, (x, _))| {
398                let table_id = table_id.table_id;
399                let ddl_progress = DdlProgress {
400                    id: table_id as u64,
401                    statement: x.definition.clone(),
402                    progress: x.calculate_progress(),
403                };
404                (table_id, ddl_progress)
405            })
406            .collect()
407    }
408
409    pub(super) fn update_tracking_jobs<'a>(
410        &mut self,
411        info: Option<(
412            &CreateStreamingJobCommandInfo,
413            Option<&ReplaceStreamJobPlan>,
414        )>,
415        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
416        version_stats: &HummockVersionStats,
417    ) {
418        {
419            {
420                // Save `finished_commands` for Create MVs.
421                let finished_commands = {
422                    let mut commands = vec![];
423                    // Add the command to tracker.
424                    if let Some((create_job_info, replace_stream_job)) = info
425                        && let Some(command) =
426                            self.add(create_job_info, replace_stream_job, version_stats)
427                    {
428                        // Those with no actors to track can be finished immediately.
429                        commands.push(command);
430                    }
431                    // Update the progress of all commands.
432                    for progress in create_mview_progress {
433                        // Those with actors complete can be finished immediately.
434                        match self.update(progress, version_stats) {
435                            UpdateProgressResult::None => {
436                                tracing::trace!(?progress, "update progress");
437                            }
438                            UpdateProgressResult::Finished(command) => {
439                                tracing::trace!(?progress, "finish progress");
440                                commands.push(command);
441                            }
442                            UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
443                                tracing::trace!(
444                                    ?progress,
445                                    ?next_backfill_nodes,
446                                    "start next backfill node"
447                                );
448                                self.queue_backfill(next_backfill_nodes);
449                            }
450                        }
451                    }
452                    commands
453                };
454
455                for command in finished_commands {
456                    self.stash_command_to_finish(command);
457                }
458            }
459        }
460    }
461
462    /// Apply a collected epoch node command to the tracker
463    /// Return the finished jobs when the barrier kind is `Checkpoint`
464    pub(super) fn apply_collected_command(
465        &mut self,
466        command: Option<&Command>,
467        barrier_info: &BarrierInfo,
468        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
469        version_stats: &HummockVersionStats,
470    ) -> Vec<TrackingJob> {
471        let new_tracking_job_info =
472            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
473                match job_type {
474                    CreateStreamingJobType::Normal => Some((info, None)),
475                    CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
476                        Some((info, Some(replace_stream_job)))
477                    }
478                    CreateStreamingJobType::SnapshotBackfill(_) => {
479                        // The progress of SnapshotBackfill won't be tracked here
480                        None
481                    }
482                }
483            } else {
484                None
485            };
486        self.update_tracking_jobs(
487            new_tracking_job_info,
488            resps
489                .into_iter()
490                .flat_map(|resp| resp.create_mview_progress.iter()),
491            version_stats,
492        );
493        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
494            // the cancelled command is possibly stashed in `finished_commands` and waiting
495            // for checkpoint, we should also clear it.
496            self.cancel_command(table_id);
497        }
498        if barrier_info.kind.is_checkpoint() {
499            self.take_finished_jobs()
500        } else {
501            vec![]
502        }
503    }
504
    /// Stash a command to finish later.
    ///
    /// The job is appended to `pending_finished_jobs` and stays there until
    /// `take_finished_jobs` drains the stash.
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }
509
    /// Append `backfill_nodes` to `pending_backfill_nodes`, where they stay until
    /// `take_pending_backfill_nodes` drains the queue.
    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }
513
514    /// Finish stashed jobs on checkpoint.
515    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
516        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
517        take(&mut self.pending_finished_jobs)
518    }
519
520    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
521        take(&mut self.pending_backfill_nodes)
522    }
523
    /// Whether at least one finished job is stashed and not yet taken.
    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }
527
528    pub(super) fn cancel_command(&mut self, id: TableId) {
529        let _ = self.progress_map.remove(&id);
530        self.pending_finished_jobs
531            .retain(|x| x.table_to_create() != id);
532        self.actor_map.retain(|_, table_id| *table_id != id);
533    }
534
535    /// Notify all tracked commands that error encountered and clear them.
536    pub fn abort_all(&mut self) {
537        self.actor_map.clear();
538        self.pending_finished_jobs.clear();
539        self.progress_map.clear();
540    }
541
542    /// Add a new create-mview DDL command to track.
543    ///
544    /// If the actors to track is empty, return the given command as it can be finished immediately.
545    pub fn add(
546        &mut self,
547        info: &CreateStreamingJobCommandInfo,
548        replace_stream_job: Option<&ReplaceStreamJobPlan>,
549        version_stats: &HummockVersionStats,
550    ) -> Option<TrackingJob> {
551        tracing::trace!(?info, "add job to track");
552        let (info, actors, replace_table_info) = {
553            let CreateStreamingJobCommandInfo {
554                stream_job_fragments,
555                ..
556            } = info;
557            let actors = stream_job_fragments.tracking_progress_actor_ids();
558            if actors.is_empty() {
559                // The command can be finished immediately.
560                return Some(TrackingJob::New(TrackingCommand {
561                    job_id: info.stream_job_fragments.stream_job_id,
562                    replace_stream_job: replace_stream_job.cloned(),
563                }));
564            }
565            (info.clone(), actors, replace_stream_job.cloned())
566        };
567
568        let CreateStreamingJobCommandInfo {
569            stream_job_fragments: table_fragments,
570            definition,
571            job_type,
572            create_type,
573            fragment_backfill_ordering,
574            ..
575        } = info;
576
577        let creating_mv_id = table_fragments.stream_job_id();
578        let upstream_mv_count = table_fragments.upstream_table_counts();
579        let upstream_total_key_count: u64 =
580            calculate_total_key_count(&upstream_mv_count, version_stats);
581
582        for (actor, _backfill_upstream_type) in &actors {
583            self.actor_map.insert(*actor, creating_mv_id);
584        }
585
586        let backfill_order_state =
587            BackfillOrderState::new(fragment_backfill_ordering, &table_fragments);
588        let progress = Progress::new(
589            actors,
590            upstream_mv_count,
591            upstream_total_key_count,
592            definition.clone(),
593            backfill_order_state,
594        );
595        if job_type == StreamingJobType::Sink && create_type == CreateType::Background {
596            // We return the original tracking job immediately.
597            // This is because sink can be decoupled with backfill progress.
598            // We don't need to wait for sink to finish backfill.
599            // This still contains the notifiers, so we can tell listeners
600            // that the sink job has been created.
601            Some(TrackingJob::New(TrackingCommand {
602                job_id: creating_mv_id,
603                replace_stream_job: replace_table_info,
604            }))
605        } else {
606            let old = self.progress_map.insert(
607                creating_mv_id,
608                (
609                    progress,
610                    TrackingJob::New(TrackingCommand {
611                        job_id: creating_mv_id,
612                        replace_stream_job: replace_table_info,
613                    }),
614                ),
615            );
616            assert!(old.is_none());
617            None
618        }
619    }
620
621    /// Update the progress of `actor` according to the Pb struct.
622    ///
623    /// If all actors in this MV have finished, returns the command.
624    pub fn update(
625        &mut self,
626        progress: &CreateMviewProgress,
627        version_stats: &HummockVersionStats,
628    ) -> UpdateProgressResult {
629        tracing::trace!(?progress, "update progress");
630        let actor = progress.backfill_actor_id;
631        let Some(table_id) = self.actor_map.get(&actor).copied() else {
632            // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
633            // even if backfill is finished on recovery.
634            // This is because we don't know if only this actor is finished,
635            // OR the entire stream job is finished.
636            // For the first case, we must notify meta.
637            // For the second case, we can still notify meta, but ignore it here.
638            tracing::info!(
639                "no tracked progress for actor {}, the stream job could already be finished",
640                actor
641            );
642            return UpdateProgressResult::None;
643        };
644
645        let new_state = if progress.done {
646            BackfillState::Done(progress.consumed_rows)
647        } else {
648            BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
649        };
650
651        match self.progress_map.entry(table_id) {
652            Entry::Occupied(mut o) => {
653                let progress = &mut o.get_mut().0;
654
655                let upstream_total_key_count: u64 =
656                    calculate_total_key_count(&progress.upstream_mv_count, version_stats);
657
658                tracing::debug!(?table_id, "updating progress for table");
659                let next_backfill_nodes =
660                    progress.update(actor, new_state, upstream_total_key_count);
661
662                if progress.is_done() {
663                    tracing::debug!(
664                        "all actors done for creating mview with table_id {}!",
665                        table_id
666                    );
667
668                    // Clean-up the mapping from actors to DDL table_id.
669                    for actor in o.get().0.actors() {
670                        self.actor_map.remove(&actor);
671                    }
672                    assert!(next_backfill_nodes.is_empty());
673                    UpdateProgressResult::Finished(o.remove().1)
674                } else if !next_backfill_nodes.is_empty() {
675                    tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
676                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
677                } else {
678                    UpdateProgressResult::None
679                }
680            }
681            Entry::Vacant(_) => {
682                tracing::warn!(
683                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
684                );
685                UpdateProgressResult::None
686            }
687        }
688    }
689}
690
691fn calculate_total_key_count(
692    table_count: &HashMap<TableId, usize>,
693    version_stats: &HummockVersionStats,
694) -> u64 {
695    table_count
696        .iter()
697        .map(|(table_id, count)| {
698            assert_ne!(*count, 0);
699            *count as u64
700                * version_stats
701                    .table_stats
702                    .get(&table_id.table_id)
703                    .map_or(0, |stat| stat.total_key_count as u64)
704        })
705        .sum()
706}