risingwave_meta/barrier/
progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::CreateType;
22use risingwave_pb::ddl_service::DdlProgress;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
25
26use crate::MetaResult;
27use crate::barrier::CreateStreamingJobCommandInfo;
28use crate::barrier::backfill_order_control::BackfillOrderState;
29use crate::barrier::info::InflightStreamingJobInfo;
30use crate::controller::fragment::InflightFragmentInfo;
31use crate::manager::MetadataManager;
32use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
33use crate::stream::{SourceChange, SourceManagerRef};
34
/// Number of rows a backfill actor reports as consumed.
type ConsumedRows = u64;
/// Number of rows a backfill actor reports as buffered.
type BufferedRows = u64;

/// Last known backfill state of a single tracked actor.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// State assigned to every actor by `Progress::new`, before any report is applied.
    Init,
    /// The actor is still backfilling. Carries the reported epoch (stored but never read,
    /// hence the `dead_code` expectation) followed by the consumed and buffered row counts.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    /// The actor reported that its backfill is done, with its final consumed and buffered
    /// row counts.
    Done(ConsumedRows, BufferedRows),
}
44
/// Represents the backfill nodes that need to be scheduled or cleaned up.
///
/// Produced by `Progress::update`; both lists stay empty unless the update reports an actor
/// as done.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    /// (the value returned by `BackfillOrderState::finish_actor`).
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated.
    /// Collected for the fragments that left the set of current backfill nodes.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}
