risingwave_meta/barrier/progress.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

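/// Backfill state of a single actor, as reported through barrier completion.
///
/// An actor starts in `Init`, moves to `ConsumingUpstream` while it reports consumed
/// (and, for locality backfill, buffered) row counts, and ends in `Done`. Reporting
/// `Done` more than once is treated as a bug (see `Progress::update`).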
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

/// Represents the backfill nodes that need to be scheduled or cleaned up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    job_id: JobId,
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,
}

impl Progress {
    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
    fn new(
        job_id: JobId,
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            job_id,
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            backfill_order_state,
        }
    }

    /// Update the progress of `actor`.
    /// Returns the backfill fragments that need to be scheduled or cleaned up.
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
            return result;
        };

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        let Some(prev_state) = self.states.remove(&actor) else {
            tracing::warn!(%actor, "receive progress for actor not in state map");
            return result;
        };
        match prev_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
            BackfillUpstreamType::LocalityProvider => {
                // Track LocalityProvider progress similar to MView
                // Update buffered rows for precise progress calculation
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// `progress` = `consumed_rows` / (`upstream_total_key_count` + `buffered_rows`)
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Include buffered rows in total for precise progress calculation
            // Progress = consumed / (upstream_total + buffered)
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJobs`:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    /// Create a new tracking job.
    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
        Self {
            job_id: stream_job_fragments.stream_job_id,
            is_recovered: false,
            source_change: Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
            }),
        }
    }

    /// Create a recovered tracking job.
    pub(crate) fn recovered(
        job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Self {
        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
            fragment_infos
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        );
        let source_change = if source_backfill_fragments.is_empty() {
            None
        } else {
            Some(SourceChange::CreateJobFinished {
                finished_backfill_fragments: source_backfill_fragments,
            })
        };
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

    pub(crate) fn job_id(&self) -> JobId {
        self.job_id
    }

    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated
    pub table_ids_to_truncate: Vec<TableId>,
    pub finished_cdc_table_backfill: Vec<JobId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// The job has finished backfilling; carries the locality provider state tables to truncate.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Backfill nodes have finished and new ones need to be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}

#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    tracking_job: TrackingJob,
    status: CreateMviewStatus,
}

#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        /// Progress of the create-mview DDL.
        progress: Progress,

        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table IDs whose locality provider state tables need to be truncated
        table_ids_to_truncate: Vec<TableId>,
    },
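    /// A shared CDC source job with no backfill actors to track. It is marked
    /// `Finished` once the source reports its initial offset
    /// (see `mark_cdc_source_finished`).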
    CdcSourceInit,
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}

impl CreateMviewProgressTracker {
    pub fn recover(
        creating_job_id: JobId,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        backfill_order_state: BackfillOrderState,
        version_stats: &HummockVersionStats,
    ) -> Self {
        {
            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
            let status = if actors.is_empty() {
                CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                }
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    creating_job_id,
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        fragment_infos.values().map(|fragment| &fragment.nodes),
                    ),
                    version_stats,
                    backfill_order_state,
                );
                let pending_backfill_nodes = progress
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                CreateMviewStatus::Backfilling {
                    progress,
                    pending_backfill_nodes,
                    table_ids_to_truncate: vec![],
                }
            };
            Self {
                tracking_job,
                status,
            }
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
    fn recover_progress(
        job_id: JobId,
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            job_id,
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0, // Fill only after first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
        }
    }

    pub fn gen_backfill_progress(&self) -> String {
        match &self.status {
            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
        }
    }

    /// Update the progress of the tracked job with a reported [`CreateMviewProgress`].
    /// Locality provider state tables that need truncation are accumulated into the status
    /// and drained later via `collect_staging_commit_info`.
    pub(super) fn apply_progress(
        &mut self,
        create_mview_progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            table_ids_to_truncate,
        } = &mut self.status
        else {
            tracing::warn!(
                "update the progress of a streaming job that has already finished backfilling: {create_mview_progress:?}"
            );
            return;
        };
        // Update the progress of this job. If all of its backfill actors are complete,
        // the job can be finished immediately.
        match progress.apply(create_mview_progress, version_stats) {
            UpdateProgressResult::None => {
                tracing::trace!(?progress, "update progress");
            }
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            } => {
                let mut table_ids_to_truncate = take(table_ids_to_truncate);
                table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
                tracing::trace!(?progress, "finish progress");
                self.status = CreateMviewStatus::Finished {
                    table_ids_to_truncate,
                };
            }
            UpdateProgressResult::BackfillNodeFinished(pending) => {
                table_ids_to_truncate
                    .extend(pending.truncate_locality_provider_state_tables.clone());
                tracing::trace!(
                    ?progress,
                    next_backfill_nodes = ?pending.next_backfill_nodes,
                    "start next backfill node"
                );
                pending_backfill_nodes.extend(pending.next_backfill_nodes);
            }
        }
    }

    /// Refresh tracker state after reschedule so new actors can report progress correctly.
    pub fn refresh_after_reschedule(
        &mut self,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stats: &HummockVersionStats,
    ) {
        let CreateMviewStatus::Backfilling {
            progress,
            pending_backfill_nodes,
            ..
        } = &mut self.status
        else {
            return;
        };

        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        );

        #[cfg(debug_assertions)]
        {
            use std::collections::HashSet;
            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
            let new_actor_ids: HashSet<_> = new_tracking_actors
                .iter()
                .map(|(actor_id, _)| *actor_id)
                .collect();
            debug_assert!(
                old_actor_ids.is_disjoint(&new_actor_ids),
                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
            );
        }

        let mut new_states = HashMap::new();
        let mut new_backfill_types = HashMap::new();
        for (actor_id, upstream_type) in new_tracking_actors {
            new_states.insert(actor_id, BackfillState::Init);
            new_backfill_types.insert(actor_id, upstream_type);
        }

        let fragment_actors: HashMap<_, _> = fragment_infos
            .iter()
            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
            .collect();

        let newly_scheduled = progress
            .backfill_order_state
            .refresh_actors(&fragment_actors);

        progress.backfill_upstream_types = new_backfill_types;
        progress.states = new_states;
        progress.done_count = 0;

        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
            fragment_infos.values().map(|fragment| &fragment.nodes),
        );
        progress.upstream_mvs_total_key_count =
            calculate_total_key_count(&progress.upstream_mv_count, version_stats);

        progress.mv_backfill_consumed_rows = 0;
        progress.source_backfill_consumed_rows = 0;
        progress.mv_backfill_buffered_rows = 0;

        let mut pending = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        pending.extend(newly_scheduled);
        pending.sort_unstable();
        pending.dedup();
        *pending_backfill_nodes = pending;
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                pending_backfill_nodes,
                ..
            } => Some(pending_backfill_nodes.drain(..)),
            CreateMviewStatus::CdcSourceInit => None,
            CreateMviewStatus::Finished { .. } => None,
        }
        .into_iter()
        .flatten()
    }

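    /// Returns whether the tracked job has finished, together with the locality provider
    /// state tables that should be truncated as part of the next commit.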
    pub(super) fn collect_staging_commit_info(
        &mut self,
    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
        match &mut self.status {
            CreateMviewStatus::Backfilling {
                table_ids_to_truncate,
                ..
            } => (false, Box::new(table_ids_to_truncate.drain(..))),
            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
            CreateMviewStatus::Finished {
                table_ids_to_truncate,
                ..
            } => (true, Box::new(table_ids_to_truncate.drain(..))),
        }
    }

    pub(super) fn is_finished(&self) -> bool {
        matches!(self.status, CreateMviewStatus::Finished { .. })
    }

    /// Mark CDC source as finished when offset is updated.
    pub(super) fn mark_cdc_source_finished(&mut self) {
        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
            self.status = CreateMviewStatus::Finished {
                table_ids_to_truncate: vec![],
            };
        }
    }

    pub(super) fn into_tracking_job(self) -> TrackingJob {
        let CreateMviewStatus::Finished { .. } = self.status else {
            panic!("should be called when finished");
        };
        self.tracking_job
    }

    /// Add a new create-mview DDL command to track.
    ///
    /// If there are no actors to track, the job is marked as `Finished` immediately.
    /// For CDC sources, mark as `CdcSourceInit` instead of `Finished`.
    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
        tracing::trace!(?info, "add job to track");
        let CreateStreamingJobCommandInfo {
            stream_job_fragments,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            streaming_job,
            ..
        } = info;
        let job_id = stream_job_fragments.stream_job_id();
        let actors = stream_job_fragments.tracking_progress_actor_ids();
        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
        if actors.is_empty() {
            // NOTE: This CDC source detection uses hardcoded property checks and should be replaced
            // with a more reliable identification method in the future.
            let is_cdc_source = matches!(
                streaming_job,
                crate::manager::StreamingJob::Source(source)
                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
                    .get_with_properties()
                    .get("connector")
                    .map(|connector| connector.to_lowercase().contains("-cdc"))
                    .unwrap_or(false)
            );
            if is_cdc_source {
                // Mark CDC source as CdcSourceInit, will be finished when offset is updated
                return Self {
                    tracking_job,
                    status: CreateMviewStatus::CdcSourceInit,
                };
            }
            // The command can be finished immediately.
            return Self {
                tracking_job,
                status: CreateMviewStatus::Finished {
                    table_ids_to_truncate: vec![],
                },
            };
        }

        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            stream_job_fragments,
            locality_fragment_state_table_mapping.clone(),
        );
        let progress = Progress::new(
            job_id,
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            backfill_order_state,
        );
        let pending_backfill_nodes = progress
            .backfill_order_state
            .current_backfill_node_fragment_ids();
        Self {
            tracking_job,
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes,
                table_ids_to_truncate: vec![],
            },
        }
    }
}

impl Progress {
    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// Returns `UpdateProgressResult::Finished` once all backfill actors of this job are done.
    fn apply(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let job_id = self.job_id;

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        let upstream_total_key_count: u64 =
            calculate_total_key_count(&self.upstream_mv_count, version_stats);

        tracing::trace!(%job_id, "updating progress for table");
        let pending = self.update(actor, new_state, upstream_total_key_count);

        if self.is_done() {
            tracing::debug!(
                %job_id,
                "all actors done for creating mview!",
            );

            let PendingBackfillFragments {
                next_backfill_nodes,
                truncate_locality_provider_state_tables,
            } = pending;

            assert!(next_backfill_nodes.is_empty());
            UpdateProgressResult::Finished {
                truncate_locality_provider_state_tables,
            }
        } else if !pending.next_backfill_nodes.is_empty()
            || !pending.truncate_locality_provider_state_tables.is_empty()
        {
            UpdateProgressResult::BackfillNodeFinished(pending)
        } else {
            UpdateProgressResult::None
        }
    }
}

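/// Estimate the total number of rows the backfill has to consume: each upstream table's
/// total key count is weighted by how many times that table appears as an upstream of
/// this job. Missing table stats count as zero.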
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }
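
    // Illustrative sketch of how `Progress::update` accumulates deltas between reports:
    // only the difference from the previously reported row count is added to the totals.
    // The row counts used here are arbitrary.
    #[test]
    fn update_accumulates_consumed_row_deltas() {
        let actor = ActorId::new(1);
        let mut progress = sample_progress(actor);

        progress.update(actor, BackfillState::ConsumingUpstream(Epoch(0), 10, 0), 100);
        progress.update(actor, BackfillState::ConsumingUpstream(Epoch(0), 30, 0), 100);

        assert!(!progress.is_done());
        assert_eq!(progress.mv_backfill_consumed_rows, 30);
        assert_eq!(progress.upstream_mvs_total_key_count, 100);
    }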

    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }
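
    // Minimal sketch of the progress formula in `calculate_progress`:
    // progress = consumed / (upstream_total + buffered). The numbers are arbitrary.
    #[test]
    fn calculate_progress_includes_buffered_rows() {
        let actor = ActorId::new(1);
        let mut progress = sample_progress(actor);
        progress
            .states
            .insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 25, 0));
        progress.upstream_mvs_total_key_count = 80;
        progress.mv_backfill_consumed_rows = 25;
        progress.mv_backfill_buffered_rows = 20;

        // 25 rows consumed out of (80 upstream + 20 buffered) = 25%.
        assert_eq!(progress.calculate_progress(), "25.00% (25/100)");
    }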

    // CDC sources should be initialized as CdcSourceInit
    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        // Create a CDC source with cdc_source_job = true
        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        // Create empty fragments (no actors to track)
        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
        };

        let tracker = CreateMviewProgressTracker::new(&info, &HummockVersionStats::default());

        // CDC source should be in CdcSourceInit state
        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }

    // CDC source should transition from CdcSourceInit to Finished when offset is updated
    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        // Initially in CdcSourceInit state
        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        // Mark as finished when offset is updated
        tracker.mark_cdc_source_finished();

        // Should now be in Finished state
        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
}