risingwave_meta/barrier/
progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::CreateType;
22use risingwave_pb::ddl_service::DdlProgress;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
25
26use crate::MetaResult;
27use crate::barrier::CreateStreamingJobCommandInfo;
28use crate::barrier::backfill_order_control::BackfillOrderState;
29use crate::barrier::info::InflightStreamingJobInfo;
30use crate::controller::fragment::InflightFragmentInfo;
31use crate::manager::MetadataManager;
32use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
33use crate::stream::{SourceChange, SourceManagerRef};
34
35type ConsumedRows = u64;
36type BufferedRows = u64;
37
38#[derive(Clone, Copy, Debug)]
39enum BackfillState {
40    Init,
41    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
42    Done(ConsumedRows, BufferedRows),
43}
44
45/// Represents the backfill nodes that need to be scheduled or cleaned up.
46#[derive(Debug, Default)]
47pub(super) struct PendingBackfillFragments {
48    /// Fragment IDs that should start backfilling in the next checkpoint
49    pub next_backfill_nodes: Vec<FragmentId>,
50    /// State tables of locality provider fragments that should be truncated
51    pub truncate_locality_provider_state_tables: Vec<TableId>,
52}
53
54/// Progress of all actors containing backfill executors while creating mview.
55#[derive(Debug)]
56pub(super) struct Progress {
57    job_id: JobId,
58    // `states` and `done_count` decides whether the progress is done. See `is_done`.
59    states: HashMap<ActorId, BackfillState>,
60    backfill_order_state: BackfillOrderState,
61    done_count: usize,
62
63    /// Tells whether the backfill is from source or mv.
64    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
65
66    // The following row counts are used to calculate the progress. See `calculate_progress`.
67    /// Upstream mv count.
68    /// Keep track of how many times each upstream MV
69    /// appears in this stream job.
70    upstream_mv_count: HashMap<TableId, usize>,
71    /// Total key count of all the upstream materialized views
72    upstream_mvs_total_key_count: u64,
73    mv_backfill_consumed_rows: u64,
74    source_backfill_consumed_rows: u64,
75    /// Buffered rows (for locality backfill) that are yet to be consumed
76    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
77    mv_backfill_buffered_rows: u64,
78}
79
80impl Progress {
81    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
82    fn new(
83        job_id: JobId,
84        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
85        upstream_mv_count: HashMap<TableId, usize>,
86        upstream_total_key_count: u64,
87        backfill_order_state: BackfillOrderState,
88    ) -> Self {
89        let mut states = HashMap::new();
90        let mut backfill_upstream_types = HashMap::new();
91        for (actor, backfill_upstream_type) in actors {
92            states.insert(actor, BackfillState::Init);
93            backfill_upstream_types.insert(actor, backfill_upstream_type);
94        }
95        assert!(!states.is_empty());
96
97        Self {
98            job_id,
99            states,
100            backfill_upstream_types,
101            done_count: 0,
102            upstream_mv_count,
103            upstream_mvs_total_key_count: upstream_total_key_count,
104            mv_backfill_consumed_rows: 0,
105            source_backfill_consumed_rows: 0,
106            mv_backfill_buffered_rows: 0,
107            backfill_order_state,
108        }
109    }
110
111    /// Update the progress of `actor`.
112    /// Returns the backfill fragments that need to be scheduled or cleaned up.
113    fn update(
114        &mut self,
115        actor: ActorId,
116        new_state: BackfillState,
117        upstream_total_key_count: u64,
118    ) -> PendingBackfillFragments {
119        let mut result = PendingBackfillFragments::default();
120        self.upstream_mvs_total_key_count = upstream_total_key_count;
121        let total_actors = self.states.len();
122        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
123
124        let mut old_consumed_row = 0;
125        let mut new_consumed_row = 0;
126        let mut old_buffered_row = 0;
127        let mut new_buffered_row = 0;
128        match self.states.remove(&actor).unwrap() {
129            BackfillState::Init => {}
130            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
131                old_consumed_row = consumed_rows;
132                old_buffered_row = buffered_rows;
133            }
134            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
135        };
136        match &new_state {
137            BackfillState::Init => {}
138            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
139                new_consumed_row = *consumed_rows;
140                new_buffered_row = *buffered_rows;
141            }
142            BackfillState::Done(consumed_rows, buffered_rows) => {
143                tracing::debug!("actor {} done", actor);
144                new_consumed_row = *consumed_rows;
145                new_buffered_row = *buffered_rows;
146                self.done_count += 1;
147                let before_backfill_nodes = self
148                    .backfill_order_state
149                    .current_backfill_node_fragment_ids();
150                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
151                let after_backfill_nodes = self
152                    .backfill_order_state
153                    .current_backfill_node_fragment_ids();
154                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
155                let last_backfill_nodes_iter = before_backfill_nodes
156                    .into_iter()
157                    .filter(|x| !after_backfill_nodes.contains(x));
158                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
159                    .filter_map(|fragment_id| {
160                        self.backfill_order_state
161                            .get_locality_fragment_state_table_mapping()
162                            .get(&fragment_id)
163                    })
164                    .flatten()
165                    .copied()
166                    .collect();
167                tracing::debug!(
168                    "{} actors out of {} complete",
169                    self.done_count,
170                    total_actors,
171                );
172            }
173        };
174        debug_assert!(
175            new_consumed_row >= old_consumed_row,
176            "backfill progress should not go backward"
177        );
178        debug_assert!(
179            new_buffered_row >= old_buffered_row,
180            "backfill progress should not go backward"
181        );
182        match backfill_upstream_type {
183            BackfillUpstreamType::MView => {
184                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
185            }
186            BackfillUpstreamType::Source => {
187                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
188            }
189            BackfillUpstreamType::Values => {
190                // do not consider progress for values
191            }
192            BackfillUpstreamType::LocalityProvider => {
193                // Track LocalityProvider progress similar to MView
194                // Update buffered rows for precise progress calculation
195                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
196                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
197            }
198        }
199        self.states.insert(actor, new_state);
200        result
201    }
202
203    /// Returns whether all backfill executors are done.
204    fn is_done(&self) -> bool {
205        tracing::trace!(
206            "Progress::is_done? {}, {}, {:?}",
207            self.done_count,
208            self.states.len(),
209            self.states
210        );
211        self.done_count == self.states.len()
212    }
213
214    /// `progress` = `consumed_rows` / `upstream_total_key_count`
215    fn calculate_progress(&self) -> String {
216        if self.is_done() || self.states.is_empty() {
217            return "100%".to_owned();
218        }
219        let mut mv_count = 0;
220        let mut source_count = 0;
221        for backfill_upstream_type in self.backfill_upstream_types.values() {
222            match backfill_upstream_type {
223                BackfillUpstreamType::MView => mv_count += 1,
224                BackfillUpstreamType::Source => source_count += 1,
225                BackfillUpstreamType::Values => (),
226                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
227            }
228        }
229
230        let mv_progress = (mv_count > 0).then_some({
231            // Include buffered rows in total for precise progress calculation
232            // Progress = consumed / (upstream_total + buffered)
233            let total_rows_to_consume =
234                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
235            if total_rows_to_consume == 0 {
236                "99.99%".to_owned()
237            } else {
238                let mut progress =
239                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
240                if progress > 1.0 {
241                    progress = 0.9999;
242                }
243                format!(
244                    "{:.2}% ({}/{})",
245                    progress * 100.0,
246                    self.mv_backfill_consumed_rows,
247                    total_rows_to_consume
248                )
249            }
250        });
251        let source_progress = (source_count > 0).then_some(format!(
252            "{} rows consumed",
253            self.source_backfill_consumed_rows
254        ));
255        match (mv_progress, source_progress) {
256            (Some(mv_progress), Some(source_progress)) => {
257                format!(
258                    "MView Backfill: {}, Source Backfill: {}",
259                    mv_progress, source_progress
260                )
261            }
262            (Some(mv_progress), None) => mv_progress,
263            (None, Some(source_progress)) => source_progress,
264            (None, None) => "Unknown".to_owned(),
265        }
266    }
267}
268
269/// There are two kinds of `TrackingJobs`:
270/// 1. if `is_recovered` is false, it is a "New" tracking job.
271///    It is instantiated and managed by the stream manager.
272///    On recovery, the stream manager will stop managing the job.
273/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
274///    On recovery, the barrier manager will recover and start managing the job.
275pub struct TrackingJob {
276    job_id: JobId,
277    is_recovered: bool,
278    source_change: Option<SourceChange>,
279}
280
281impl std::fmt::Display for TrackingJob {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        write!(
284            f,
285            "{}{}",
286            self.job_id,
287            if self.is_recovered { "<recovered>" } else { "" }
288        )
289    }
290}
291
292impl TrackingJob {
293    /// Create a new tracking job.
294    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
295        Self {
296            job_id: stream_job_fragments.stream_job_id,
297            is_recovered: false,
298            source_change: Some(SourceChange::CreateJobFinished {
299                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
300            }),
301        }
302    }
303
304    /// Create a recovered tracking job.
305    pub(crate) fn recovered(
306        job_id: JobId,
307        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
308    ) -> Self {
309        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
310            fragment_infos
311                .iter()
312                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
313        );
314        let source_change = if source_backfill_fragments.is_empty() {
315            None
316        } else {
317            Some(SourceChange::CreateJobFinished {
318                finished_backfill_fragments: source_backfill_fragments,
319            })
320        };
321        Self {
322            job_id,
323            is_recovered: true,
324            source_change,
325        }
326    }
327
328    pub(crate) fn job_id(&self) -> JobId {
329        self.job_id
330    }
331
332    /// Notify the metadata manager that the job is finished.
333    pub(crate) async fn finish(
334        self,
335        metadata_manager: &MetadataManager,
336        source_manager: &SourceManagerRef,
337    ) -> MetaResult<()> {
338        metadata_manager
339            .catalog_controller
340            .finish_streaming_job(self.job_id)
341            .await?;
342        if let Some(source_change) = self.source_change {
343            source_manager.apply_source_change(source_change).await;
344        }
345        Ok(())
346    }
347}
348
349impl std::fmt::Debug for TrackingJob {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        if !self.is_recovered {
352            write!(f, "TrackingJob::New({})", self.job_id)
353        } else {
354            write!(f, "TrackingJob::Recovered({})", self.job_id)
355        }
356    }
357}
358
359/// Information collected during barrier completion that needs to be committed.
360#[derive(Debug, Default)]
361pub(super) struct StagingCommitInfo {
362    /// Finished jobs that should be committed
363    pub finished_jobs: Vec<TrackingJob>,
364    /// Table IDs whose locality provider state tables need to be truncated
365    pub table_ids_to_truncate: Vec<TableId>,
366}
367
368pub(super) enum UpdateProgressResult {
369    None,
370    /// The finished job, along with its pending backfill fragments for cleanup.
371    Finished {
372        truncate_locality_provider_state_tables: Vec<TableId>,
373    },
374    /// Backfill nodes have finished and new ones need to be scheduled.
375    BackfillNodeFinished(PendingBackfillFragments),
376}
377
378#[derive(Debug)]
379pub(super) struct CreateMviewProgressTracker {
380    job_id: JobId,
381    definition: String,
382    create_type: CreateType,
383    tracking_job: TrackingJob,
384    status: CreateMviewStatus,
385}
386
387#[derive(Debug)]
388enum CreateMviewStatus {
389    Backfilling {
390        /// Progress of the create-mview DDL.
391        progress: Progress,
392
393        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
394        pending_backfill_nodes: Vec<FragmentId>,
395
396        /// Table IDs whose locality provider state tables need to be truncated
397        table_ids_to_truncate: Vec<TableId>,
398    },
399    Finished {
400        table_ids_to_truncate: Vec<TableId>,
401    },
402}
403
404impl CreateMviewProgressTracker {
405    pub fn recover(
406        creating_job_id: JobId,
407        definition: String,
408        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
409        backfill_order_state: BackfillOrderState,
410        version_stats: &HummockVersionStats,
411    ) -> Self {
412        {
413            let create_type = CreateType::Background;
414            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
415            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
416            let status = if actors.is_empty() {
417                CreateMviewStatus::Finished {
418                    table_ids_to_truncate: vec![],
419                }
420            } else {
421                let mut states = HashMap::new();
422                let mut backfill_upstream_types = HashMap::new();
423
424                for (actor, backfill_upstream_type) in actors {
425                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
426                    backfill_upstream_types.insert(actor, backfill_upstream_type);
427                }
428
429                let progress = Self::recover_progress(
430                    creating_job_id,
431                    states,
432                    backfill_upstream_types,
433                    StreamJobFragments::upstream_table_counts_impl(
434                        fragment_infos.values().map(|fragment| &fragment.nodes),
435                    ),
436                    version_stats,
437                    backfill_order_state,
438                );
439                let pending_backfill_nodes = progress
440                    .backfill_order_state
441                    .current_backfill_node_fragment_ids();
442                CreateMviewStatus::Backfilling {
443                    progress,
444                    pending_backfill_nodes,
445                    table_ids_to_truncate: vec![],
446                }
447            };
448            Self {
449                job_id: creating_job_id,
450                definition,
451                create_type,
452                tracking_job,
453                status,
454            }
455        }
456    }
457
458    /// ## How recovery works
459    ///
460    /// The progress (number of rows consumed) is persisted in state tables.
461    /// During recovery, the backfill executor will restore the number of rows consumed,
462    /// and then it will just report progress like newly created executors.
463    fn recover_progress(
464        job_id: JobId,
465        states: HashMap<ActorId, BackfillState>,
466        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
467        upstream_mv_count: HashMap<TableId, usize>,
468        version_stats: &HummockVersionStats,
469        backfill_order_state: BackfillOrderState,
470    ) -> Progress {
471        let upstream_mvs_total_key_count =
472            calculate_total_key_count(&upstream_mv_count, version_stats);
473        Progress {
474            job_id,
475            states,
476            backfill_order_state,
477            backfill_upstream_types,
478            done_count: 0, // Fill only after first barrier pass
479            upstream_mv_count,
480            upstream_mvs_total_key_count,
481            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
482            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
483            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
484        }
485    }
486
487    pub fn gen_ddl_progress(&self) -> DdlProgress {
488        let progress = match &self.status {
489            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
490            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
491        };
492        DdlProgress {
493            id: self.job_id.as_raw_id() as u64,
494            statement: self.definition.clone(),
495            create_type: self.create_type.as_str().to_owned(),
496            progress,
497        }
498    }
499
500    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
501    /// Return the table ids whose locality provider state tables need to be truncated.
502    pub(super) fn apply_progress(
503        &mut self,
504        create_mview_progress: &CreateMviewProgress,
505        version_stats: &HummockVersionStats,
506    ) {
507        let CreateMviewStatus::Backfilling {
508            progress,
509            pending_backfill_nodes,
510            table_ids_to_truncate,
511        } = &mut self.status
512        else {
513            tracing::warn!(
514                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
515            );
516            return;
517        };
518        {
519            // Update the progress of all commands.
520            {
521                // Those with actors complete can be finished immediately.
522                match progress.apply(create_mview_progress, version_stats) {
523                    UpdateProgressResult::None => {
524                        tracing::trace!(?progress, "update progress");
525                    }
526                    UpdateProgressResult::Finished {
527                        truncate_locality_provider_state_tables,
528                    } => {
529                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
530                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
531                        tracing::trace!(?progress, "finish progress");
532                        self.status = CreateMviewStatus::Finished {
533                            table_ids_to_truncate,
534                        };
535                    }
536                    UpdateProgressResult::BackfillNodeFinished(pending) => {
537                        table_ids_to_truncate
538                            .extend(pending.truncate_locality_provider_state_tables.clone());
539                        tracing::trace!(
540                            ?progress,
541                            next_backfill_nodes = ?pending.next_backfill_nodes,
542                            "start next backfill node"
543                        );
544                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
545                    }
546                }
547            }
548        }
549    }
550
551    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
552        match &mut self.status {
553            CreateMviewStatus::Backfilling {
554                pending_backfill_nodes,
555                ..
556            } => Some(pending_backfill_nodes.drain(..)),
557            CreateMviewStatus::Finished { .. } => None,
558        }
559        .into_iter()
560        .flatten()
561    }
562
563    pub(super) fn collect_staging_commit_info(
564        &mut self,
565    ) -> (bool, impl Iterator<Item = TableId> + '_) {
566        let (is_finished, table_ids) = match &mut self.status {
567            CreateMviewStatus::Backfilling {
568                table_ids_to_truncate,
569                ..
570            } => (false, table_ids_to_truncate),
571            CreateMviewStatus::Finished {
572                table_ids_to_truncate,
573                ..
574            } => (true, table_ids_to_truncate),
575        };
576        (is_finished, table_ids.drain(..))
577    }
578
579    pub(super) fn is_finished(&self) -> bool {
580        matches!(self.status, CreateMviewStatus::Finished { .. })
581    }
582
583    pub(super) fn into_tracking_job(self) -> TrackingJob {
584        let CreateMviewStatus::Finished { .. } = self.status else {
585            panic!("should be called when finished");
586        };
587        self.tracking_job
588    }
589
590    /// Add a new create-mview DDL command to track.
591    ///
592    /// If the actors to track are empty, return the given command as it can be finished immediately.
593    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
594        tracing::trace!(?info, "add job to track");
595        let CreateStreamingJobCommandInfo {
596            stream_job_fragments,
597            definition,
598            create_type,
599            fragment_backfill_ordering,
600            locality_fragment_state_table_mapping,
601            ..
602        } = info;
603        let job_id = stream_job_fragments.stream_job_id();
604        let definition = definition.clone();
605        let create_type = (*create_type).into();
606        let actors = stream_job_fragments.tracking_progress_actor_ids();
607        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
608        if actors.is_empty() {
609            // The command can be finished immediately.
610            return Self {
611                job_id,
612                definition,
613                create_type,
614                tracking_job,
615                status: CreateMviewStatus::Finished {
616                    table_ids_to_truncate: vec![],
617                },
618            };
619        }
620
621        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
622        let upstream_total_key_count: u64 =
623            calculate_total_key_count(&upstream_mv_count, version_stats);
624
625        let backfill_order_state = BackfillOrderState::new(
626            fragment_backfill_ordering,
627            stream_job_fragments,
628            locality_fragment_state_table_mapping.clone(),
629        );
630        let progress = Progress::new(
631            job_id,
632            actors,
633            upstream_mv_count,
634            upstream_total_key_count,
635            backfill_order_state,
636        );
637        let pending_backfill_nodes = progress
638            .backfill_order_state
639            .current_backfill_node_fragment_ids();
640        Self {
641            job_id,
642            definition,
643            create_type,
644            tracking_job,
645            status: CreateMviewStatus::Backfilling {
646                progress,
647                pending_backfill_nodes,
648                table_ids_to_truncate: vec![],
649            },
650        }
651    }
652}
653
654impl Progress {
655    /// Update the progress of `actor` according to the Pb struct.
656    ///
657    /// If all actors in this MV have finished, return the command.
658    fn apply(
659        &mut self,
660        progress: &CreateMviewProgress,
661        version_stats: &HummockVersionStats,
662    ) -> UpdateProgressResult {
663        tracing::trace!(?progress, "update progress");
664        let actor = progress.backfill_actor_id;
665        let job_id = self.job_id;
666
667        let new_state = if progress.done {
668            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
669        } else {
670            BackfillState::ConsumingUpstream(
671                progress.consumed_epoch.into(),
672                progress.consumed_rows,
673                progress.buffered_rows,
674            )
675        };
676
677        {
678            {
679                let progress_state = self;
680
681                let upstream_total_key_count: u64 =
682                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
683
684                tracing::trace!(%job_id, "updating progress for table");
685                let pending = progress_state.update(actor, new_state, upstream_total_key_count);
686
687                if progress_state.is_done() {
688                    tracing::debug!(
689                        %job_id,
690                        "all actors done for creating mview!",
691                    );
692
693                    let PendingBackfillFragments {
694                        next_backfill_nodes,
695                        truncate_locality_provider_state_tables,
696                    } = pending;
697
698                    assert!(next_backfill_nodes.is_empty());
699                    UpdateProgressResult::Finished {
700                        truncate_locality_provider_state_tables,
701                    }
702                } else if !pending.next_backfill_nodes.is_empty()
703                    || !pending.truncate_locality_provider_state_tables.is_empty()
704                {
705                    UpdateProgressResult::BackfillNodeFinished(pending)
706                } else {
707                    UpdateProgressResult::None
708                }
709            }
710        }
711    }
712}
713
714fn calculate_total_key_count(
715    table_count: &HashMap<TableId, usize>,
716    version_stats: &HummockVersionStats,
717) -> u64 {
718    table_count
719        .iter()
720        .map(|(table_id, count)| {
721            assert_ne!(*count, 0);
722            *count as u64
723                * version_stats
724                    .table_stats
725                    .get(table_id)
726                    .map_or(0, |stat| stat.total_key_count as u64)
727        })
728        .sum()
729}