risingwave_meta/barrier/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

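/// Per-actor backfill state, as reported by the backfill executor:
/// `Init` -> `ConsumingUpstream(epoch, consumed_rows, buffered_rows)` ->
/// `Done(consumed_rows, buffered_rows)`.
/// An actor must not report `Done` more than once (see `Progress::update`).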
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether each actor backfills from a source or an mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keeps track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed.
    /// Used to calculate precise progress: consumed / (`upstream_total` + buffered).
    mv_backfill_buffered_rows: u64,
}

impl Progress {
    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

    /// Update the progress of `actor`.
    /// Returns the backfill fragments that need to be scheduled or cleaned up.
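    ///
    /// Row counters are maintained incrementally: the actor's previously reported
    /// `consumed_rows` / `buffered_rows` are subtracted and the newly reported values
    /// are added, e.g. an actor advancing from 300 to 450 consumed rows contributes
    /// 150 to `mv_backfill_consumed_rows` (illustrative numbers).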
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
            BackfillUpstreamType::LocalityProvider => {
                // Track LocalityProvider progress similar to MView
                // Update buffered rows for precise progress calculation
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// For mview backfill, `progress` = `consumed_rows` / (`upstream_total_key_count` + `buffered_rows`);
    /// for source backfill, the number of consumed rows is reported directly.
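    ///
    /// An illustrative calculation with made-up numbers:
    ///
    /// ```text
    /// upstream_mvs_total_key_count = 1000
    /// mv_backfill_buffered_rows    = 200
    /// mv_backfill_consumed_rows    = 600
    /// progress = 600 / (1000 + 200)  =>  "50.00% (600/1200)"
    /// ```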
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Include buffered rows in total for precise progress calculation
            // Progress = consumed / (upstream_total + buffered)
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJob`s:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    /// Create a new tracking job.
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

    /// Create a recovered tracking job.
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated
    pub table_ids_to_truncate: Vec<TableId>,
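    /// IDs of jobs whose CDC table backfill has finished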
    pub finished_cdc_table_backfill: Vec<JobId>,
}

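/// Outcome of applying a single actor's progress report to a [`Progress`]. See `Progress::apply`.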
pub(super) enum UpdateProgressResult {
    None,
    /// The job has finished; carries the locality provider state tables to truncate for cleanup.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Some backfill nodes have finished; new ones may need to be scheduled and the finished
    /// locality provider state tables truncated.
    BackfillNodeFinished(PendingBackfillFragments),
}

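/// Tracks the backfill progress of a single creating streaming job, from backfilling
/// through completion.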
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    job_id: JobId,
    definition: String,
    create_type: CreateType,
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        /// Progress of the create-mview DDL.
        progress: Progress,

        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table IDs whose locality provider state tables need to be truncated
        table_ids_to_truncate: Vec<TableId>,
    },
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
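    /// Recover the tracker for a streaming job after a meta node restart. The job is treated
    /// as a `Background` job, and all tracked actors restart in `ConsumingUpstream` state with
    /// zeroed row counts; the actual counts are re-reported by the backfill executors on
    /// subsequent barriers (see `Self::recover_progress`).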
    pub fn recover(
        creating_job_id: JobId,
        definition: String,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        {
            let create_type = CreateType::Background;
            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
            let status = if actors.is_empty() {
                CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                }
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    creating_job_id,
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        fragment_infos.values().map(|fragment| &fragment.nodes),
                    ),
                    version_stats,
                    backfill_order_state,
                );
                let pending_backfill_nodes = progress
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                CreateMviewStatus::Backfilling {
                    progress,
                    pending_backfill_nodes,
                    table_ids_to_truncate: vec![],
                }
            };
            Self {
                job_id: creating_job_id,
                definition,
                create_type,
                tracking_job,
                status,
            }
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0, // Filled only after the first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            source_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            mv_backfill_buffered_rows: 0, // Filled only after the first barrier pass
        }
    }

    pub fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

    /// Update the progress of this tracked job according to the progress reported by one of
    /// its backfill actors. Table IDs whose locality provider state tables need to be
    /// truncated are stashed in the job status and later drained by
    /// `Self::collect_staging_commit_info`.
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "updating the progress of a streaming job whose backfill has already finished: {create_mview_progress:?}"
            );
            return;
        };
        {
            // Update the progress of all commands.
            {
                // Those with actors complete can be finished immediately.
                match progress.apply(create_mview_progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished {
                        truncate_locality_provider_state_tables,
                    } => {
                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                        tracing::trace!(?progress, "finish progress");
                        self.status = CreateMviewStatus::Finished {
                            table_ids_to_truncate,
                        };
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
                    }
                }
            }
        }
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, impl Iterator<Item = TableId> + '_) {
        let (is_finished, table_ids) = match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, table_ids_to_truncate),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, table_ids_to_truncate),
        };
        (is_finished, table_ids.drain(..))
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

    /// Add a new create-mview DDL command to track.
    ///
    /// If there are no actors to track, the tracker is created directly in the `Finished`
    /// status, as the command can be finished immediately.
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let definition = definition.clone();
        let create_type = (*create_type).into();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // The command can be finished immediately.
            return Self {
                job_id,
                definition,
                create_type,
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            job_id,
            definition,
            create_type,
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// If all actors in this MV have finished, return [`UpdateProgressResult::Finished`].
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        {
            {
                let progress_state = self;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

                tracing::trace!(%job_id, "updating progress for table");
                let pending = progress_state.update(actor, new_state, upstream_total_key_count);

                if progress_state.is_done() {
                    tracing::debug!(
                        %job_id,
                        "all actors done for creating mview!",
                    );

                    let PendingBackfillFragments {
                        next_backfill_nodes,
                        truncate_locality_provider_state_tables,
                    } = pending;

                    assert!(next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished {
                        truncate_locality_provider_state_tables,
                    }
                } else if !pending.next_backfill_nodes.is_empty()
                    || !pending.truncate_locality_provider_state_tables.is_empty()
                {
                    UpdateProgressResult::BackfillNodeFinished(pending)
                } else {
                    UpdateProgressResult::None
                }
            }
        }
    }
}

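/// Total key count of the upstream tables, weighted by how many times this job consumes each
/// table. Tables missing from `version_stats` contribute 0.
///
/// Illustrative numbers: scanning table A twice (1_000 keys) and table B once (500 keys)
/// yields `2 * 1_000 + 1 * 500 = 2_500`.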
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}