risingwave_meta/barrier/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, ObjectId};
use risingwave_pb::catalog::PbCreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::BarrierInfo;
use crate::barrier::{
    Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

type ConsumedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    Done(ConsumedRows),
}
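
// Legal state transitions, as enforced by `Progress::update` below:
// `Init -> ConsumingUpstream -> Done`. Reporting `Done` twice panics,
// and the consumed row count is asserted to never go backward.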

/// Progress of all actors that contain backfill executors while creating an mview.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keeps track of how many times each upstream MV appears in this stream
    /// job, e.g. an upstream MV joined with itself counts twice.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,

    /// DDL definition.
    definition: String,
    /// Create type.
    create_type: CreateType,
}

impl Progress {
    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

    /// Update the progress of `actor`, and return the next backfill nodes to
    /// schedule if this update unblocks them under ordered backfill.
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> Vec<FragmentId> {
        let mut next_backfill_nodes = vec![];
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
        tracing::debug!(?actor, states = ?self.states, "update progress for actor");

        let mut old = 0;
        let mut new = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
                old = old_consumed_rows;
            }
            BackfillState::Done(_) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
                new = *new_consumed_rows;
            }
            BackfillState::Done(new_consumed_rows) => {
                tracing::debug!("actor {} done", actor);
                new = *new_consumed_rows;
                self.done_count += 1;
                next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(new >= old, "backfill progress should not go backward");
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
        }
        self.states.insert(actor, new_state);
        next_backfill_nodes
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// Returns the ids of all actors containing the backfill executors for the mview tracked by this
    /// [`Progress`].
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

    /// `progress` = `consumed_rows` / `upstream_total_key_count`.
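    /// For example, `mv_backfill_consumed_rows = 50` against
    /// `upstream_mvs_total_key_count = 200` renders as `"25.00% (50/200)"`.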
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            if self.upstream_mvs_total_key_count == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress = self.mv_backfill_consumed_rows as f64
                    / (self.upstream_mvs_total_key_count as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    self.upstream_mvs_total_key_count
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJob`s:
/// 1. `New`. Instantiated and managed by the stream manager.
///    On recovery, the stream manager stops managing the job.
/// 2. `Recovered`. On recovery, the barrier manager recovers the job
///    and takes over its management.
pub enum TrackingJob {
    New(TrackingCommand),
    Recovered(RecoveredTrackingJob),
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "{}", command.job_id),
            TrackingJob::Recovered(recovered) => write!(f, "{}<recovered>", recovered.id),
        }
    }
}

impl TrackingJob {
    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
        match self {
            TrackingJob::New(command) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(
                        command.job_id.table_id as i32,
                        command.replace_stream_job.clone(),
                    )
                    .await?;
                Ok(())
            }
            TrackingJob::Recovered(recovered) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(recovered.id, None)
                    .await?;
                Ok(())
            }
        }
    }

    pub(crate) fn table_to_create(&self) -> TableId {
        match self {
            TrackingJob::New(command) => command.job_id,
            TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
        }
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
            TrackingJob::Recovered(recovered) => {
                write!(f, "TrackingJob::Recovered({:?})", recovered.id)
            }
        }
    }
}

pub struct RecoveredTrackingJob {
    pub id: ObjectId,
}

/// The command tracked by the [`CreateMviewProgressTracker`].
pub(super) struct TrackingCommand {
    pub job_id: TableId,
    pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}

pub(super) enum UpdateProgressResult {
    None,
    Finished(TrackingJob),
    BackfillNodeFinished(Vec<FragmentId>),
}
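
// Returned by `CreateMviewProgressTracker::update`: `None` when there is
// nothing actionable (or the job is untracked), `Finished` when every backfill
// actor of the job is done, and `BackfillNodeFinished` when this update
// unlocked the next fragments to backfill under ordered backfill.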

/// Tracking is done as follows:
/// 1. We identify a `StreamJob` by the `TableId` of its `Materialized` table.
/// 2. For each stream job, there are several actors that run its tasks.
/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL indicated by the `TableId`.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    actor_map: HashMap<ActorId, TableId>,

    /// Stash of finished jobs. They will be finally finished on the next checkpoint.
    pending_finished_jobs: Vec<TrackingJob>,

    /// Stash of pending backfill nodes. They will start backfilling on the next checkpoint.
    pending_backfill_nodes: Vec<FragmentId>,
}
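
// Lifecycle: jobs enter the tracker via `add`, report progress via `update`,
// are stashed in `pending_finished_jobs` once done, and are drained by
// `take_finished_jobs` on the next checkpoint barrier (see
// `apply_collected_command`).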

impl CreateMviewProgressTracker {
    /// This step recovers state from the meta side:
    /// 1. `Tables`.
    /// 2. `TableFragments`.
    ///
    /// Other state is persisted by the `BackfillExecutor`, such as:
    /// 1. `CreateMviewProgress`.
    /// 2. `Backfill` position.
    pub fn recover(
        jobs: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments, BackfillOrderState))>,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        for (creating_table_id, (definition, table_fragments, backfill_order_state)) in jobs {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();
            let actors = table_fragments.tracking_progress_actor_ids();
            for (actor, backfill_upstream_type) in actors {
                actor_map.insert(actor, creating_table_id);
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                states,
                backfill_upstream_types,
                table_fragments.upstream_table_counts(),
                definition,
                version_stats,
                backfill_order_state,
            );
            let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
                id: creating_table_id.table_id as i32,
            });
            progress_map.insert(creating_table_id, (progress, tracking_job));
        }
        Self {
            progress_map,
            actor_map,
            pending_finished_jobs: Vec::new(),
            pending_backfill_nodes: Vec::new(),
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0, // Filled only after the first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            source_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            definition,
            create_type: CreateType::Background,
        }
    }

    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(table_id, (x, _))| {
                let table_id = table_id.table_id;
                let ddl_progress = DdlProgress {
                    id: table_id as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (table_id, ddl_progress)
            })
            .collect()
    }
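
    // Illustrative shape of one entry returned by `gen_ddl_progress`
    // (values are made up):
    // 42 => DdlProgress {
    //     id: 42,
    //     statement: "CREATE MATERIALIZED VIEW mv AS ...",
    //     create_type: "BACKGROUND",
    //     progress: "25.00% (50/200)",
    // }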

    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<(
            &CreateStreamingJobCommandInfo,
            Option<&ReplaceStreamJobPlan>,
        )>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        // Save `finished_commands` for Create MVs.
        let finished_commands = {
            let mut commands = vec![];
            // Add the command to the tracker.
            if let Some((create_job_info, replace_stream_job)) = info
                && let Some(command) =
                    self.add(create_job_info, replace_stream_job, version_stats)
            {
                // Those with no actors to track can be finished immediately.
                commands.push(command);
            }
            // Update the progress of all commands.
            for progress in create_mview_progress {
                // Those with all actors complete can be finished immediately.
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command) => {
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
                        tracing::trace!(
                            ?progress,
                            ?next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.stash_command_to_finish(command);
        }
    }

    /// Apply a collected epoch node command to the tracker.
    /// Returns the finished jobs when the barrier kind is `Checkpoint`.
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Vec<TrackingJob> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal => Some((info, None)),
                    CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
                        Some((info, Some(replace_stream_job)))
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // The progress of SnapshotBackfill won't be tracked here.
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
            // The cancelled command is possibly stashed in `finished_commands` and
            // waiting for a checkpoint, so we should also clear it.
            self.cancel_command(table_id);
        }
        if barrier_info.kind.is_checkpoint() {
            self.take_finished_jobs()
        } else {
            vec![]
        }
    }

    /// Stash a command to finish later.
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

    /// Finish stashed jobs on checkpoint.
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
        take(&mut self.pending_finished_jobs)
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }

    pub(super) fn cancel_command(&mut self, id: TableId) {
        let _ = self.progress_map.remove(&id);
        self.pending_finished_jobs
            .retain(|x| x.table_to_create() != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }

    /// Notify all tracked commands that an error was encountered, and clear them.
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.pending_finished_jobs.clear();
        self.progress_map.clear();
    }

    /// Add a new create-mview DDL command to track.
    ///
    /// If the actors to track are empty, return the given command as it can be finished immediately.
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        replace_stream_job: Option<&ReplaceStreamJobPlan>,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors, replace_table_info) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The command can be finished immediately.
                return Some(TrackingJob::New(TrackingCommand {
                    job_id: info.stream_job_fragments.stream_job_id,
                    replace_stream_job: replace_stream_job.cloned(),
                }));
            }
            (info.clone(), actors, replace_stream_job.cloned())
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            streaming_job,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state =
            BackfillOrderState::new(fragment_backfill_ordering, &table_fragments);
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition.clone(),
            create_type.into(),
            backfill_order_state,
        );
        if create_type == PbCreateType::Background && streaming_job.is_sink_into_table() {
            // We return the original tracking job immediately.
            // This is because sink can be decoupled from backfill progress.
            // We don't need to wait for the sink to finish backfill.
            // This still contains the notifiers, so we can tell listeners
            // that the sink job has been created.
            // TODO(August): unify background notification for sink into table.
            Some(TrackingJob::New(TrackingCommand {
                job_id: creating_job_id,
                replace_stream_job: replace_table_info,
            }))
        } else {
            let old = self.progress_map.insert(
                creating_job_id,
                (
                    progress,
                    TrackingJob::New(TrackingCommand {
                        job_id: creating_job_id,
                        replace_stream_job: replace_table_info,
                    }),
                ),
            );
            assert!(old.is_none());
            None
        }
    }

    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// If all actors in this MV have finished, return the command.
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
            // even if backfill is finished on recovery.
            // This is because we don't know if only this actor is finished,
            // OR the entire stream job is finished.
            // For the first case, we must notify meta.
            // For the second case, we can still notify meta, but ignore it here.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows)
        } else {
            BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let next_backfill_nodes =
                    progress.update(actor, new_state, upstream_total_key_count);

                if progress.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean up the mapping from actors to DDL table_id.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1)
                } else if !next_backfill_nodes.is_empty() {
                    tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}
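
// A minimal sketch of how `calculate_total_key_count` weights each upstream
// table's key count by its occurrence count. The `PbTableStats` construction
// below is an assumption based on how the stats are read above; it is not a
// test from the original codebase.
#[cfg(test)]
mod tests {
    use risingwave_pb::hummock::PbTableStats;

    use super::*;

    #[test]
    fn total_key_count_weights_tables_by_occurrence() {
        let mut version_stats = HummockVersionStats::default();
        version_stats.table_stats.insert(
            1,
            PbTableStats {
                total_key_count: 100,
                ..Default::default()
            },
        );
        // Upstream table 1 appears twice in the job (e.g. a self-join),
        // so its 100 keys are counted twice: 2 * 100 = 200.
        let table_count = HashMap::from([(TableId::new(1), 2)]);
        assert_eq!(calculate_total_key_count(&table_count, &version_stats), 200);
    }
}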