risingwave_meta/barrier/progress.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

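/// Backfill state reported by a single backfill actor.
///
/// An actor typically moves from `Init` through `ConsumingUpstream` to `Done` (it may
/// also report `Done` directly); reporting `Done` more than once is treated as a bug
/// (see [`Progress::update`]).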
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,
}

impl Progress {
    /// Create a [`Progress`] for a creating mview, where `actors` are all the actors that contain backfill executors.
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

    /// Update the progress of `actor`.
    /// Returns the backfill fragments that need to be scheduled or cleaned up.
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
            return result;
        };

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        let Some(prev_state) = self.states.remove(&actor) else {
            tracing::warn!(%actor, "receive progress for actor not in state map");
            return result;
        };
        match prev_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
            BackfillUpstreamType::LocalityProvider => {
                // Track LocalityProvider progress similar to MView
                // Update buffered rows for precise progress calculation
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// `progress` = `consumed_rows` / (`upstream_total_key_count` + `buffered_rows`)
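    ///
    /// For example, a job that has consumed 50 rows against an upstream total of 150
    /// keys plus 50 buffered rows reports `50 / (150 + 50) = 25.00%`.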
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Include buffered rows in total for precise progress calculation
            // Progress = consumed / (upstream_total + buffered)
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJobs`:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    /// Create a new tracking job.
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

    /// Create a recovered tracking job.
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated
    pub table_ids_to_truncate: Vec<TableId>,
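    /// IDs of jobs whose CDC table backfill has finished.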
    pub finished_cdc_table_backfill: Vec<JobId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// The job has finished; the contained locality provider state tables should be
    /// truncated for cleanup.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Backfill nodes have finished and new ones need to be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}

#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        /// Progress of the create-mview DDL.
        progress: Progress,

        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table IDs whose locality provider state tables need to be truncated
        table_ids_to_truncate: Vec<TableId>,
    },
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
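    /// Recover the tracker for `creating_job_id` from the in-flight fragment info.
    /// Recovered actors start in `ConsumingUpstream` with zeroed row counts; the real
    /// counts are reported again by the backfill executors (see `recover_progress`).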
    pub fn recover(
        creating_job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        {
            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
            let status = if actors.is_empty() {
                CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                }
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    creating_job_id,
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        fragment_infos.values().map(|fragment| &fragment.nodes),
                    ),
                    version_stats,
                    backfill_order_state,
                );
                let pending_backfill_nodes = progress
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                CreateMviewStatus::Backfilling {
                    progress,
                    pending_backfill_nodes,
                    table_ids_to_truncate: vec![],
                }
            };
            Self {
                tracking_job,
                status,
            }
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0, // Fill only after first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
        }
    }

    pub fn gen_backfill_progress(&self) -> String {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        }
    }

    /// Apply a single `CreateMviewProgress` report to the tracked job.
    /// The finished state and the table IDs to truncate are staged in `self.status` and
    /// later drained via `collect_staging_commit_info`.
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "update the progress of a streaming job whose backfill has already finished: {create_mview_progress:?}"
            );
            return;
        };
        {
            // Update the progress of the tracked job.
            {
                // Once all of its actors are complete, the job can be finished immediately.
                match progress.apply(create_mview_progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished {
                        truncate_locality_provider_state_tables,
                    } => {
                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                        tracing::trace!(?progress, "finish progress");
                        self.status = CreateMviewStatus::Finished {
                            table_ids_to_truncate,
                        };
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
                    }
                }
            }
        }
    }

    /// Refresh tracker state after reschedule so new actors can report progress correctly.
    pub fn refresh_after_reschedule(
        &mut self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            ..
        } = &mut self.status
        else {
            return;
        };

        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        );

        #[cfg(debug_assertions)]
        {
            use std::collections::HashSet;
            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
            let new_actor_ids: HashSet<_> = new_tracking_actors
                .iter()
                .map(|(actor_id, _)| *actor_id)
                .collect();
            debug_assert!(
                old_actor_ids.is_disjoint(&new_actor_ids),
                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
            );
        }

        let mut new_states = HashMap::new();
        let mut new_backfill_types = HashMap::new();
        for (actor_id, upstream_type) in new_tracking_actors {
            new_states.insert(actor_id, BackfillState::Init);
            new_backfill_types.insert(actor_id, upstream_type);
        }

        let fragment_actors: HashMap<_, _> = fragment_infos
            .iter()
            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
            .collect();

        let newly_scheduled = progress
            .backfill_order_state
            .refresh_actors(&fragment_actors);

        progress.backfill_upstream_types = new_backfill_types;
        progress.states = new_states;
        progress.done_count = 0;

        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
            fragment_infos.values().map(|fragment| &fragment.nodes),
        );
        progress.upstream_mvs_total_key_count =
            calculate_total_key_count(&progress.upstream_mv_count, version_stats);

        progress.mv_backfill_consumed_rows = 0;
        progress.source_backfill_consumed_rows = 0;
        progress.mv_backfill_buffered_rows = 0;

        let mut pending = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        pending.extend(newly_scheduled);
        pending.sort_unstable();
        pending.dedup();
        *pending_backfill_nodes = pending;
    }

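    /// Drain the stashed fragment IDs that should start backfilling at the next checkpoint.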
    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

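    /// Returns whether the job has finished, together with the staged table IDs whose
    /// locality provider state tables need to be truncated. The staged IDs are drained.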
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, impl Iterator<Item = TableId> + '_) {
        let (is_finished, table_ids) = match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, table_ids_to_truncate),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, table_ids_to_truncate),
        };
        (is_finished, table_ids.drain(..))
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

    /// Create a tracker for a new create-mview DDL command.
    ///
    /// If there are no actors to track, the tracker is created directly in the `Finished` state, so the job can be finished immediately.
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // The command can be finished immediately.
            return Self {
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// If all actors in this MV have finished, return [`UpdateProgressResult::Finished`].
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        {
            {
                let progress_state = self;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

                tracing::trace!(%job_id, "updating progress for table");
                let pending = progress_state.update(actor, new_state, upstream_total_key_count);

                if progress_state.is_done() {
                    tracing::debug!(
                        %job_id,
                        "all actors done for creating mview!",
                    );

                    let PendingBackfillFragments {
                        next_backfill_nodes,
                        truncate_locality_provider_state_tables,
                    } = pending;

                    assert!(next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished {
                        truncate_locality_provider_state_tables,
                    }
                } else if !pending.next_backfill_nodes.is_empty()
                    || !pending.truncate_locality_provider_state_tables.is_empty()
                {
                    UpdateProgressResult::BackfillNodeFinished(pending)
                } else {
                    UpdateProgressResult::None
                }
            }
        }
    }
}

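/// Total key count of all upstream tables, weighted by how many times each upstream
/// table is consumed by this job: `sum(count * total_key_count)` over `table_count`.
///
/// For example, an upstream MV with 1000 keys that appears twice in this job
/// contributes 2000 to the total.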
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }

    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }
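
    // A minimal sketch of the progress formula, assuming the rendering in
    // `calculate_progress` stays `"{:.2}% ({consumed}/{total})"`: one MView backfill
    // actor that has consumed 50 of 200 upstream keys reports 25.00%.
    #[test]
    fn calculate_progress_reports_percentage() {
        let actor = ActorId::new(1);
        let mut progress = sample_progress(actor);

        // Report 50 consumed rows against an upstream total of 200 keys.
        progress.update(
            actor,
            BackfillState::ConsumingUpstream(Epoch(0), 50, 0),
            200,
        );

        assert_eq!(progress.calculate_progress(), "25.00% (50/200)");
    }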
}