risingwave_meta/barrier/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

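//! Tracks the backfill progress of creating streaming jobs (e.g. materialized
//! views), so that DDL progress can be reported and finished jobs can be
//! committed on checkpoint.
//!
//! Typical flow, as implemented below:
//! 1. [`CreateMviewProgressTracker::recover`] rebuilds tracking state after a meta restart.
//! 2. [`CreateMviewProgressTracker::add`] registers a newly created streaming job.
//! 3. [`CreateMviewProgressTracker::update`] applies per-actor [`CreateMviewProgress`] reports.
//! 4. [`CreateMviewProgressTracker::apply_collected_command`] stages finished jobs and
//!    hands them back when a checkpoint barrier completes.
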
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::{Command, CreateStreamingJobCommandInfo, CreateStreamingJobType};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

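/// Per-actor backfill progress reported by the backfill executor.
///
/// The state advances monotonically: `Init` -> `ConsumingUpstream` -> `Done`.
/// An actor must not report `Done` more than once (see [`Progress::update`]).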
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Progress of all actors that contain backfill executors while creating an mview.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,

    /// DDL definition
    definition: String,
    /// Create type
    create_type: CreateType,
}

impl Progress {
    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

    /// Update the progress of `actor`.
    /// Returns the backfill fragments that need to be scheduled or cleaned up.
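    ///
    /// Panics if `actor` is not tracked by this [`Progress`], or if it has already
    /// reported [`BackfillState::Done`].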
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill consumed rows should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill buffered rows should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
            BackfillUpstreamType::LocalityProvider => {
                // Track LocalityProvider progress similar to MView
                // Update buffered rows for precise progress calculation
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// Returns the ids of all actors containing the backfill executors for the mview tracked by this
    /// [`Progress`].
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

    /// `progress` = `consumed_rows` / (`upstream_total_key_count` + `buffered_rows`)
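    ///
    /// For example (illustrative numbers): with 1000 upstream keys, 200 buffered
    /// rows, and 600 consumed rows, the progress is `600 / (1000 + 200)`,
    /// rendered as `"50.00% (600/1200)"`.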
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Include buffered rows in total for precise progress calculation
            // Progress = consumed / (upstream_total + buffered)
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJob`s:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    /// Create a new tracking job.
    pub(crate) fn new(job_id: JobId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: false,
            source_change,
        }
    }

    /// Create a recovered tracking job.
    pub(crate) fn recovered(job_id: JobId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated
    pub table_ids_to_truncate: Vec<TableId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// The finished job, along with its pending backfill fragments for cleanup.
    Finished(TrackingJob, PendingBackfillFragments),
    /// Backfill nodes have finished and new ones need to be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}

/// Tracking is done as follows:
/// 1. We identify a `StreamJob` by the `TableId` of its `Materialized` table.
/// 2. For each stream job, there are several actors which run its tasks.
/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL, keyed by the `JobId` of the creating job.
    progress_map: HashMap<JobId, (Progress, TrackingJob)>,

    actor_map: HashMap<ActorId, JobId>,

    /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
    pending_backfill_nodes: Vec<FragmentId>,

    /// Staging commit info that will be committed on checkpoint.
    staging_commit_info: StagingCommitInfo,
}

impl CreateMviewProgressTracker {
    /// This step recovers state from the meta side:
    /// 1. `Tables`.
    /// 2. `TableFragments`.
    ///
    /// Other state is persisted by the `BackfillExecutor`, such as:
    /// 1. `CreateMviewProgress`.
    /// 2. `Backfill` position.
    pub fn recover(
        jobs: impl IntoIterator<
            Item = (
                JobId,
                (String, &InflightStreamingJobInfo, BackfillOrderState),
            ),
        >,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        let mut finished_jobs = vec![];
        for (creating_job_id, (definition, job_info, backfill_order_state)) in jobs {
            let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
                job_info
                    .fragment_infos
                    .iter()
                    .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
            );
            let source_change = if source_backfill_fragments.is_empty() {
                None
            } else {
                Some(SourceChange::CreateJobFinished {
                    finished_backfill_fragments: source_backfill_fragments,
                })
            };
            let tracking_job = TrackingJob::recovered(creating_job_id, source_change);
            let actors = job_info.tracking_progress_actor_ids();
            if actors.is_empty() {
                finished_jobs.push(tracking_job);
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    actor_map.insert(actor, creating_job_id);
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        job_info
                            .fragment_infos
                            .values()
                            .map(|fragment| &fragment.nodes),
                    ),
                    definition,
                    version_stats,
                    backfill_order_state,
                );
                progress_map.insert(creating_job_id, (progress, tracking_job));
            }
        }
        Self {
            progress_map,
            actor_map,
            pending_backfill_nodes: Vec::new(),
            staging_commit_info: StagingCommitInfo {
                finished_jobs,
                table_ids_to_truncate: vec![],
            },
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0, // Filled only after the first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            source_backfill_consumed_rows: 0, // Filled only after the first barrier pass
            mv_backfill_buffered_rows: 0, // Filled only after the first barrier pass
            definition,
            create_type: CreateType::Background,
        }
    }

    pub fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(job_id, (x, _))| {
                let ddl_progress = DdlProgress {
                    id: job_id.as_raw_id() as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (*job_id, ddl_progress)
            })
            .collect()
    }

    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
    /// Table ids whose locality provider state tables need to be truncated are stashed
    /// in `staging_commit_info` rather than returned.
    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<&CreateStreamingJobCommandInfo>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        let mut table_ids_to_truncate = vec![];
        // Save `finished_commands` for Create MVs.
        let finished_commands = {
            let mut commands = vec![];
            // Add the command to tracker.
            if let Some(create_job_info) = info
                && let Some(command) = self.add(create_job_info, version_stats)
            {
                // Those with no actors to track can be finished immediately.
                commands.push(command);
            }
            // Update the progress of all commands.
            for progress in create_mview_progress {
                // Jobs whose actors have all completed can be finished immediately.
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command, pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        self.queue_backfill(pending.next_backfill_nodes);
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(pending.next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.staging_commit_info.finished_jobs.push(command);
        }
        self.staging_commit_info
            .table_ids_to_truncate
            .extend(table_ids_to_truncate);
    }

    /// Apply a collected epoch node command to the tracker.
    /// Returns the staging commit info when the barrier kind is `Checkpoint`.
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Option<StagingCommitInfo> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                        Some(info)
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // The progress of SnapshotBackfill won't be tracked here
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for job_id in command.map(Command::jobs_to_drop).into_iter().flatten() {
            // The cancelled command may be stashed in `finished_commands`,
            // waiting for a checkpoint; clear it here as well.
            self.cancel_command(job_id);
        }
        if barrier_info.kind.is_checkpoint() {
            Some(take(&mut self.staging_commit_info))
        } else {
            None
        }
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.staging_commit_info.finished_jobs.is_empty()
    }

    pub(super) fn cancel_command(&mut self, id: JobId) {
        let _ = self.progress_map.remove(&id);
        self.staging_commit_info
            .finished_jobs
            .retain(|x| x.job_id != id);
        self.actor_map.retain(|_, job_id| *job_id != id);
    }

    /// Notify all tracked commands that an error was encountered, and clear them.
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.staging_commit_info = StagingCommitInfo::default();
        self.progress_map.clear();
    }

    /// Add a new create-mview DDL command to track.
    ///
    /// If the actors to track are empty, return the given command as it can be finished immediately.
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The command can be finished immediately.
                return Some(TrackingJob::new(
                    info.stream_job_fragments.stream_job_id,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: stream_job_fragments
                            .source_backfill_fragments(),
                    }),
                ));
            }
            (info.clone(), actors)
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            &table_fragments,
            locality_fragment_state_table_mapping,
        );
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition,
            create_type.into(),
            backfill_order_state,
        );
        let old = self.progress_map.insert(
            creating_job_id,
            (
                progress,
                TrackingJob::new(
                    creating_job_id,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: table_fragments.source_backfill_fragments(),
                    }),
                ),
            ),
        );
        assert!(old.is_none());
        None
    }

    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// If all actors in this MV have finished, return the command.
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
            // even if backfill is finished on recovery.
            // This is because we don't know if only this actor is finished,
            // OR the entire stream job is finished.
            // For the first case, we must notify meta.
            // For the second case, we can still notify meta, but ignore it here.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress_state = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let pending = progress_state.update(actor, new_state, upstream_total_key_count);

                if progress_state.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean up the mapping from actors to the DDL job id.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(pending.next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1, pending)
                } else if !pending.next_backfill_nodes.is_empty()
                    || !pending.truncate_locality_provider_state_tables.is_empty()
                {
                    UpdateProgressResult::BackfillNodeFinished(pending)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
773                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

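/// Total number of rows the backfill executors need to scan from the upstream MVs:
/// the sum over all upstream tables of `scan_count * total_key_count`.
/// For example (illustrative numbers), an upstream MV with 1000 keys that this job
/// scans twice contributes 2000 to the total.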
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.as_raw_id())
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}
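
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal illustrative check of `calculate_total_key_count`, with made-up
    /// numbers: each upstream table's total key count is weighted by how many
    /// times the creating job scans it.
    #[test]
    fn total_key_count_is_weighted_by_scan_count() {
        let table_id = TableId::new(1);
        let mut version_stats = HummockVersionStats::default();
        version_stats.table_stats.insert(
            table_id.as_raw_id(),
            // Assumes the remaining `TableStats` fields can be defaulted.
            risingwave_pb::hummock::TableStats {
                total_key_count: 1000,
                ..Default::default()
            },
        );
        // The job scans this upstream MV twice, so it contributes 2 * 1000 keys.
        let table_count = HashMap::from([(table_id, 2usize)]);
        assert_eq!(calculate_total_key_count(&table_count, &version_stats), 2000);
    }
}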