risingwave_meta/barrier/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, ObjectId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::{Command, CreateStreamingJobCommandInfo, CreateStreamingJobType};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

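// Lifecycle of a backfill actor, as enforced by `Progress::update` below:
// `Init` -> `ConsumingUpstream` -> `Done`; reporting `Done` twice for the
// same actor panics.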
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keeps track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed.
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,

    /// DDL definition
    definition: String,
    /// Create type
    create_type: CreateType,
}

impl Progress {
    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

    /// Update the progress of `actor`.
    /// Returns the backfill fragments that need to be scheduled or cleaned up.
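    /// (Illustrative summary of the `Done` arm below:) when an actor reports `Done`,
    /// the returned [`PendingBackfillFragments`] carries the next backfill nodes to
    /// schedule and the locality-provider state tables of the nodes that just
    /// finished, if any.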
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
            BackfillUpstreamType::LocalityProvider => {
                // Track LocalityProvider progress similar to MView.
                // Update buffered rows for precise progress calculation.
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// Returns the ids of all actors containing the backfill executors for the mview tracked by this
    /// [`Progress`].
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

    /// `progress` = `consumed_rows` / (`upstream_total_key_count` + `buffered_rows`)
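    /// (Illustrative numbers: 50 consumed rows against 80 upstream keys plus
    /// 20 buffered rows renders as `50.00% (50/100)`.)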
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
            }
        }

        let mv_progress = (mv_count > 0).then(|| {
            // Include buffered rows in the total for precise progress calculation:
            // progress = consumed / (upstream_total + buffered)
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0)
            .then(|| format!("{} rows consumed", self.source_backfill_consumed_rows));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJob`s:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    job_id: ObjectId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    /// Create a new tracking job.
    pub(crate) fn new(job_id: ObjectId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: false,
            source_change,
        }
    }

    /// Create a recovered tracking job.
    pub(crate) fn recovered(job_id: ObjectId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }

    pub(crate) fn table_to_create(&self) -> TableId {
        (self.job_id as u32).into()
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed.
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated.
    pub table_ids_to_truncate: Vec<TableId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// The finished job, along with its pending backfill fragments for cleanup.
    Finished(TrackingJob, PendingBackfillFragments),
    /// Backfill nodes have finished and new ones need to be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}

/// Tracking is done as follows:
/// 1. We identify a `StreamJob` by the `TableId` of its `Materialized` table.
/// 2. For each stream job, there are several actors which run its tasks.
/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
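///
/// For example (illustrative), a job materializing table `t1` with backfill actors
/// `{1, 2}` yields `actor_map = {1 => t1, 2 => t1}`, while `progress_map[&t1]` holds
/// the job's [`Progress`] and its [`TrackingJob`].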
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL indicated by the `TableId`.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    actor_map: HashMap<ActorId, TableId>,

    /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
    pending_backfill_nodes: Vec<FragmentId>,

    /// Staging commit info that will be committed on checkpoint.
    staging_commit_info: StagingCommitInfo,
}

impl CreateMviewProgressTracker {
    /// This step recovers state from the meta side:
    /// 1. `Tables`.
    /// 2. `TableFragments`.
    ///
    /// Other state is persisted by the `BackfillExecutor`, such as:
    /// 1. `CreateMviewProgress`.
    /// 2. `Backfill` position.
    pub fn recover(
        jobs: impl IntoIterator<
            Item = (
                TableId,
                (String, &InflightStreamingJobInfo, BackfillOrderState),
            ),
        >,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        let mut finished_jobs = vec![];
        for (creating_table_id, (definition, job_info, backfill_order_state)) in jobs {
            let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
                job_info
                    .fragment_infos
                    .iter()
                    .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
            );
            let source_change = if source_backfill_fragments.is_empty() {
                None
            } else {
                Some(SourceChange::CreateJobFinished {
                    finished_backfill_fragments: source_backfill_fragments,
                })
            };
            let tracking_job =
                TrackingJob::recovered(creating_table_id.table_id as _, source_change);
            let actors = job_info.tracking_progress_actor_ids();
            if actors.is_empty() {
                finished_jobs.push(tracking_job);
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    actor_map.insert(actor, creating_table_id);
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        job_info
                            .fragment_infos
                            .values()
                            .map(|fragment| &fragment.nodes),
                    ),
                    definition,
                    version_stats,
                    backfill_order_state,
                );
                progress_map.insert(creating_table_id, (progress, tracking_job));
            }
        }
        Self {
            progress_map,
            actor_map,
            pending_backfill_nodes: Vec::new(),
            staging_commit_info: StagingCommitInfo {
                finished_jobs,
                table_ids_to_truncate: vec![],
            },
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
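    /// Consequently, the counters below start from zero and are refilled by the
    /// first progress report after recovery; [`Self::recover`] seeds each actor as
    /// `ConsumingUpstream(Epoch(0), 0, 0)`.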
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0, // Filled only after the first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            source_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            mv_backfill_buffered_rows: 0, // Filled only after the first barrier pass
            definition,
            create_type: CreateType::Background,
        }
    }

    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(table_id, (x, _))| {
                let table_id = table_id.table_id;
                let ddl_progress = DdlProgress {
                    id: table_id as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (table_id, ddl_progress)
            })
            .collect()
    }

    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
    /// The table IDs whose locality provider state tables need to be truncated are
    /// staged in `staging_commit_info`.
    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<&CreateStreamingJobCommandInfo>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        let mut table_ids_to_truncate = vec![];
        // Save `finished_commands` for Create MVs.
        let finished_commands = {
            let mut commands = vec![];
            // Add the command to the tracker.
            if let Some(create_job_info) = info
                && let Some(command) = self.add(create_job_info, version_stats)
            {
                // Jobs with no actors to track can be finished immediately.
                commands.push(command);
            }
            // Update the progress of all commands.
            for progress in create_mview_progress {
                // Jobs whose actors have all completed can be finished immediately.
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command, pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        self.queue_backfill(pending.next_backfill_nodes);
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(pending.next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.staging_commit_info.finished_jobs.push(command);
        }
        self.staging_commit_info
            .table_ids_to_truncate
            .extend(table_ids_to_truncate);
    }

    /// Apply a collected epoch node command to the tracker.
    /// Return the staging commit info when the barrier kind is `Checkpoint`.
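    /// On a non-checkpoint barrier, `None` is returned and the staged info is kept
    /// until the next checkpoint barrier.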
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Option<StagingCommitInfo> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                        Some(info)
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // The progress of SnapshotBackfill won't be tracked here.
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for table_id in command.map(Command::jobs_to_drop).into_iter().flatten() {
            // The cancelled command may be stashed in `finished_commands`, waiting for a
            // checkpoint; we should also clear it.
            self.cancel_command(table_id);
        }
        if barrier_info.kind.is_checkpoint() {
            Some(take(&mut self.staging_commit_info))
        } else {
            None
        }
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.staging_commit_info.finished_jobs.is_empty()
    }

    pub(super) fn cancel_command(&mut self, id: TableId) {
        let _ = self.progress_map.remove(&id);
        self.staging_commit_info
            .finished_jobs
            .retain(|x| x.table_to_create() != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }

    /// Notify all tracked commands that an error was encountered, and clear them.
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.staging_commit_info = StagingCommitInfo::default();
        self.progress_map.clear();
    }

    /// Add a new create-mview DDL command to track.
    ///
    /// If the actors to track are empty, return the given command as it can be finished immediately.
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The command can be finished immediately.
                return Some(TrackingJob::new(
                    info.stream_job_fragments.stream_job_id.table_id as _,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: stream_job_fragments
                            .source_backfill_fragments(),
                    }),
                ));
            }
            (info.clone(), actors)
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            &table_fragments,
            locality_fragment_state_table_mapping,
        );
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition,
            create_type.into(),
            backfill_order_state,
        );
        let old = self.progress_map.insert(
            creating_job_id,
            (
                progress,
                TrackingJob::new(
                    creating_job_id.table_id as _,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: table_fragments.source_backfill_fragments(),
                    }),
                ),
            ),
        );
        assert!(old.is_none());
        None
    }

    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// If all actors in this MV have finished, return the command.
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
            // even if backfill is finished on recovery.
            // This is because we don't know if only this actor is finished,
            // OR the entire stream job is finished.
            // For the first case, we must notify meta.
            // For the second case, we can still notify meta, but ignore it here.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress_state = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let pending = progress_state.update(actor, new_state, upstream_total_key_count);

                if progress_state.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean up the mapping from actors to the DDL table_id.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(pending.next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1, pending)
                } else if !pending.next_backfill_nodes.is_empty()
                    || !pending.truncate_locality_provider_state_tables.is_empty()
                {
                    UpdateProgressResult::BackfillNodeFinished(pending)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

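/// Weighted sum over the upstream tables: each table contributes
/// `count * total_key_count`, where `count` is how many times the table appears as a
/// backfill upstream in the job. Illustrative numbers: a table with 1_000 keys that
/// is scanned by two backfill fragments contributes 2_000 to the total.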
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}