risingwave_meta/barrier/
progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::CreateType;
22use risingwave_pb::ddl_service::DdlProgress;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
25
26use crate::MetaResult;
27use crate::barrier::CreateStreamingJobCommandInfo;
28use crate::barrier::backfill_order_control::BackfillOrderState;
29use crate::barrier::info::InflightStreamingJobInfo;
30use crate::controller::fragment::InflightFragmentInfo;
31use crate::manager::MetadataManager;
32use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
33use crate::stream::{SourceChange, SourceManagerRef};
34
35type ConsumedRows = u64;
36type BufferedRows = u64;
37
38#[derive(Clone, Copy, Debug)]
39enum BackfillState {
40    Init,
41    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
42    Done(ConsumedRows, BufferedRows),
43}
44
45/// Represents the backfill nodes that need to be scheduled or cleaned up.
46#[derive(Debug, Default)]
47pub(super) struct PendingBackfillFragments {
48    /// Fragment IDs that should start backfilling in the next checkpoint
49    pub next_backfill_nodes: Vec<FragmentId>,
50    /// State tables of locality provider fragments that should be truncated
51    pub truncate_locality_provider_state_tables: Vec<TableId>,
52}
53
54/// Progress of all actors containing backfill executors while creating mview.
55#[derive(Debug)]
56pub(super) struct Progress {
57    job_id: JobId,
58    // `states` and `done_count` decides whether the progress is done. See `is_done`.
59    states: HashMap<ActorId, BackfillState>,
60    backfill_order_state: BackfillOrderState,
61    done_count: usize,
62
63    /// Tells whether the backfill is from source or mv.
64    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
65
66    // The following row counts are used to calculate the progress. See `calculate_progress`.
67    /// Upstream mv count.
68    /// Keep track of how many times each upstream MV
69    /// appears in this stream job.
70    upstream_mv_count: HashMap<TableId, usize>,
71    /// Total key count of all the upstream materialized views
72    upstream_mvs_total_key_count: u64,
73    mv_backfill_consumed_rows: u64,
74    source_backfill_consumed_rows: u64,
75    /// Buffered rows (for locality backfill) that are yet to be consumed
76    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
77    mv_backfill_buffered_rows: u64,
78}
79
80impl Progress {
81    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
82    fn new(
83        job_id: JobId,
84        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
85        upstream_mv_count: HashMap<TableId, usize>,
86        upstream_total_key_count: u64,
87        backfill_order_state: BackfillOrderState,
88    ) -> Self {
89        let mut states = HashMap::new();
90        let mut backfill_upstream_types = HashMap::new();
91        for (actor, backfill_upstream_type) in actors {
92            states.insert(actor, BackfillState::Init);
93            backfill_upstream_types.insert(actor, backfill_upstream_type);
94        }
95        assert!(!states.is_empty());
96
97        Self {
98            job_id,
99            states,
100            backfill_upstream_types,
101            done_count: 0,
102            upstream_mv_count,
103            upstream_mvs_total_key_count: upstream_total_key_count,
104            mv_backfill_consumed_rows: 0,
105            source_backfill_consumed_rows: 0,
106            mv_backfill_buffered_rows: 0,
107            backfill_order_state,
108        }
109    }
110
111    /// Update the progress of `actor`.
112    /// Returns the backfill fragments that need to be scheduled or cleaned up.
113    fn update(
114        &mut self,
115        actor: ActorId,
116        new_state: BackfillState,
117        upstream_total_key_count: u64,
118    ) -> PendingBackfillFragments {
119        let mut result = PendingBackfillFragments::default();
120        self.upstream_mvs_total_key_count = upstream_total_key_count;
121        let total_actors = self.states.len();
122        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
123
124        let mut old_consumed_row = 0;
125        let mut new_consumed_row = 0;
126        let mut old_buffered_row = 0;
127        let mut new_buffered_row = 0;
128        match self.states.remove(&actor).unwrap() {
129            BackfillState::Init => {}
130            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
131                old_consumed_row = consumed_rows;
132                old_buffered_row = buffered_rows;
133            }
134            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
135        };
136        match &new_state {
137            BackfillState::Init => {}
138            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
139                new_consumed_row = *consumed_rows;
140                new_buffered_row = *buffered_rows;
141            }
142            BackfillState::Done(consumed_rows, buffered_rows) => {
143                tracing::debug!("actor {} done", actor);
144                new_consumed_row = *consumed_rows;
145                new_buffered_row = *buffered_rows;
146                self.done_count += 1;
147                let before_backfill_nodes = self
148                    .backfill_order_state
149                    .current_backfill_node_fragment_ids();
150                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
151                let after_backfill_nodes = self
152                    .backfill_order_state
153                    .current_backfill_node_fragment_ids();
154                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
155                let last_backfill_nodes_iter = before_backfill_nodes
156                    .into_iter()
157                    .filter(|x| !after_backfill_nodes.contains(x));
158                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
159                    .filter_map(|fragment_id| {
160                        self.backfill_order_state
161                            .get_locality_fragment_state_table_mapping()
162                            .get(&fragment_id)
163                    })
164                    .flatten()
165                    .copied()
166                    .collect();
167                tracing::debug!(
168                    "{} actors out of {} complete",
169                    self.done_count,
170                    total_actors,
171                );
172            }
173        };
174        debug_assert!(
175            new_consumed_row >= old_consumed_row,
176            "backfill progress should not go backward"
177        );
178        debug_assert!(
179            new_buffered_row >= old_buffered_row,
180            "backfill progress should not go backward"
181        );
182        match backfill_upstream_type {
183            BackfillUpstreamType::MView => {
184                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
185            }
186            BackfillUpstreamType::Source => {
187                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
188            }
189            BackfillUpstreamType::Values => {
190                // do not consider progress for values
191            }
192            BackfillUpstreamType::LocalityProvider => {
193                // Track LocalityProvider progress similar to MView
194                // Update buffered rows for precise progress calculation
195                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
196                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
197            }
198        }
199        self.states.insert(actor, new_state);
200        result
201    }
202
203    /// Returns whether all backfill executors are done.
204    fn is_done(&self) -> bool {
205        tracing::trace!(
206            "Progress::is_done? {}, {}, {:?}",
207            self.done_count,
208            self.states.len(),
209            self.states
210        );
211        self.done_count == self.states.len()
212    }
213
214    /// `progress` = `consumed_rows` / `upstream_total_key_count`
215    fn calculate_progress(&self) -> String {
216        if self.is_done() || self.states.is_empty() {
217            return "100%".to_owned();
218        }
219        let mut mv_count = 0;
220        let mut source_count = 0;
221        for backfill_upstream_type in self.backfill_upstream_types.values() {
222            match backfill_upstream_type {
223                BackfillUpstreamType::MView => mv_count += 1,
224                BackfillUpstreamType::Source => source_count += 1,
225                BackfillUpstreamType::Values => (),
226                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
227            }
228        }
229
230        let mv_progress = (mv_count > 0).then_some({
231            // Include buffered rows in total for precise progress calculation
232            // Progress = consumed / (upstream_total + buffered)
233            let total_rows_to_consume =
234                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
235            if total_rows_to_consume == 0 {
236                "99.99%".to_owned()
237            } else {
238                let mut progress =
239                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
240                if progress > 1.0 {
241                    progress = 0.9999;
242                }
243                format!(
244                    "{:.2}% ({}/{})",
245                    progress * 100.0,
246                    self.mv_backfill_consumed_rows,
247                    total_rows_to_consume
248                )
249            }
250        });
251        let source_progress = (source_count > 0).then_some(format!(
252            "{} rows consumed",
253            self.source_backfill_consumed_rows
254        ));
255        match (mv_progress, source_progress) {
256            (Some(mv_progress), Some(source_progress)) => {
257                format!(
258                    "MView Backfill: {}, Source Backfill: {}",
259                    mv_progress, source_progress
260                )
261            }
262            (Some(mv_progress), None) => mv_progress,
263            (None, Some(source_progress)) => source_progress,
264            (None, None) => "Unknown".to_owned(),
265        }
266    }
267}
268
269/// There are two kinds of `TrackingJobs`:
270/// 1. if `is_recovered` is false, it is a "New" tracking job.
271///    It is instantiated and managed by the stream manager.
272///    On recovery, the stream manager will stop managing the job.
273/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
274///    On recovery, the barrier manager will recover and start managing the job.
275pub struct TrackingJob {
276    job_id: JobId,
277    is_recovered: bool,
278    source_change: Option<SourceChange>,
279}
280
281impl std::fmt::Display for TrackingJob {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        write!(
284            f,
285            "{}{}",
286            self.job_id,
287            if self.is_recovered { "<recovered>" } else { "" }
288        )
289    }
290}
291
292impl TrackingJob {
293    /// Create a new tracking job.
294    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
295        Self {
296            job_id: stream_job_fragments.stream_job_id,
297            is_recovered: false,
298            source_change: Some(SourceChange::CreateJobFinished {
299                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
300            }),
301        }
302    }
303
304    /// Create a recovered tracking job.
305    pub(crate) fn recovered(
306        job_id: JobId,
307        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
308    ) -> Self {
309        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
310            fragment_infos
311                .iter()
312                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
313        );
314        let source_change = if source_backfill_fragments.is_empty() {
315            None
316        } else {
317            Some(SourceChange::CreateJobFinished {
318                finished_backfill_fragments: source_backfill_fragments,
319            })
320        };
321        Self {
322            job_id,
323            is_recovered: true,
324            source_change,
325        }
326    }
327
328    /// Notify the metadata manager that the job is finished.
329    pub(crate) async fn finish(
330        self,
331        metadata_manager: &MetadataManager,
332        source_manager: &SourceManagerRef,
333    ) -> MetaResult<()> {
334        metadata_manager
335            .catalog_controller
336            .finish_streaming_job(self.job_id)
337            .await?;
338        if let Some(source_change) = self.source_change {
339            source_manager.apply_source_change(source_change).await;
340        }
341        Ok(())
342    }
343}
344
345impl std::fmt::Debug for TrackingJob {
346    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347        if !self.is_recovered {
348            write!(f, "TrackingJob::New({})", self.job_id)
349        } else {
350            write!(f, "TrackingJob::Recovered({})", self.job_id)
351        }
352    }
353}
354
355/// Information collected during barrier completion that needs to be committed.
356#[derive(Debug, Default)]
357pub(super) struct StagingCommitInfo {
358    /// Finished jobs that should be committed
359    pub finished_jobs: Vec<TrackingJob>,
360    /// Table IDs whose locality provider state tables need to be truncated
361    pub table_ids_to_truncate: Vec<TableId>,
362}
363
364pub(super) enum UpdateProgressResult {
365    None,
366    /// The finished job, along with its pending backfill fragments for cleanup.
367    Finished {
368        truncate_locality_provider_state_tables: Vec<TableId>,
369    },
370    /// Backfill nodes have finished and new ones need to be scheduled.
371    BackfillNodeFinished(PendingBackfillFragments),
372}
373
374#[derive(Debug)]
375pub(super) struct CreateMviewProgressTracker {
376    job_id: JobId,
377    definition: String,
378    create_type: CreateType,
379    tracking_job: TrackingJob,
380    status: CreateMviewStatus,
381}
382
383#[derive(Debug)]
384enum CreateMviewStatus {
385    Backfilling {
386        /// Progress of the create-mview DDL.
387        progress: Progress,
388
389        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
390        pending_backfill_nodes: Vec<FragmentId>,
391
392        /// Table IDs whose locality provider state tables need to be truncated
393        table_ids_to_truncate: Vec<TableId>,
394    },
395    Finished {
396        table_ids_to_truncate: Vec<TableId>,
397    },
398}
399
400impl CreateMviewProgressTracker {
401    pub fn recover(
402        creating_job_id: JobId,
403        definition: String,
404        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
405        backfill_order_state: BackfillOrderState,
406        version_stats: &HummockVersionStats,
407    ) -> Self {
408        {
409            let create_type = CreateType::Background;
410            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
411            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
412            let status = if actors.is_empty() {
413                CreateMviewStatus::Finished {
414                    table_ids_to_truncate: vec![],
415                }
416            } else {
417                let mut states = HashMap::new();
418                let mut backfill_upstream_types = HashMap::new();
419
420                for (actor, backfill_upstream_type) in actors {
421                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
422                    backfill_upstream_types.insert(actor, backfill_upstream_type);
423                }
424
425                let progress = Self::recover_progress(
426                    creating_job_id,
427                    states,
428                    backfill_upstream_types,
429                    StreamJobFragments::upstream_table_counts_impl(
430                        fragment_infos.values().map(|fragment| &fragment.nodes),
431                    ),
432                    version_stats,
433                    backfill_order_state,
434                );
435                let pending_backfill_nodes = progress
436                    .backfill_order_state
437                    .current_backfill_node_fragment_ids();
438                CreateMviewStatus::Backfilling {
439                    progress,
440                    pending_backfill_nodes,
441                    table_ids_to_truncate: vec![],
442                }
443            };
444            Self {
445                job_id: creating_job_id,
446                definition,
447                create_type,
448                tracking_job,
449                status,
450            }
451        }
452    }
453
454    /// ## How recovery works
455    ///
456    /// The progress (number of rows consumed) is persisted in state tables.
457    /// During recovery, the backfill executor will restore the number of rows consumed,
458    /// and then it will just report progress like newly created executors.
459    fn recover_progress(
460        job_id: JobId,
461        states: HashMap<ActorId, BackfillState>,
462        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
463        upstream_mv_count: HashMap<TableId, usize>,
464        version_stats: &HummockVersionStats,
465        backfill_order_state: BackfillOrderState,
466    ) -> Progress {
467        let upstream_mvs_total_key_count =
468            calculate_total_key_count(&upstream_mv_count, version_stats);
469        Progress {
470            job_id,
471            states,
472            backfill_order_state,
473            backfill_upstream_types,
474            done_count: 0, // Fill only after first barrier pass
475            upstream_mv_count,
476            upstream_mvs_total_key_count,
477            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
478            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
479            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
480        }
481    }
482
483    pub fn gen_ddl_progress(&self) -> DdlProgress {
484        let progress = match &self.status {
485            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
486            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
487        };
488        DdlProgress {
489            id: self.job_id.as_raw_id() as u64,
490            statement: self.definition.clone(),
491            create_type: self.create_type.as_str().to_owned(),
492            progress,
493        }
494    }
495
496    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
497    /// Return the table ids whose locality provider state tables need to be truncated.
498    pub(super) fn apply_progress(
499        &mut self,
500        create_mview_progress: &CreateMviewProgress,
501        version_stats: &HummockVersionStats,
502    ) {
503        let CreateMviewStatus::Backfilling {
504            progress,
505            pending_backfill_nodes,
506            table_ids_to_truncate,
507        } = &mut self.status
508        else {
509            tracing::warn!(
510                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
511            );
512            return;
513        };
514        {
515            // Update the progress of all commands.
516            {
517                // Those with actors complete can be finished immediately.
518                match progress.apply(create_mview_progress, version_stats) {
519                    UpdateProgressResult::None => {
520                        tracing::trace!(?progress, "update progress");
521                    }
522                    UpdateProgressResult::Finished {
523                        truncate_locality_provider_state_tables,
524                    } => {
525                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
526                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
527                        tracing::trace!(?progress, "finish progress");
528                        self.status = CreateMviewStatus::Finished {
529                            table_ids_to_truncate,
530                        };
531                    }
532                    UpdateProgressResult::BackfillNodeFinished(pending) => {
533                        table_ids_to_truncate
534                            .extend(pending.truncate_locality_provider_state_tables.clone());
535                        tracing::trace!(
536                            ?progress,
537                            next_backfill_nodes = ?pending.next_backfill_nodes,
538                            "start next backfill node"
539                        );
540                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
541                    }
542                }
543            }
544        }
545    }
546
547    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
548        match &mut self.status {
549            CreateMviewStatus::Backfilling {
550                pending_backfill_nodes,
551                ..
552            } => Some(pending_backfill_nodes.drain(..)),
553            CreateMviewStatus::Finished { .. } => None,
554        }
555        .into_iter()
556        .flatten()
557    }
558
559    pub(super) fn collect_staging_commit_info(
560        &mut self,
561    ) -> (bool, impl Iterator<Item = TableId> + '_) {
562        let (is_finished, table_ids) = match &mut self.status {
563            CreateMviewStatus::Backfilling {
564                table_ids_to_truncate,
565                ..
566            } => (false, table_ids_to_truncate),
567            CreateMviewStatus::Finished {
568                table_ids_to_truncate,
569                ..
570            } => (true, table_ids_to_truncate),
571        };
572        (is_finished, table_ids.drain(..))
573    }
574
575    pub(super) fn is_finished(&self) -> bool {
576        matches!(self.status, CreateMviewStatus::Finished { .. })
577    }
578
579    pub(super) fn into_tracking_job(self) -> TrackingJob {
580        let CreateMviewStatus::Finished { .. } = self.status else {
581            panic!("should be called when finished");
582        };
583        self.tracking_job
584    }
585
586    /// Add a new create-mview DDL command to track.
587    ///
588    /// If the actors to track are empty, return the given command as it can be finished immediately.
589    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
590        tracing::trace!(?info, "add job to track");
591        let CreateStreamingJobCommandInfo {
592            stream_job_fragments,
593            definition,
594            create_type,
595            fragment_backfill_ordering,
596            locality_fragment_state_table_mapping,
597            ..
598        } = info;
599        let job_id = stream_job_fragments.stream_job_id();
600        let definition = definition.clone();
601        let create_type = (*create_type).into();
602        let actors = stream_job_fragments.tracking_progress_actor_ids();
603        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
604        if actors.is_empty() {
605            // The command can be finished immediately.
606            return Self {
607                job_id,
608                definition,
609                create_type,
610                tracking_job,
611                status: CreateMviewStatus::Finished {
612                    table_ids_to_truncate: vec![],
613                },
614            };
615        }
616
617        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
618        let upstream_total_key_count: u64 =
619            calculate_total_key_count(&upstream_mv_count, version_stats);
620
621        let backfill_order_state = BackfillOrderState::new(
622            fragment_backfill_ordering,
623            stream_job_fragments,
624            locality_fragment_state_table_mapping.clone(),
625        );
626        let progress = Progress::new(
627            job_id,
628            actors,
629            upstream_mv_count,
630            upstream_total_key_count,
631            backfill_order_state,
632        );
633        let pending_backfill_nodes = progress
634            .backfill_order_state
635            .current_backfill_node_fragment_ids();
636        Self {
637            job_id,
638            definition,
639            create_type,
640            tracking_job,
641            status: CreateMviewStatus::Backfilling {
642                progress,
643                pending_backfill_nodes,
644                table_ids_to_truncate: vec![],
645            },
646        }
647    }
648}
649
650impl Progress {
651    /// Update the progress of `actor` according to the Pb struct.
652    ///
653    /// If all actors in this MV have finished, return the command.
654    fn apply(
655        &mut self,
656        progress: &CreateMviewProgress,
657        version_stats: &HummockVersionStats,
658    ) -> UpdateProgressResult {
659        tracing::trace!(?progress, "update progress");
660        let actor = progress.backfill_actor_id;
661        let job_id = self.job_id;
662
663        let new_state = if progress.done {
664            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
665        } else {
666            BackfillState::ConsumingUpstream(
667                progress.consumed_epoch.into(),
668                progress.consumed_rows,
669                progress.buffered_rows,
670            )
671        };
672
673        {
674            {
675                let progress_state = self;
676
677                let upstream_total_key_count: u64 =
678                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
679
680                tracing::debug!(%job_id, "updating progress for table");
681                let pending = progress_state.update(actor, new_state, upstream_total_key_count);
682
683                if progress_state.is_done() {
684                    tracing::debug!(
685                        %job_id,
686                        "all actors done for creating mview!",
687                    );
688
689                    let PendingBackfillFragments {
690                        next_backfill_nodes,
691                        truncate_locality_provider_state_tables,
692                    } = pending;
693
694                    assert!(next_backfill_nodes.is_empty());
695                    UpdateProgressResult::Finished {
696                        truncate_locality_provider_state_tables,
697                    }
698                } else if !pending.next_backfill_nodes.is_empty()
699                    || !pending.truncate_locality_provider_state_tables.is_empty()
700                {
701                    UpdateProgressResult::BackfillNodeFinished(pending)
702                } else {
703                    UpdateProgressResult::None
704                }
705            }
706        }
707    }
708}
709
710fn calculate_total_key_count(
711    table_count: &HashMap<TableId, usize>,
712    version_stats: &HummockVersionStats,
713) -> u64 {
714    table_count
715        .iter()
716        .map(|(table_id, count)| {
717            assert_ne!(*count, 0);
718            *count as u64
719                * version_stats
720                    .table_stats
721                    .get(table_id)
722                    .map_or(0, |stat| stat.total_key_count as u64)
723        })
724        .sum()
725}