risingwave_meta/barrier/
progress.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::{FragmentTypeFlag, TableId};
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
23
24use crate::MetaResult;
25use crate::barrier::backfill_order_control::BackfillOrderState;
26use crate::barrier::info::InflightStreamingJobInfo;
27use crate::barrier::{CreateStreamingJobCommandInfo, FragmentBackfillProgress};
28use crate::controller::fragment::InflightFragmentInfo;
29use crate::manager::MetadataManager;
30use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
31use crate::stream::{SourceChange, SourceManagerRef};
32
/// Number of rows a backfill actor has consumed, as carried by [`BackfillState`].
type ConsumedRows = u64;
/// Number of rows a backfill actor has buffered, as carried by [`BackfillState`].
type BufferedRows = u64;
35
/// Point-in-time view of the backfill progress of a single actor, as produced by
/// `Progress::iter_actor_progress`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ActorBackfillProgress {
    /// The actor this entry describes.
    pub(crate) actor_id: ActorId,
    /// Kind of upstream the actor backfills from.
    pub(crate) upstream_type: BackfillUpstreamType,
    /// Rows consumed so far; `0` while the actor has not reported any progress yet.
    pub(crate) consumed_rows: u64,
    /// Whether the actor has reported that its backfill is done.
    pub(crate) done: bool,
}
43
/// Last known backfill state of a single actor.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// No progress has been reported for the actor yet.
    Init,
    /// The actor is still backfilling. Carries the reported epoch (currently never read, hence
    /// the `dead_code` expectation), the consumed row count and the buffered row count.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    /// The actor reported completion, with its final consumed and buffered row counts.
    Done(ConsumedRows, BufferedRows),
}
50
/// Represents the backfill nodes that need to be scheduled or cleaned up.
///
/// The default value (both lists empty) means there is nothing to schedule or clean up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}
59
/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    /// The streaming job whose creation is being tracked.
    job_id: JobId,
    // `states` and `done_count` decides whether the progress is done. See `is_done`.
    /// Last reported state of every tracked backfill actor.
    states: HashMap<ActorId, BackfillState>,
    /// Backfill ordering state; consulted to learn which fragments may backfill next once an
    /// actor finishes.
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    /// Rows consumed by `MView` and `LocalityProvider` backfill actors, summed over actors.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed by `Source` backfill actors, summed over actors.
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,
}
85
86impl Progress {
87    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
88    fn new(
89        job_id: JobId,
90        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
91        upstream_mv_count: HashMap<TableId, usize>,
92        upstream_total_key_count: u64,
93        backfill_order_state: BackfillOrderState,
94    ) -> Self {
95        let mut states = HashMap::new();
96        let mut backfill_upstream_types = HashMap::new();
97        for (actor, backfill_upstream_type) in actors {
98            states.insert(actor, BackfillState::Init);
99            backfill_upstream_types.insert(actor, backfill_upstream_type);
100        }
101        assert!(!states.is_empty());
102
103        Self {
104            job_id,
105            states,
106            backfill_upstream_types,
107            done_count: 0,
108            upstream_mv_count,
109            upstream_mvs_total_key_count: upstream_total_key_count,
110            mv_backfill_consumed_rows: 0,
111            source_backfill_consumed_rows: 0,
112            mv_backfill_buffered_rows: 0,
113            backfill_order_state,
114        }
115    }
116
117    /// Update the progress of `actor`.
118    /// Returns the backfill fragments that need to be scheduled or cleaned up.
119    fn update(
120        &mut self,
121        actor: ActorId,
122        new_state: BackfillState,
123        upstream_total_key_count: u64,
124    ) -> PendingBackfillFragments {
125        let mut result = PendingBackfillFragments::default();
126        self.upstream_mvs_total_key_count = upstream_total_key_count;
127        let total_actors = self.states.len();
128        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
129            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
130            return result;
131        };
132
133        let mut old_consumed_row = 0;
134        let mut new_consumed_row = 0;
135        let mut old_buffered_row = 0;
136        let mut new_buffered_row = 0;
137        let Some(prev_state) = self.states.remove(&actor) else {
138            tracing::warn!(%actor, "receive progress for actor not in state map");
139            return result;
140        };
141        match prev_state {
142            BackfillState::Init => {}
143            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
144                old_consumed_row = consumed_rows;
145                old_buffered_row = buffered_rows;
146            }
147            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
148        };
149        match &new_state {
150            BackfillState::Init => {}
151            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
152                new_consumed_row = *consumed_rows;
153                new_buffered_row = *buffered_rows;
154            }
155            BackfillState::Done(consumed_rows, buffered_rows) => {
156                tracing::debug!("actor {} done", actor);
157                new_consumed_row = *consumed_rows;
158                new_buffered_row = *buffered_rows;
159                self.done_count += 1;
160                let before_backfill_nodes = self
161                    .backfill_order_state
162                    .current_backfill_node_fragment_ids();
163                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
164                let after_backfill_nodes = self
165                    .backfill_order_state
166                    .current_backfill_node_fragment_ids();
167                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
168                let last_backfill_nodes_iter = before_backfill_nodes
169                    .into_iter()
170                    .filter(|x| !after_backfill_nodes.contains(x));
171                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
172                    .filter_map(|fragment_id| {
173                        self.backfill_order_state
174                            .get_locality_fragment_state_table_mapping()
175                            .get(&fragment_id)
176                    })
177                    .flatten()
178                    .copied()
179                    .collect();
180                tracing::debug!(
181                    "{} actors out of {} complete",
182                    self.done_count,
183                    total_actors,
184                );
185            }
186        };
187        debug_assert!(
188            new_consumed_row >= old_consumed_row,
189            "backfill progress should not go backward"
190        );
191        debug_assert!(
192            new_buffered_row >= old_buffered_row,
193            "backfill progress should not go backward"
194        );
195        match backfill_upstream_type {
196            BackfillUpstreamType::MView => {
197                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
198            }
199            BackfillUpstreamType::Source => {
200                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
201            }
202            BackfillUpstreamType::Values => {
203                // do not consider progress for values
204            }
205            BackfillUpstreamType::LocalityProvider => {
206                // Track LocalityProvider progress similar to MView
207                // Update buffered rows for precise progress calculation
208                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
209                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
210            }
211        }
212        self.states.insert(actor, new_state);
213        result
214    }
215
216    fn iter_actor_progress(&self) -> impl Iterator<Item = ActorBackfillProgress> + '_ {
217        self.states.iter().filter_map(|(actor_id, state)| {
218            let upstream_type = *self.backfill_upstream_types.get(actor_id)?;
219            let (consumed_rows, done) = match *state {
220                BackfillState::Init => (0, false),
221                BackfillState::ConsumingUpstream(_, consumed_rows, _) => (consumed_rows, false),
222                BackfillState::Done(consumed_rows, _) => (consumed_rows, true),
223            };
224            Some(ActorBackfillProgress {
225                actor_id: *actor_id,
226                upstream_type,
227                consumed_rows,
228                done,
229            })
230        })
231    }
232
233    /// Returns whether all backfill executors are done.
234    fn is_done(&self) -> bool {
235        tracing::trace!(
236            "Progress::is_done? {}, {}, {:?}",
237            self.done_count,
238            self.states.len(),
239            self.states
240        );
241        self.done_count == self.states.len()
242    }
243
244    /// `progress` = `consumed_rows` / `upstream_total_key_count`
245    fn calculate_progress(&self) -> String {
246        if self.is_done() || self.states.is_empty() {
247            return "100%".to_owned();
248        }
249        let mut mv_count = 0;
250        let mut source_count = 0;
251        for backfill_upstream_type in self.backfill_upstream_types.values() {
252            match backfill_upstream_type {
253                BackfillUpstreamType::MView => mv_count += 1,
254                BackfillUpstreamType::Source => source_count += 1,
255                BackfillUpstreamType::Values => (),
256                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
257            }
258        }
259
260        let mv_progress = (mv_count > 0).then_some({
261            // Include buffered rows in total for precise progress calculation
262            // Progress = consumed / (upstream_total + buffered)
263            let total_rows_to_consume =
264                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
265            if total_rows_to_consume == 0 {
266                "99.99%".to_owned()
267            } else {
268                let mut progress =
269                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
270                if progress > 1.0 {
271                    progress = 0.9999;
272                }
273                format!(
274                    "{:.2}% ({}/{})",
275                    progress * 100.0,
276                    self.mv_backfill_consumed_rows,
277                    total_rows_to_consume
278                )
279            }
280        });
281        let source_progress = (source_count > 0).then_some(format!(
282            "{} rows consumed",
283            self.source_backfill_consumed_rows
284        ));
285        match (mv_progress, source_progress) {
286            (Some(mv_progress), Some(source_progress)) => {
287                format!(
288                    "MView Backfill: {}, Source Backfill: {}",
289                    mv_progress, source_progress
290                )
291            }
292            (Some(mv_progress), None) => mv_progress,
293            (None, Some(source_progress)) => source_progress,
294            (None, None) => "Unknown".to_owned(),
295        }
296    }
297}
298
/// There are two kinds of `TrackingJobs`:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    /// The streaming job being tracked.
    job_id: JobId,
    /// `false` for a job built with `TrackingJob::new`, `true` for one built with
    /// `TrackingJob::recovered`.
    is_recovered: bool,
    /// Source change handed to the source manager when the job is finished, if any.
    source_change: Option<SourceChange>,
}
310
311impl std::fmt::Display for TrackingJob {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        write!(
314            f,
315            "{}{}",
316            self.job_id,
317            if self.is_recovered { "<recovered>" } else { "" }
318        )
319    }
320}
321
322impl TrackingJob {
323    /// Create a new tracking job.
324    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
325        Self {
326            job_id: stream_job_fragments.stream_job_id,
327            is_recovered: false,
328            source_change: Some(SourceChange::CreateJobFinished {
329                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
330            }),
331        }
332    }
333
334    /// Create a recovered tracking job.
335    pub(crate) fn recovered(
336        job_id: JobId,
337        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
338    ) -> Self {
339        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
340            fragment_infos
341                .iter()
342                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
343        );
344        let source_change = if source_backfill_fragments.is_empty() {
345            None
346        } else {
347            Some(SourceChange::CreateJobFinished {
348                finished_backfill_fragments: source_backfill_fragments,
349            })
350        };
351        Self {
352            job_id,
353            is_recovered: true,
354            source_change,
355        }
356    }
357
358    pub(crate) fn job_id(&self) -> JobId {
359        self.job_id
360    }
361
362    /// Notify the metadata manager that the job is finished.
363    pub(crate) async fn finish(
364        self,
365        metadata_manager: &MetadataManager,
366        source_manager: &SourceManagerRef,
367    ) -> MetaResult<()> {
368        metadata_manager
369            .catalog_controller
370            .finish_streaming_job(self.job_id)
371            .await?;
372        if let Some(source_change) = self.source_change {
373            source_manager.apply_source_change(source_change).await;
374        }
375        Ok(())
376    }
377}
378
379impl std::fmt::Debug for TrackingJob {
380    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381        if !self.is_recovered {
382            write!(f, "TrackingJob::New({})", self.job_id)
383        } else {
384            write!(f, "TrackingJob::Recovered({})", self.job_id)
385        }
386    }
387}
388
/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated
    pub table_ids_to_truncate: Vec<TableId>,
    /// NOTE(review): presumably the jobs whose CDC table backfill has finished — confirm
    /// against the code that fills this field.
    pub finished_cdc_table_backfill: Vec<JobId>,
}
398
/// Outcome of applying one progress report to a [`Progress`].
pub(super) enum UpdateProgressResult {
    /// No follow-up action is needed.
    None,
    /// The job has finished. Carries the locality provider state tables that still need to
    /// be truncated.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Backfill nodes have finished and new ones need to be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}
408
/// Tracks the creation progress of a single streaming job.
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// The job being tracked; handed out by `into_tracking_job` once tracking has finished.
    tracking_job: TrackingJob,
    /// Current stage of the job creation.
    status: CreateMviewStatus,
}
414
/// Stage of a tracked streaming job creation.
#[derive(Debug)]
enum CreateMviewStatus {
    /// Backfill is still in progress.
    Backfilling {
        /// Progress of the create-mview DDL.
        progress: Progress,

        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table IDs whose locality provider state tables need to be truncated
        table_ids_to_truncate: Vec<TableId>,
    },
    /// A shared CDC source without any progress-tracked actor; it becomes `Finished` through
    /// `mark_cdc_source_finished`.
    CdcSourceInit,
    /// Tracking is complete.
    Finished {
        /// Table IDs whose locality provider state tables still need to be truncated
        table_ids_to_truncate: Vec<TableId>,
    },
}
432
433impl CreateMviewProgressTracker {
434    pub fn recover(
435        creating_job_id: JobId,
436        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
437        backfill_order_state: BackfillOrderState,
438        version_stats: &HummockVersionStats,
439    ) -> Self {
440        {
441            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
442            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
443            let status = if actors.is_empty() {
444                CreateMviewStatus::Finished {
445                    table_ids_to_truncate: vec![],
446                }
447            } else {
448                let mut states = HashMap::new();
449                let mut backfill_upstream_types = HashMap::new();
450
451                for (actor, backfill_upstream_type) in actors {
452                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
453                    backfill_upstream_types.insert(actor, backfill_upstream_type);
454                }
455
456                let progress = Self::recover_progress(
457                    creating_job_id,
458                    states,
459                    backfill_upstream_types,
460                    StreamJobFragments::upstream_table_counts_impl(
461                        fragment_infos.values().map(|fragment| &fragment.nodes),
462                    ),
463                    version_stats,
464                    backfill_order_state,
465                );
466                let pending_backfill_nodes = progress
467                    .backfill_order_state
468                    .current_backfill_node_fragment_ids();
469                CreateMviewStatus::Backfilling {
470                    progress,
471                    pending_backfill_nodes,
472                    table_ids_to_truncate: vec![],
473                }
474            };
475            Self {
476                tracking_job,
477                status,
478            }
479        }
480    }
481
482    /// ## How recovery works
483    ///
484    /// The progress (number of rows consumed) is persisted in state tables.
485    /// During recovery, the backfill executor will restore the number of rows consumed,
486    /// and then it will just report progress like newly created executors.
487    fn recover_progress(
488        job_id: JobId,
489        states: HashMap<ActorId, BackfillState>,
490        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
491        upstream_mv_count: HashMap<TableId, usize>,
492        version_stats: &HummockVersionStats,
493        backfill_order_state: BackfillOrderState,
494    ) -> Progress {
495        let upstream_mvs_total_key_count =
496            calculate_total_key_count(&upstream_mv_count, version_stats);
497        Progress {
498            job_id,
499            states,
500            backfill_order_state,
501            backfill_upstream_types,
502            done_count: 0, // Fill only after first barrier pass
503            upstream_mv_count,
504            upstream_mvs_total_key_count,
505            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
506            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
507            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
508        }
509    }
510
511    pub fn gen_backfill_progress(&self) -> String {
512        match &self.status {
513            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
514            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
515            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
516        }
517    }
518
519    pub(crate) fn actor_progresses(&self) -> Vec<ActorBackfillProgress> {
520        match &self.status {
521            CreateMviewStatus::Backfilling { progress, .. } => {
522                progress.iter_actor_progress().collect()
523            }
524            CreateMviewStatus::CdcSourceInit | CreateMviewStatus::Finished { .. } => vec![],
525        }
526    }
527
528    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
529    /// Return the table ids whose locality provider state tables need to be truncated.
530    pub(super) fn apply_progress(
531        &mut self,
532        create_mview_progress: &CreateMviewProgress,
533        version_stats: &HummockVersionStats,
534    ) {
535        let CreateMviewStatus::Backfilling {
536            progress,
537            pending_backfill_nodes,
538            table_ids_to_truncate,
539        } = &mut self.status
540        else {
541            tracing::warn!(
542                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
543            );
544            return;
545        };
546        {
547            // Update the progress of all commands.
548            {
549                // Those with actors complete can be finished immediately.
550                match progress.apply(create_mview_progress, version_stats) {
551                    UpdateProgressResult::None => {
552                        tracing::trace!(?progress, "update progress");
553                    }
554                    UpdateProgressResult::Finished {
555                        truncate_locality_provider_state_tables,
556                    } => {
557                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
558                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
559                        tracing::trace!(?progress, "finish progress");
560                        self.status = CreateMviewStatus::Finished {
561                            table_ids_to_truncate,
562                        };
563                    }
564                    UpdateProgressResult::BackfillNodeFinished(pending) => {
565                        table_ids_to_truncate
566                            .extend(pending.truncate_locality_provider_state_tables.clone());
567                        tracing::trace!(
568                            ?progress,
569                            next_backfill_nodes = ?pending.next_backfill_nodes,
570                            "start next backfill node"
571                        );
572                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
573                    }
574                }
575            }
576        }
577    }
578
579    /// Refresh tracker state after reschedule so new actors can report progress correctly.
580    pub fn refresh_after_reschedule(
581        &mut self,
582        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
583        version_stats: &HummockVersionStats,
584    ) {
585        let CreateMviewStatus::Backfilling {
586            progress,
587            pending_backfill_nodes,
588            ..
589        } = &mut self.status
590        else {
591            return;
592        };
593
594        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
595            fragment_infos
596                .values()
597                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
598        );
599
600        #[cfg(debug_assertions)]
601        {
602            use std::collections::HashSet;
603            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
604            let new_actor_ids: HashSet<_> = new_tracking_actors
605                .iter()
606                .map(|(actor_id, _)| *actor_id)
607                .collect();
608            debug_assert!(
609                old_actor_ids.is_disjoint(&new_actor_ids),
610                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
611            );
612        }
613
614        let mut new_states = HashMap::new();
615        let mut new_backfill_types = HashMap::new();
616        for (actor_id, upstream_type) in new_tracking_actors {
617            new_states.insert(actor_id, BackfillState::Init);
618            new_backfill_types.insert(actor_id, upstream_type);
619        }
620
621        let fragment_actors: HashMap<_, _> = fragment_infos
622            .iter()
623            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
624            .collect();
625
626        let newly_scheduled = progress
627            .backfill_order_state
628            .refresh_actors(&fragment_actors);
629
630        progress.backfill_upstream_types = new_backfill_types;
631        progress.states = new_states;
632        progress.done_count = 0;
633
634        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
635            fragment_infos.values().map(|fragment| &fragment.nodes),
636        );
637        progress.upstream_mvs_total_key_count =
638            calculate_total_key_count(&progress.upstream_mv_count, version_stats);
639
640        progress.mv_backfill_consumed_rows = 0;
641        progress.source_backfill_consumed_rows = 0;
642        progress.mv_backfill_buffered_rows = 0;
643
644        let mut pending = progress
645            .backfill_order_state
646            .current_backfill_node_fragment_ids();
647        pending.extend(newly_scheduled);
648        pending.sort_unstable();
649        pending.dedup();
650        *pending_backfill_nodes = pending;
651    }
652
653    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
654        match &mut self.status {
655            CreateMviewStatus::Backfilling {
656                pending_backfill_nodes,
657                ..
658            } => Some(pending_backfill_nodes.drain(..)),
659            CreateMviewStatus::CdcSourceInit => None,
660            CreateMviewStatus::Finished { .. } => None,
661        }
662        .into_iter()
663        .flatten()
664    }
665
666    pub(super) fn collect_staging_commit_info(
667        &mut self,
668    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
669        match &mut self.status {
670            CreateMviewStatus::Backfilling {
671                table_ids_to_truncate,
672                ..
673            } => (false, Box::new(table_ids_to_truncate.drain(..))),
674            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
675            CreateMviewStatus::Finished {
676                table_ids_to_truncate,
677                ..
678            } => (true, Box::new(table_ids_to_truncate.drain(..))),
679        }
680    }
681
682    pub(super) fn is_finished(&self) -> bool {
683        matches!(self.status, CreateMviewStatus::Finished { .. })
684    }
685
686    /// Mark CDC source as finished when offset is updated.
687    pub(super) fn mark_cdc_source_finished(&mut self) {
688        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
689            self.status = CreateMviewStatus::Finished {
690                table_ids_to_truncate: vec![],
691            };
692        }
693    }
694
695    pub(super) fn into_tracking_job(self) -> TrackingJob {
696        let CreateMviewStatus::Finished { .. } = self.status else {
697            panic!("should be called when finished");
698        };
699        self.tracking_job
700    }
701
702    pub(crate) fn job_id(&self) -> JobId {
703        self.tracking_job.job_id
704    }
705
706    pub(crate) fn collect_fragment_progress(
707        &self,
708        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
709        mark_done_when_empty: bool,
710    ) -> Vec<FragmentBackfillProgress> {
711        let actor_progresses = self.actor_progresses();
712        if actor_progresses.is_empty() {
713            if mark_done_when_empty && self.is_finished() {
714                return collect_done_fragments(self.job_id(), fragment_infos);
715            }
716            return vec![];
717        }
718        collect_fragment_progress_from_actors(self.job_id(), fragment_infos, &actor_progresses)
719    }
720
721    /// Add a new create-mview DDL command to track.
722    ///
723    /// If the actors to track are empty, return the given command as it can be finished immediately.
724    /// For CDC sources, mark as `CdcSourceInit` instead of Finished.
725    pub fn new(
726        info: &CreateStreamingJobCommandInfo,
727        version_stats: &HummockVersionStats,
728        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
729    ) -> Self {
730        tracing::trace!(?info, "add job to track");
731        let CreateStreamingJobCommandInfo {
732            stream_job_fragments,
733            fragment_backfill_ordering,
734            locality_fragment_state_table_mapping,
735            streaming_job,
736            ..
737        } = info;
738        let job_id = stream_job_fragments.stream_job_id();
739        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
740        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
741        if actors.is_empty() {
742            // NOTE: This CDC source detection uses hardcoded property checks and should be replaced
743            // with a more reliable identification method in the future.
744            let is_cdc_source = matches!(
745                streaming_job,
746                crate::manager::StreamingJob::Source(source)
747                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
748                    .get_with_properties()
749                    .get("connector")
750                    .map(|connector| connector.to_lowercase().contains("-cdc"))
751                    .unwrap_or(false)
752            );
753            if is_cdc_source {
754                // Mark CDC source as CdcSourceInit, will be finished when offset is updated
755                return Self {
756                    tracking_job,
757                    status: CreateMviewStatus::CdcSourceInit,
758                };
759            }
760            // The command can be finished immediately.
761            return Self {
762                tracking_job,
763                status: CreateMviewStatus::Finished {
764                    table_ids_to_truncate: vec![],
765                },
766            };
767        }
768
769        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
770        let upstream_total_key_count: u64 =
771            calculate_total_key_count(&upstream_mv_count, version_stats);
772
773        let backfill_order_state = BackfillOrderState::new(
774            fragment_backfill_ordering,
775            fragment_infos,
776            locality_fragment_state_table_mapping.clone(),
777        );
778        let progress = Progress::new(
779            job_id,
780            actors,
781            upstream_mv_count,
782            upstream_total_key_count,
783            backfill_order_state,
784        );
785        let pending_backfill_nodes = progress
786            .backfill_order_state
787            .current_backfill_node_fragment_ids();
788        Self {
789            tracking_job,
790            status: CreateMviewStatus::Backfilling {
791                progress,
792                pending_backfill_nodes,
793                table_ids_to_truncate: vec![],
794            },
795        }
796    }
797}
798
799impl Progress {
800    /// Update the progress of `actor` according to the Pb struct.
801    ///
802    /// If all actors in this MV have finished, return the command.
803    fn apply(
804        &mut self,
805        progress: &CreateMviewProgress,
806        version_stats: &HummockVersionStats,
807    ) -> UpdateProgressResult {
808        tracing::trace!(?progress, "update progress");
809        let actor = progress.backfill_actor_id;
810        let job_id = self.job_id;
811
812        let new_state = if progress.done {
813            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
814        } else {
815            BackfillState::ConsumingUpstream(
816                progress.consumed_epoch.into(),
817                progress.consumed_rows,
818                progress.buffered_rows,
819            )
820        };
821
822        {
823            {
824                let progress_state = self;
825
826                let upstream_total_key_count: u64 =
827                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
828
829                tracing::trace!(%job_id, "updating progress for table");
830                let pending = progress_state.update(actor, new_state, upstream_total_key_count);
831
832                if progress_state.is_done() {
833                    tracing::debug!(
834                        %job_id,
835                        "all actors done for creating mview!",
836                    );
837
838                    let PendingBackfillFragments {
839                        next_backfill_nodes,
840                        truncate_locality_provider_state_tables,
841                    } = pending;
842
843                    assert!(next_backfill_nodes.is_empty());
844                    UpdateProgressResult::Finished {
845                        truncate_locality_provider_state_tables,
846                    }
847                } else if !pending.next_backfill_nodes.is_empty()
848                    || !pending.truncate_locality_provider_state_tables.is_empty()
849                {
850                    UpdateProgressResult::BackfillNodeFinished(pending)
851                } else {
852                    UpdateProgressResult::None
853                }
854            }
855        }
856    }
857}
858
859fn calculate_total_key_count(
860    table_count: &HashMap<TableId, usize>,
861    version_stats: &HummockVersionStats,
862) -> u64 {
863    table_count
864        .iter()
865        .map(|(table_id, count)| {
866            assert_ne!(*count, 0);
867            *count as u64
868                * version_stats
869                    .table_stats
870                    .get(table_id)
871                    .map_or(0, |stat| stat.total_key_count as u64)
872        })
873        .sum()
874}
875
876pub(crate) fn collect_fragment_progress_from_actors(
877    job_id: JobId,
878    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
879    actor_progresses: &[ActorBackfillProgress],
880) -> Vec<FragmentBackfillProgress> {
881    let mut actor_to_fragment = HashMap::new();
882    for (fragment_id, info) in fragment_infos {
883        for actor_id in info.actors.keys() {
884            actor_to_fragment.insert(*actor_id, *fragment_id);
885        }
886    }
887
888    let mut per_fragment: HashMap<FragmentId, (u64, usize, usize, BackfillUpstreamType)> =
889        HashMap::new();
890    for progress in actor_progresses {
891        let Some(fragment_id) = actor_to_fragment.get(&progress.actor_id) else {
892            continue;
893        };
894        let entry = per_fragment
895            .entry(*fragment_id)
896            .or_insert((0, 0, 0, progress.upstream_type));
897        entry.0 = entry.0.saturating_add(progress.consumed_rows);
898        entry.1 += progress.done as usize;
899        entry.2 += 1;
900    }
901
902    per_fragment
903        .into_iter()
904        .map(
905            |(fragment_id, (consumed_rows, done_cnt, total_cnt, upstream_type))| {
906                FragmentBackfillProgress {
907                    job_id,
908                    fragment_id,
909                    consumed_rows,
910                    done: total_cnt > 0 && done_cnt == total_cnt,
911                    upstream_type,
912                }
913            },
914        )
915        .collect()
916}
917
918pub(crate) fn collect_done_fragments(
919    job_id: JobId,
920    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
921) -> Vec<FragmentBackfillProgress> {
922    fragment_infos
923        .iter()
924        .filter(|(_, fragment)| {
925            fragment.fragment_type_mask.contains_any([
926                FragmentTypeFlag::StreamScan,
927                FragmentTypeFlag::SourceScan,
928                FragmentTypeFlag::LocalityProvider,
929            ])
930        })
931        .map(|(fragment_id, fragment)| FragmentBackfillProgress {
932            job_id,
933            fragment_id: *fragment_id,
934            consumed_rows: 0,
935            done: true,
936            upstream_type: BackfillUpstreamType::from_fragment_type_mask(
937                fragment.fragment_type_mask,
938            ),
939        })
940        .collect()
941}
942
943#[cfg(test)]
944mod tests {
945    use std::collections::HashSet;
946
947    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
948    use risingwave_common::id::WorkerId;
949    use risingwave_meta_model::fragment::DistributionType;
950    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;
951
952    use super::*;
953    use crate::controller::fragment::InflightActorInfo;
954
955    fn sample_inflight_fragment(
956        fragment_id: FragmentId,
957        actor_ids: &[ActorId],
958        flag: FragmentTypeFlag,
959    ) -> InflightFragmentInfo {
960        let mut fragment_type_mask = FragmentTypeMask::empty();
961        fragment_type_mask.add(flag);
962        InflightFragmentInfo {
963            fragment_id,
964            distribution_type: DistributionType::Single,
965            fragment_type_mask,
966            vnode_count: 0,
967            nodes: PbStreamNode::default(),
968            actors: actor_ids
969                .iter()
970                .map(|actor_id| {
971                    (
972                        *actor_id,
973                        InflightActorInfo {
974                            worker_id: WorkerId::new(1),
975                            vnode_bitmap: None,
976                            splits: vec![],
977                        },
978                    )
979                })
980                .collect(),
981            state_table_ids: HashSet::new(),
982        }
983    }
984
985    fn sample_progress(actor_id: ActorId) -> Progress {
986        Progress {
987            job_id: JobId::new(1),
988            states: HashMap::from([(actor_id, BackfillState::Init)]),
989            backfill_order_state: BackfillOrderState::default(),
990            done_count: 0,
991            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
992            upstream_mv_count: HashMap::new(),
993            upstream_mvs_total_key_count: 0,
994            mv_backfill_consumed_rows: 0,
995            source_backfill_consumed_rows: 0,
996            mv_backfill_buffered_rows: 0,
997        }
998    }
999
1000    #[test]
1001    fn update_ignores_unknown_actor() {
1002        let actor_known = ActorId::new(1);
1003        let actor_unknown = ActorId::new(2);
1004        let mut progress = sample_progress(actor_known);
1005
1006        let pending = progress.update(
1007            actor_unknown,
1008            BackfillState::Done(0, 0),
1009            progress.upstream_mvs_total_key_count,
1010        );
1011
1012        assert!(pending.next_backfill_nodes.is_empty());
1013        assert_eq!(progress.states.len(), 1);
1014        assert!(progress.states.contains_key(&actor_known));
1015    }
1016
1017    #[test]
1018    fn refresh_rebuilds_tracking_after_reschedule() {
1019        let actor_old = ActorId::new(1);
1020        let actor_new = ActorId::new(2);
1021
1022        let progress = Progress {
1023            job_id: JobId::new(1),
1024            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
1025            backfill_order_state: BackfillOrderState::default(),
1026            done_count: 1,
1027            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
1028            upstream_mv_count: HashMap::new(),
1029            upstream_mvs_total_key_count: 0,
1030            mv_backfill_consumed_rows: 5,
1031            source_backfill_consumed_rows: 0,
1032            mv_backfill_buffered_rows: 0,
1033        };
1034
1035        let mut tracker = CreateMviewProgressTracker {
1036            tracking_job: TrackingJob {
1037                job_id: JobId::new(1),
1038                is_recovered: false,
1039                source_change: None,
1040            },
1041            status: CreateMviewStatus::Backfilling {
1042                progress,
1043                pending_backfill_nodes: vec![],
1044                table_ids_to_truncate: vec![],
1045            },
1046        };
1047
1048        let fragment_infos = HashMap::from([(
1049            FragmentId::new(10),
1050            sample_inflight_fragment(
1051                FragmentId::new(10),
1052                &[actor_new],
1053                FragmentTypeFlag::StreamScan,
1054            ),
1055        )]);
1056
1057        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());
1058
1059        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
1060            panic!("expected backfilling status");
1061        };
1062        assert!(progress.states.contains_key(&actor_new));
1063        assert!(!progress.states.contains_key(&actor_old));
1064        assert_eq!(progress.done_count, 0);
1065        assert_eq!(progress.mv_backfill_consumed_rows, 0);
1066        assert_eq!(progress.source_backfill_consumed_rows, 0);
1067    }
1068
1069    // CDC sources should be initialized as CdcSourceInit
1070    #[test]
1071    fn test_cdc_source_initialized_as_cdc_source_init() {
1072        use std::collections::BTreeMap;
1073
1074        use risingwave_meta_model::streaming_job;
1075        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};
1076
1077        use crate::barrier::command::CreateStreamingJobCommandInfo;
1078        use crate::manager::{StreamingJob, StreamingJobType};
1079        use crate::model::StreamJobFragmentsToCreate;
1080
1081        // Create a CDC source with cdc_source_job = true
1082        let source_info = StreamSourceInfo {
1083            cdc_source_job: true,
1084            ..Default::default()
1085        };
1086
1087        let source = PbSource {
1088            id: risingwave_common::id::SourceId::new(100),
1089            info: Some(source_info),
1090            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
1091            ..Default::default()
1092        };
1093
1094        // Create empty fragments (no actors to track)
1095        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
1096        let stream_job_fragments = StreamJobFragmentsToCreate {
1097            inner: fragments,
1098            downstreams: Default::default(),
1099        };
1100
1101        let info = CreateStreamingJobCommandInfo {
1102            stream_job_fragments,
1103            upstream_fragment_downstreams: Default::default(),
1104            init_split_assignment: Default::default(),
1105            definition: "CREATE SOURCE ...".to_owned(),
1106            job_type: StreamingJobType::Source,
1107            create_type: CreateType::Foreground,
1108            streaming_job: StreamingJob::Source(source),
1109            database_resource_group: risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP
1110                .to_owned(),
1111            fragment_backfill_ordering: Default::default(),
1112            cdc_table_snapshot_splits: None,
1113            locality_fragment_state_table_mapping: Default::default(),
1114            is_serverless: false,
1115            streaming_job_model: streaming_job::Model {
1116                job_id: JobId::new(100),
1117                job_status: risingwave_meta_model::JobStatus::Creating,
1118                create_type: risingwave_meta_model::CreateType::Foreground,
1119                timezone: None,
1120                config_override: None,
1121                adaptive_parallelism_strategy: None,
1122                parallelism: risingwave_meta_model::StreamingParallelism::Adaptive,
1123                backfill_parallelism: None,
1124                backfill_orders: None,
1125                max_parallelism: 256,
1126                specific_resource_group: None,
1127                is_serverless_backfill: false,
1128            },
1129        };
1130
1131        let tracker = CreateMviewProgressTracker::new(
1132            &info,
1133            &HummockVersionStats::default(),
1134            &HashMap::new(),
1135        );
1136
1137        // CDC source should be in CdcSourceInit state
1138        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
1139        assert!(!tracker.is_finished());
1140    }
1141
1142    // CDC source should transition from CdcSourceInit to Finished when offset is updated
1143    #[test]
1144    fn test_cdc_source_transitions_to_finished_on_offset_update() {
1145        let mut tracker = CreateMviewProgressTracker {
1146            tracking_job: TrackingJob {
1147                job_id: JobId::new(300),
1148                is_recovered: false,
1149                source_change: None,
1150            },
1151            status: CreateMviewStatus::CdcSourceInit,
1152        };
1153
1154        // Initially in CdcSourceInit state
1155        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
1156        assert!(!tracker.is_finished());
1157
1158        // Mark as finished when offset is updated
1159        tracker.mark_cdc_source_finished();
1160
1161        // Should now be in Finished state
1162        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
1163        assert!(tracker.is_finished());
1164    }
1165}