53
/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug, Clone)]
pub(super) struct Progress {
    /// ID of the streaming job whose backfill progress is tracked.
    job_id: JobId,
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    /// Last reported backfill state of every tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Backfill ordering state. It is consulted whenever an actor reports done, to find the
    /// next fragments to backfill and the locality provider state tables to truncate.
    backfill_order_state: BackfillOrderState,
    /// Number of tracked actors that have reported done.
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    /// Rows consumed so far by `MView` and `LocalityProvider` backfill actors.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed so far by `Source` backfill actors.
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,
}
79
80impl Progress {
81    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
82    fn new(
83        job_id: JobId,
84        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
85        upstream_mv_count: HashMap<TableId, usize>,
86        upstream_total_key_count: u64,
87        backfill_order_state: BackfillOrderState,
88    ) -> Self {
89        let mut states = HashMap::new();
90        let mut backfill_upstream_types = HashMap::new();
91        for (actor, backfill_upstream_type) in actors {
92            states.insert(actor, BackfillState::Init);
93            backfill_upstream_types.insert(actor, backfill_upstream_type);
94        }
95        assert!(!states.is_empty());
96
97        Self {
98            job_id,
99            states,
100            backfill_upstream_types,
101            done_count: 0,
102            upstream_mv_count,
103            upstream_mvs_total_key_count: upstream_total_key_count,
104            mv_backfill_consumed_rows: 0,
105            source_backfill_consumed_rows: 0,
106            mv_backfill_buffered_rows: 0,
107            backfill_order_state,
108        }
109    }
110
111    /// Update the progress of `actor`.
112    /// Returns the backfill fragments that need to be scheduled or cleaned up.
113    fn update(
114        &mut self,
115        actor: ActorId,
116        new_state: BackfillState,
117        upstream_total_key_count: u64,
118    ) -> PendingBackfillFragments {
119        let mut result = PendingBackfillFragments::default();
120        self.upstream_mvs_total_key_count = upstream_total_key_count;
121        let total_actors = self.states.len();
122        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
123
124        let mut old_consumed_row = 0;
125        let mut new_consumed_row = 0;
126        let mut old_buffered_row = 0;
127        let mut new_buffered_row = 0;
128        match self.states.remove(&actor).unwrap() {
129            BackfillState::Init => {}
130            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
131                old_consumed_row = consumed_rows;
132                old_buffered_row = buffered_rows;
133            }
134            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
135        };
136        match &new_state {
137            BackfillState::Init => {}
138            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
139                new_consumed_row = *consumed_rows;
140                new_buffered_row = *buffered_rows;
141            }
142            BackfillState::Done(consumed_rows, buffered_rows) => {
143                tracing::debug!("actor {} done", actor);
144                new_consumed_row = *consumed_rows;
145                new_buffered_row = *buffered_rows;
146                self.done_count += 1;
147                let before_backfill_nodes = self
148                    .backfill_order_state
149                    .current_backfill_node_fragment_ids();
150                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
151                let after_backfill_nodes = self
152                    .backfill_order_state
153                    .current_backfill_node_fragment_ids();
154                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
155                let last_backfill_nodes_iter = before_backfill_nodes
156                    .into_iter()
157                    .filter(|x| !after_backfill_nodes.contains(x));
158                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
159                    .filter_map(|fragment_id| {
160                        self.backfill_order_state
161                            .get_locality_fragment_state_table_mapping()
162                            .get(&fragment_id)
163                    })
164                    .flatten()
165                    .copied()
166                    .collect();
167                tracing::debug!(
168                    "{} actors out of {} complete",
169                    self.done_count,
170                    total_actors,
171                );
172            }
173        };
174        debug_assert!(
175            new_consumed_row >= old_consumed_row,
176            "backfill progress should not go backward"
177        );
178        debug_assert!(
179            new_buffered_row >= old_buffered_row,
180            "backfill progress should not go backward"
181        );
182        match backfill_upstream_type {
183            BackfillUpstreamType::MView => {
184                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
185            }
186            BackfillUpstreamType::Source => {
187                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
188            }
189            BackfillUpstreamType::Values => {
190                // do not consider progress for values
191            }
192            BackfillUpstreamType::LocalityProvider => {
193                // Track LocalityProvider progress similar to MView
194                // Update buffered rows for precise progress calculation
195                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
196                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
197            }
198        }
199        self.states.insert(actor, new_state);
200        result
201    }
202
203    /// Returns whether all backfill executors are done.
204    fn is_done(&self) -> bool {
205        tracing::trace!(
206            "Progress::is_done? {}, {}, {:?}",
207            self.done_count,
208            self.states.len(),
209            self.states
210        );
211        self.done_count == self.states.len()
212    }
213
214    /// `progress` = `consumed_rows` / `upstream_total_key_count`
215    fn calculate_progress(&self) -> String {
216        if self.is_done() || self.states.is_empty() {
217            return "100%".to_owned();
218        }
219        let mut mv_count = 0;
220        let mut source_count = 0;
221        for backfill_upstream_type in self.backfill_upstream_types.values() {
222            match backfill_upstream_type {
223                BackfillUpstreamType::MView => mv_count += 1,
224                BackfillUpstreamType::Source => source_count += 1,
225                BackfillUpstreamType::Values => (),
226                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
227            }
228        }
229
230        let mv_progress = (mv_count > 0).then_some({
231            // Include buffered rows in total for precise progress calculation
232            // Progress = consumed / (upstream_total + buffered)
233            let total_rows_to_consume =
234                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
235            if total_rows_to_consume == 0 {
236                "99.99%".to_owned()
237            } else {
238                let mut progress =
239                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
240                if progress > 1.0 {
241                    progress = 0.9999;
242                }
243                format!(
244                    "{:.2}% ({}/{})",
245                    progress * 100.0,
246                    self.mv_backfill_consumed_rows,
247                    total_rows_to_consume
248                )
249            }
250        });
251        let source_progress = (source_count > 0).then_some(format!(
252            "{} rows consumed",
253            self.source_backfill_consumed_rows
254        ));
255        match (mv_progress, source_progress) {
256            (Some(mv_progress), Some(source_progress)) => {
257                format!(
258                    "MView Backfill: {}, Source Backfill: {}",
259                    mv_progress, source_progress
260                )
261            }
262            (Some(mv_progress), None) => mv_progress,
263            (None, Some(source_progress)) => source_progress,
264            (None, None) => "Unknown".to_owned(),
265        }
266    }
267}
268
/// There are two kinds of `TrackingJobs`:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
#[derive(Clone)]
pub struct TrackingJob {
    /// ID of the streaming job; passed to `finish_streaming_job` when the job finishes.
    job_id: JobId,
    /// `false` for jobs built by `TrackingJob::new`, `true` for `TrackingJob::recovered`.
    is_recovered: bool,
    /// Source change applied to the source manager after the job is finished, if any.
    source_change: Option<SourceChange>,
}
281
282impl std::fmt::Display for TrackingJob {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        write!(
285            f,
286            "{}{}",
287            self.job_id,
288            if self.is_recovered { "<recovered>" } else { "" }
289        )
290    }
291}
292
293impl TrackingJob {
294    /// Create a new tracking job.
295    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
296        Self {
297            job_id: stream_job_fragments.stream_job_id,
298            is_recovered: false,
299            source_change: Some(SourceChange::CreateJobFinished {
300                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
301            }),
302        }
303    }
304
305    /// Create a recovered tracking job.
306    pub(crate) fn recovered(
307        job_id: JobId,
308        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
309    ) -> Self {
310        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
311            fragment_infos
312                .iter()
313                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
314        );
315        let source_change = if source_backfill_fragments.is_empty() {
316            None
317        } else {
318            Some(SourceChange::CreateJobFinished {
319                finished_backfill_fragments: source_backfill_fragments,
320            })
321        };
322        Self {
323            job_id,
324            is_recovered: true,
325            source_change,
326        }
327    }
328
329    /// Notify the metadata manager that the job is finished.
330    pub(crate) async fn finish(
331        self,
332        metadata_manager: &MetadataManager,
333        source_manager: &SourceManagerRef,
334    ) -> MetaResult<()> {
335        metadata_manager
336            .catalog_controller
337            .finish_streaming_job(self.job_id)
338            .await?;
339        if let Some(source_change) = self.source_change {
340            source_manager.apply_source_change(source_change).await;
341        }
342        Ok(())
343    }
344}
345
346impl std::fmt::Debug for TrackingJob {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        if !self.is_recovered {
349            write!(f, "TrackingJob::New({})", self.job_id)
350        } else {
351            write!(f, "TrackingJob::Recovered({})", self.job_id)
352        }
353    }
354}
355
/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Tracking jobs of the finished streaming jobs that should be committed.
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated.
    pub table_ids_to_truncate: Vec<TableId>,
}
364
/// Outcome of applying one reported progress to a `Progress` (see `Progress::apply`).
pub(super) enum UpdateProgressResult {
    /// The job is not finished and nothing needs to be scheduled or truncated.
    None,
    /// All tracked actors are done. Carries the locality provider state tables that still
    /// need to be truncated.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// The job is not finished yet, but the update produced backfill fragments to schedule
    /// and/or locality provider state tables to truncate.
    BackfillNodeFinished(PendingBackfillFragments),
}
374
/// Tracks the creation progress of a single streaming job.
#[derive(Debug, Clone)]
pub(super) struct CreateMviewProgressTracker {
    /// ID of the tracked streaming job; reported as `id` in `DdlProgress`.
    job_id: JobId,
    /// Definition of the job; reported as `statement` in `DdlProgress`.
    definition: String,
    /// How the job is being created; reported as `create_type` in `DdlProgress`.
    create_type: CreateType,
    /// Handed out by `into_tracking_job` once the job is finished.
    tracking_job: TrackingJob,
    /// Whether the job is still backfilling or already finished, with the associated state.
    status: CreateMviewStatus,
}
383
/// Status of a tracked streaming job.
#[derive(Debug, Clone)]
enum CreateMviewStatus {
    /// Some tracked actors have not finished backfilling yet.
    Backfilling {
        /// Progress of the create-mview DDL.
        progress: Progress,

        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
        /// Drained by `take_pending_backfill_nodes`.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table IDs whose locality provider state tables need to be truncated.
        /// Drained by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
    /// All tracked actors are done, or there was no actor to track in the first place.
    Finished {
        /// Table IDs whose locality provider state tables still need to be truncated.
        /// Drained by `collect_staging_commit_info`.
        table_ids_to_truncate: Vec<TableId>,
    },
}
400
401impl CreateMviewProgressTracker {
402    pub fn recover(
403        creating_job_id: JobId,
404        definition: String,
405        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
406        backfill_order_state: BackfillOrderState,
407        version_stats: &HummockVersionStats,
408    ) -> Self {
409        {
410            let create_type = CreateType::Background;
411            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
412            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
413            let status = if actors.is_empty() {
414                CreateMviewStatus::Finished {
415                    table_ids_to_truncate: vec![],
416                }
417            } else {
418                let mut states = HashMap::new();
419                let mut backfill_upstream_types = HashMap::new();
420
421                for (actor, backfill_upstream_type) in actors {
422                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
423                    backfill_upstream_types.insert(actor, backfill_upstream_type);
424                }
425
426                let progress = Self::recover_progress(
427                    creating_job_id,
428                    states,
429                    backfill_upstream_types,
430                    StreamJobFragments::upstream_table_counts_impl(
431                        fragment_infos.values().map(|fragment| &fragment.nodes),
432                    ),
433                    version_stats,
434                    backfill_order_state,
435                );
436                CreateMviewStatus::Backfilling {
437                    progress,
438                    pending_backfill_nodes: vec![],
439                    table_ids_to_truncate: vec![],
440                }
441            };
442            Self {
443                job_id: creating_job_id,
444                definition,
445                create_type,
446                tracking_job,
447                status,
448            }
449        }
450    }
451
452    /// ## How recovery works
453    ///
454    /// The progress (number of rows consumed) is persisted in state tables.
455    /// During recovery, the backfill executor will restore the number of rows consumed,
456    /// and then it will just report progress like newly created executors.
457    fn recover_progress(
458        job_id: JobId,
459        states: HashMap<ActorId, BackfillState>,
460        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
461        upstream_mv_count: HashMap<TableId, usize>,
462        version_stats: &HummockVersionStats,
463        backfill_order_state: BackfillOrderState,
464    ) -> Progress {
465        let upstream_mvs_total_key_count =
466            calculate_total_key_count(&upstream_mv_count, version_stats);
467        Progress {
468            job_id,
469            states,
470            backfill_order_state,
471            backfill_upstream_types,
472            done_count: 0, // Fill only after first barrier pass
473            upstream_mv_count,
474            upstream_mvs_total_key_count,
475            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
476            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
477            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
478        }
479    }
480
481    pub fn gen_ddl_progress(&self) -> DdlProgress {
482        let progress = match &self.status {
483            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
484            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
485        };
486        DdlProgress {
487            id: self.job_id.as_raw_id() as u64,
488            statement: self.definition.clone(),
489            create_type: self.create_type.as_str().to_owned(),
490            progress,
491        }
492    }
493
494    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
495    /// Return the table ids whose locality provider state tables need to be truncated.
496    pub(super) fn apply_progress(
497        &mut self,
498        create_mview_progress: &CreateMviewProgress,
499        version_stats: &HummockVersionStats,
500    ) {
501        let CreateMviewStatus::Backfilling {
502            progress,
503            pending_backfill_nodes,
504            table_ids_to_truncate,
505        } = &mut self.status
506        else {
507            tracing::warn!(
508                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
509            );
510            return;
511        };
512        {
513            // Update the progress of all commands.
514            {
515                // Those with actors complete can be finished immediately.
516                match progress.apply(create_mview_progress, version_stats) {
517                    UpdateProgressResult::None => {
518                        tracing::trace!(?progress, "update progress");
519                    }
520                    UpdateProgressResult::Finished {
521                        truncate_locality_provider_state_tables,
522                    } => {
523                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
524                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
525                        tracing::trace!(?progress, "finish progress");
526                        self.status = CreateMviewStatus::Finished {
527                            table_ids_to_truncate,
528                        };
529                    }
530                    UpdateProgressResult::BackfillNodeFinished(pending) => {
531                        table_ids_to_truncate
532                            .extend(pending.truncate_locality_provider_state_tables.clone());
533                        tracing::trace!(
534                            ?progress,
535                            next_backfill_nodes = ?pending.next_backfill_nodes,
536                            "start next backfill node"
537                        );
538                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
539                    }
540                }
541            }
542        }
543    }
544
545    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
546        match &mut self.status {
547            CreateMviewStatus::Backfilling {
548                pending_backfill_nodes,
549                ..
550            } => Some(pending_backfill_nodes.drain(..)),
551            CreateMviewStatus::Finished { .. } => None,
552        }
553        .into_iter()
554        .flatten()
555    }
556
557    pub(super) fn collect_staging_commit_info(
558        &mut self,
559    ) -> (bool, impl Iterator<Item = TableId> + '_) {
560        let (is_finished, table_ids) = match &mut self.status {
561            CreateMviewStatus::Backfilling {
562                table_ids_to_truncate,
563                ..
564            } => (false, table_ids_to_truncate),
565            CreateMviewStatus::Finished {
566                table_ids_to_truncate,
567                ..
568            } => (true, table_ids_to_truncate),
569        };
570        (is_finished, table_ids.drain(..))
571    }
572
573    pub(super) fn is_finished(&self) -> bool {
574        matches!(self.status, CreateMviewStatus::Finished { .. })
575    }
576
577    pub(super) fn into_tracking_job(self) -> TrackingJob {
578        let CreateMviewStatus::Finished { .. } = self.status else {
579            panic!("should be called when finished");
580        };
581        self.tracking_job
582    }
583
584    /// Add a new create-mview DDL command to track.
585    ///
586    /// If the actors to track are empty, return the given command as it can be finished immediately.
587    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
588        tracing::trace!(?info, "add job to track");
589        let CreateStreamingJobCommandInfo {
590            stream_job_fragments,
591            definition,
592            create_type,
593            fragment_backfill_ordering,
594            locality_fragment_state_table_mapping,
595            ..
596        } = info;
597        let job_id = stream_job_fragments.stream_job_id();
598        let definition = definition.clone();
599        let create_type = (*create_type).into();
600        let actors = stream_job_fragments.tracking_progress_actor_ids();
601        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
602        if actors.is_empty() {
603            // The command can be finished immediately.
604            return Self {
605                job_id,
606                definition,
607                create_type,
608                tracking_job,
609                status: CreateMviewStatus::Finished {
610                    table_ids_to_truncate: vec![],
611                },
612            };
613        }
614
615        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
616        let upstream_total_key_count: u64 =
617            calculate_total_key_count(&upstream_mv_count, version_stats);
618
619        let backfill_order_state = BackfillOrderState::new(
620            fragment_backfill_ordering,
621            stream_job_fragments,
622            locality_fragment_state_table_mapping.clone(),
623        );
624        let progress = Progress::new(
625            job_id,
626            actors,
627            upstream_mv_count,
628            upstream_total_key_count,
629            backfill_order_state,
630        );
631        Self {
632            job_id,
633            definition,
634            create_type,
635            tracking_job,
636            status: CreateMviewStatus::Backfilling {
637                progress,
638                pending_backfill_nodes: vec![],
639                table_ids_to_truncate: vec![],
640            },
641        }
642    }
643}
644
645impl Progress {
646    /// Update the progress of `actor` according to the Pb struct.
647    ///
648    /// If all actors in this MV have finished, return the command.
649    fn apply(
650        &mut self,
651        progress: &CreateMviewProgress,
652        version_stats: &HummockVersionStats,
653    ) -> UpdateProgressResult {
654        tracing::trace!(?progress, "update progress");
655        let actor = progress.backfill_actor_id;
656        let job_id = self.job_id;
657
658        let new_state = if progress.done {
659            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
660        } else {
661            BackfillState::ConsumingUpstream(
662                progress.consumed_epoch.into(),
663                progress.consumed_rows,
664                progress.buffered_rows,
665            )
666        };
667
668        {
669            {
670                let progress_state = self;
671
672                let upstream_total_key_count: u64 =
673                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
674
675                tracing::debug!(%job_id, "updating progress for table");
676                let pending = progress_state.update(actor, new_state, upstream_total_key_count);
677
678                if progress_state.is_done() {
679                    tracing::debug!(
680                        %job_id,
681                        "all actors done for creating mview!",
682                    );
683
684                    let PendingBackfillFragments {
685                        next_backfill_nodes,
686                        truncate_locality_provider_state_tables,
687                    } = pending;
688
689                    assert!(next_backfill_nodes.is_empty());
690                    UpdateProgressResult::Finished {
691                        truncate_locality_provider_state_tables,
692                    }
693                } else if !pending.next_backfill_nodes.is_empty()
694                    || !pending.truncate_locality_provider_state_tables.is_empty()
695                {
696                    UpdateProgressResult::BackfillNodeFinished(pending)
697                } else {
698                    UpdateProgressResult::None
699                }
700            }
701        }
702    }
703}
704
705fn calculate_total_key_count(
706    table_count: &HashMap<TableId, usize>,
707    version_stats: &HummockVersionStats,
708) -> u64 {
709    table_count
710        .iter()
711        .map(|(table_id, count)| {
712            assert_ne!(*count, 0);
713            *count as u64
714                * version_stats
715                    .table_stats
716                    .get(table_id)
717                    .map_or(0, |stat| stat.total_key_count as u64)
718        })
719        .sum()
720}