risingwave_meta/barrier/
progress.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::{FragmentTypeFlag, TableId};
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
23
24use crate::MetaResult;
25use crate::barrier::backfill_order_control::BackfillOrderState;
26use crate::barrier::info::InflightStreamingJobInfo;
27use crate::barrier::{CreateStreamingJobCommandInfo, FragmentBackfillProgress};
28use crate::controller::fragment::InflightFragmentInfo;
29use crate::manager::MetadataManager;
30use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
31use crate::stream::{SourceChange, SourceManagerRef};
32
/// Number of upstream rows a backfill executor has consumed so far.
type ConsumedRows = u64;
/// Number of rows buffered by locality backfill and not yet consumed.
type BufferedRows = u64;
35
/// Per-actor snapshot of backfill progress, exposed to observers of the tracker.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ActorBackfillProgress {
    /// The actor running the backfill executor.
    pub(crate) actor_id: ActorId,
    /// What the backfill reads from (MV, source, values, or locality provider).
    pub(crate) upstream_type: BackfillUpstreamType,
    /// Rows consumed from upstream so far.
    pub(crate) consumed_rows: u64,
    /// Whether this actor has finished backfilling.
    pub(crate) done: bool,
}
43
/// Backfill state of a single actor, derived from its barrier progress reports.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// No progress has been reported yet.
    Init,
    /// Actively consuming upstream. The epoch is currently unused (kept for
    /// debugging/future use), followed by consumed and buffered row counts.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    /// Backfill finished, with the final consumed and buffered row counts.
    Done(ConsumedRows, BufferedRows),
}
50
/// Represents the backfill nodes that need to be scheduled or cleaned up.
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Fragment IDs that should start backfilling in the next checkpoint
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of locality provider fragments that should be truncated
    /// now that those fragments have finished backfilling
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}
59
/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    /// The streaming job whose backfill progress is tracked.
    job_id: JobId,
    // `states` and `done_count` decides whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    /// Drives ordered scheduling of backfill fragments (see `finish_actor`).
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported `Done`.
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    /// Rows consumed by MView (and locality-provider) backfill actors.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed by source backfill actors.
    source_backfill_consumed_rows: u64,
    /// Buffered rows (for locality backfill) that are yet to be consumed
    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
    mv_backfill_buffered_rows: u64,
}
85
86impl Progress {
87    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
88    fn new(
89        job_id: JobId,
90        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
91        upstream_mv_count: HashMap<TableId, usize>,
92        upstream_total_key_count: u64,
93        backfill_order_state: BackfillOrderState,
94    ) -> Self {
95        let mut states = HashMap::new();
96        let mut backfill_upstream_types = HashMap::new();
97        for (actor, backfill_upstream_type) in actors {
98            states.insert(actor, BackfillState::Init);
99            backfill_upstream_types.insert(actor, backfill_upstream_type);
100        }
101        assert!(!states.is_empty());
102
103        Self {
104            job_id,
105            states,
106            backfill_upstream_types,
107            done_count: 0,
108            upstream_mv_count,
109            upstream_mvs_total_key_count: upstream_total_key_count,
110            mv_backfill_consumed_rows: 0,
111            source_backfill_consumed_rows: 0,
112            mv_backfill_buffered_rows: 0,
113            backfill_order_state,
114        }
115    }
116
117    /// Update the progress of `actor`.
118    /// Returns the backfill fragments that need to be scheduled or cleaned up.
119    fn update(
120        &mut self,
121        actor: ActorId,
122        new_state: BackfillState,
123        upstream_total_key_count: u64,
124    ) -> PendingBackfillFragments {
125        let mut result = PendingBackfillFragments::default();
126        self.upstream_mvs_total_key_count = upstream_total_key_count;
127        let total_actors = self.states.len();
128        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
129            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
130            return result;
131        };
132
133        let mut old_consumed_row = 0;
134        let mut new_consumed_row = 0;
135        let mut old_buffered_row = 0;
136        let mut new_buffered_row = 0;
137        let Some(prev_state) = self.states.remove(&actor) else {
138            tracing::warn!(%actor, "receive progress for actor not in state map");
139            return result;
140        };
141        match prev_state {
142            BackfillState::Init => {}
143            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
144                old_consumed_row = consumed_rows;
145                old_buffered_row = buffered_rows;
146            }
147            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
148        };
149        match &new_state {
150            BackfillState::Init => {}
151            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
152                new_consumed_row = *consumed_rows;
153                new_buffered_row = *buffered_rows;
154            }
155            BackfillState::Done(consumed_rows, buffered_rows) => {
156                tracing::debug!("actor {} done", actor);
157                new_consumed_row = *consumed_rows;
158                new_buffered_row = *buffered_rows;
159                self.done_count += 1;
160                let before_backfill_nodes = self
161                    .backfill_order_state
162                    .current_backfill_node_fragment_ids();
163                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
164                let after_backfill_nodes = self
165                    .backfill_order_state
166                    .current_backfill_node_fragment_ids();
167                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
168                let last_backfill_nodes_iter = before_backfill_nodes
169                    .into_iter()
170                    .filter(|x| !after_backfill_nodes.contains(x));
171                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
172                    .filter_map(|fragment_id| {
173                        self.backfill_order_state
174                            .get_locality_fragment_state_table_mapping()
175                            .get(&fragment_id)
176                    })
177                    .flatten()
178                    .copied()
179                    .collect();
180                tracing::debug!(
181                    "{} actors out of {} complete",
182                    self.done_count,
183                    total_actors,
184                );
185            }
186        };
187        debug_assert!(
188            new_consumed_row >= old_consumed_row,
189            "backfill progress should not go backward"
190        );
191        debug_assert!(
192            new_buffered_row >= old_buffered_row,
193            "backfill progress should not go backward"
194        );
195        match backfill_upstream_type {
196            BackfillUpstreamType::MView => {
197                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
198            }
199            BackfillUpstreamType::Source => {
200                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
201            }
202            BackfillUpstreamType::Values => {
203                // do not consider progress for values
204            }
205            BackfillUpstreamType::LocalityProvider => {
206                // Track LocalityProvider progress similar to MView
207                // Update buffered rows for precise progress calculation
208                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
209                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
210            }
211        }
212        self.states.insert(actor, new_state);
213        result
214    }
215
216    fn iter_actor_progress(&self) -> impl Iterator<Item = ActorBackfillProgress> + '_ {
217        self.states.iter().filter_map(|(actor_id, state)| {
218            let upstream_type = *self.backfill_upstream_types.get(actor_id)?;
219            let (consumed_rows, done) = match *state {
220                BackfillState::Init => (0, false),
221                BackfillState::ConsumingUpstream(_, consumed_rows, _) => (consumed_rows, false),
222                BackfillState::Done(consumed_rows, _) => (consumed_rows, true),
223            };
224            Some(ActorBackfillProgress {
225                actor_id: *actor_id,
226                upstream_type,
227                consumed_rows,
228                done,
229            })
230        })
231    }
232
233    /// Returns whether all backfill executors are done.
234    fn is_done(&self) -> bool {
235        tracing::trace!(
236            "Progress::is_done? {}, {}, {:?}",
237            self.done_count,
238            self.states.len(),
239            self.states
240        );
241        self.done_count == self.states.len()
242    }
243
244    /// `progress` = `consumed_rows` / `upstream_total_key_count`
245    fn calculate_progress(&self) -> String {
246        if self.is_done() || self.states.is_empty() {
247            return "100%".to_owned();
248        }
249        let mut mv_count = 0;
250        let mut source_count = 0;
251        for backfill_upstream_type in self.backfill_upstream_types.values() {
252            match backfill_upstream_type {
253                BackfillUpstreamType::MView => mv_count += 1,
254                BackfillUpstreamType::Source => source_count += 1,
255                BackfillUpstreamType::Values => (),
256                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
257            }
258        }
259
260        let mv_progress = (mv_count > 0).then_some({
261            // Include buffered rows in total for precise progress calculation
262            // Progress = consumed / (upstream_total + buffered)
263            let total_rows_to_consume =
264                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
265            if total_rows_to_consume == 0 {
266                "99.99%".to_owned()
267            } else {
268                let mut progress =
269                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
270                if progress > 1.0 {
271                    progress = 0.9999;
272                }
273                format!(
274                    "{:.2}% ({}/{})",
275                    progress * 100.0,
276                    self.mv_backfill_consumed_rows,
277                    total_rows_to_consume
278                )
279            }
280        });
281        let source_progress = (source_count > 0).then_some(format!(
282            "{} rows consumed",
283            self.source_backfill_consumed_rows
284        ));
285        match (mv_progress, source_progress) {
286            (Some(mv_progress), Some(source_progress)) => {
287                format!(
288                    "MView Backfill: {}, Source Backfill: {}",
289                    mv_progress, source_progress
290                )
291            }
292            (Some(mv_progress), None) => mv_progress,
293            (None, Some(source_progress)) => source_progress,
294            (None, None) => "Unknown".to_owned(),
295        }
296    }
297}
298
/// There are two kinds of `TrackingJobs`:
/// 1. if `is_recovered` is false, it is a "New" tracking job.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
///    On recovery, the barrier manager will recover and start managing the job.
pub struct TrackingJob {
    /// The streaming job being tracked.
    job_id: JobId,
    /// Whether this job was rebuilt during recovery (see type-level docs).
    is_recovered: bool,
    /// Source change to apply when the job finishes; `None` if there is nothing to apply.
    source_change: Option<SourceChange>,
}
310
311impl std::fmt::Display for TrackingJob {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        write!(
314            f,
315            "{}{}",
316            self.job_id,
317            if self.is_recovered { "<recovered>" } else { "" }
318        )
319    }
320}
321
322impl TrackingJob {
323    /// Create a new tracking job.
324    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
325        Self {
326            job_id: stream_job_fragments.stream_job_id,
327            is_recovered: false,
328            source_change: Some(SourceChange::CreateJobFinished {
329                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
330            }),
331        }
332    }
333
334    /// Create a recovered tracking job.
335    pub(crate) fn recovered(
336        job_id: JobId,
337        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
338    ) -> Self {
339        let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
340            fragment_infos
341                .iter()
342                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
343        );
344        let source_change = if source_backfill_fragments.is_empty() {
345            None
346        } else {
347            Some(SourceChange::CreateJobFinished {
348                finished_backfill_fragments: source_backfill_fragments,
349            })
350        };
351        Self {
352            job_id,
353            is_recovered: true,
354            source_change,
355        }
356    }
357
358    pub(crate) fn job_id(&self) -> JobId {
359        self.job_id
360    }
361
362    /// Notify the metadata manager that the job is finished.
363    pub(crate) async fn finish(
364        self,
365        metadata_manager: &MetadataManager,
366        source_manager: &SourceManagerRef,
367    ) -> MetaResult<()> {
368        metadata_manager
369            .catalog_controller
370            .finish_streaming_job(self.job_id)
371            .await?;
372        if let Some(source_change) = self.source_change {
373            source_manager.apply_source_change(source_change).await;
374        }
375        Ok(())
376    }
377}
378
379impl std::fmt::Debug for TrackingJob {
380    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381        if !self.is_recovered {
382            write!(f, "TrackingJob::New({})", self.job_id)
383        } else {
384            write!(f, "TrackingJob::Recovered({})", self.job_id)
385        }
386    }
387}
388
/// Information collected during barrier completion that needs to be committed.
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Finished jobs that should be committed
    pub finished_jobs: Vec<TrackingJob>,
    /// Table IDs whose locality provider state tables need to be truncated
    pub table_ids_to_truncate: Vec<TableId>,
    /// Jobs whose CDC table backfill finished during this barrier.
    pub finished_cdc_table_backfill: Vec<JobId>,
}
398
/// Outcome of applying one actor's progress report to a [`Progress`].
pub(super) enum UpdateProgressResult {
    /// Progress recorded; nothing further to do.
    None,
    /// The finished job, along with its pending backfill fragments for cleanup.
    Finished {
        truncate_locality_provider_state_tables: Vec<TableId>,
    },
    /// Backfill nodes have finished and new ones need to be scheduled.
    BackfillNodeFinished(PendingBackfillFragments),
}
408
/// Tracks the backfill progress of one creating streaming job until it finishes.
#[derive(Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// The job being tracked; handed out via `into_tracking_job` when finished.
    tracking_job: TrackingJob,
    /// Current lifecycle state of the create-mview DDL.
    status: CreateMviewStatus,
}
414
/// Lifecycle state of a creating streaming job.
#[derive(Debug)]
enum CreateMviewStatus {
    Backfilling {
        /// Progress of the create-mview DDL.
        progress: Progress,

        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
        pending_backfill_nodes: Vec<FragmentId>,

        /// Table IDs whose locality provider state tables need to be truncated
        table_ids_to_truncate: Vec<TableId>,
    },
    /// Shared CDC source waiting for its first offset update (see `mark_cdc_source_finished`).
    CdcSourceInit,
    /// Backfill complete; truncations still to be committed are carried here.
    Finished {
        table_ids_to_truncate: Vec<TableId>,
    },
}
432
433impl CreateMviewProgressTracker {
434    pub fn recover(
435        creating_job_id: JobId,
436        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
437        backfill_order_state: BackfillOrderState,
438        version_stats: &HummockVersionStats,
439    ) -> Self {
440        {
441            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
442            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
443            let status = if actors.is_empty() {
444                CreateMviewStatus::Finished {
445                    table_ids_to_truncate: vec![],
446                }
447            } else {
448                let mut states = HashMap::new();
449                let mut backfill_upstream_types = HashMap::new();
450
451                for (actor, backfill_upstream_type) in actors {
452                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
453                    backfill_upstream_types.insert(actor, backfill_upstream_type);
454                }
455
456                let progress = Self::recover_progress(
457                    creating_job_id,
458                    states,
459                    backfill_upstream_types,
460                    StreamJobFragments::upstream_table_counts_impl(
461                        fragment_infos.values().map(|fragment| &fragment.nodes),
462                    ),
463                    version_stats,
464                    backfill_order_state,
465                );
466                let pending_backfill_nodes = progress
467                    .backfill_order_state
468                    .current_backfill_node_fragment_ids();
469                CreateMviewStatus::Backfilling {
470                    progress,
471                    pending_backfill_nodes,
472                    table_ids_to_truncate: vec![],
473                }
474            };
475            Self {
476                tracking_job,
477                status,
478            }
479        }
480    }
481
482    /// ## How recovery works
483    ///
484    /// The progress (number of rows consumed) is persisted in state tables.
485    /// During recovery, the backfill executor will restore the number of rows consumed,
486    /// and then it will just report progress like newly created executors.
487    fn recover_progress(
488        job_id: JobId,
489        states: HashMap<ActorId, BackfillState>,
490        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
491        upstream_mv_count: HashMap<TableId, usize>,
492        version_stats: &HummockVersionStats,
493        backfill_order_state: BackfillOrderState,
494    ) -> Progress {
495        let upstream_mvs_total_key_count =
496            calculate_total_key_count(&upstream_mv_count, version_stats);
497        Progress {
498            job_id,
499            states,
500            backfill_order_state,
501            backfill_upstream_types,
502            done_count: 0, // Fill only after first barrier pass
503            upstream_mv_count,
504            upstream_mvs_total_key_count,
505            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
506            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
507            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
508        }
509    }
510
511    pub fn gen_backfill_progress(&self) -> String {
512        match &self.status {
513            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
514            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
515            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
516        }
517    }
518
519    pub(crate) fn actor_progresses(&self) -> Vec<ActorBackfillProgress> {
520        match &self.status {
521            CreateMviewStatus::Backfilling { progress, .. } => {
522                progress.iter_actor_progress().collect()
523            }
524            CreateMviewStatus::CdcSourceInit | CreateMviewStatus::Finished { .. } => vec![],
525        }
526    }
527
528    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
529    /// Return the table ids whose locality provider state tables need to be truncated.
530    pub(super) fn apply_progress(
531        &mut self,
532        create_mview_progress: &CreateMviewProgress,
533        version_stats: &HummockVersionStats,
534    ) {
535        let CreateMviewStatus::Backfilling {
536            progress,
537            pending_backfill_nodes,
538            table_ids_to_truncate,
539        } = &mut self.status
540        else {
541            tracing::warn!(
542                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
543            );
544            return;
545        };
546        {
547            // Update the progress of all commands.
548            {
549                // Those with actors complete can be finished immediately.
550                match progress.apply(create_mview_progress, version_stats) {
551                    UpdateProgressResult::None => {
552                        tracing::trace!(?progress, "update progress");
553                    }
554                    UpdateProgressResult::Finished {
555                        truncate_locality_provider_state_tables,
556                    } => {
557                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
558                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
559                        tracing::trace!(?progress, "finish progress");
560                        self.status = CreateMviewStatus::Finished {
561                            table_ids_to_truncate,
562                        };
563                    }
564                    UpdateProgressResult::BackfillNodeFinished(pending) => {
565                        table_ids_to_truncate
566                            .extend(pending.truncate_locality_provider_state_tables.clone());
567                        tracing::trace!(
568                            ?progress,
569                            next_backfill_nodes = ?pending.next_backfill_nodes,
570                            "start next backfill node"
571                        );
572                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
573                    }
574                }
575            }
576        }
577    }
578
579    /// Refresh tracker state after reschedule so new actors can report progress correctly.
580    pub fn refresh_after_reschedule(
581        &mut self,
582        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
583        version_stats: &HummockVersionStats,
584    ) {
585        let CreateMviewStatus::Backfilling {
586            progress,
587            pending_backfill_nodes,
588            ..
589        } = &mut self.status
590        else {
591            return;
592        };
593
594        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
595            fragment_infos
596                .values()
597                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
598        );
599
600        #[cfg(debug_assertions)]
601        {
602            use std::collections::HashSet;
603            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
604            let new_actor_ids: HashSet<_> = new_tracking_actors
605                .iter()
606                .map(|(actor_id, _)| *actor_id)
607                .collect();
608            debug_assert!(
609                old_actor_ids.is_disjoint(&new_actor_ids),
610                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
611            );
612        }
613
614        let mut new_states = HashMap::new();
615        let mut new_backfill_types = HashMap::new();
616        for (actor_id, upstream_type) in new_tracking_actors {
617            new_states.insert(actor_id, BackfillState::Init);
618            new_backfill_types.insert(actor_id, upstream_type);
619        }
620
621        let fragment_actors: HashMap<_, _> = fragment_infos
622            .iter()
623            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
624            .collect();
625
626        let newly_scheduled = progress
627            .backfill_order_state
628            .refresh_actors(&fragment_actors);
629
630        progress.backfill_upstream_types = new_backfill_types;
631        progress.states = new_states;
632        progress.done_count = 0;
633
634        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
635            fragment_infos.values().map(|fragment| &fragment.nodes),
636        );
637        progress.upstream_mvs_total_key_count =
638            calculate_total_key_count(&progress.upstream_mv_count, version_stats);
639
640        progress.mv_backfill_consumed_rows = 0;
641        progress.source_backfill_consumed_rows = 0;
642        progress.mv_backfill_buffered_rows = 0;
643
644        let mut pending = progress
645            .backfill_order_state
646            .current_backfill_node_fragment_ids();
647        pending.extend(newly_scheduled);
648        pending.sort_unstable();
649        pending.dedup();
650        *pending_backfill_nodes = pending;
651    }
652
653    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
654        match &mut self.status {
655            CreateMviewStatus::Backfilling {
656                pending_backfill_nodes,
657                ..
658            } => Some(pending_backfill_nodes.drain(..)),
659            CreateMviewStatus::CdcSourceInit => None,
660            CreateMviewStatus::Finished { .. } => None,
661        }
662        .into_iter()
663        .flatten()
664    }
665
666    pub(super) fn collect_staging_commit_info(
667        &mut self,
668    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
669        match &mut self.status {
670            CreateMviewStatus::Backfilling {
671                table_ids_to_truncate,
672                ..
673            } => (false, Box::new(table_ids_to_truncate.drain(..))),
674            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
675            CreateMviewStatus::Finished {
676                table_ids_to_truncate,
677                ..
678            } => (true, Box::new(table_ids_to_truncate.drain(..))),
679        }
680    }
681
682    pub(super) fn is_finished(&self) -> bool {
683        matches!(self.status, CreateMviewStatus::Finished { .. })
684    }
685
686    /// Mark CDC source as finished when offset is updated.
687    pub(super) fn mark_cdc_source_finished(&mut self) {
688        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
689            self.status = CreateMviewStatus::Finished {
690                table_ids_to_truncate: vec![],
691            };
692        }
693    }
694
695    pub(super) fn into_tracking_job(self) -> TrackingJob {
696        let CreateMviewStatus::Finished { .. } = self.status else {
697            panic!("should be called when finished");
698        };
699        self.tracking_job
700    }
701
702    pub(crate) fn job_id(&self) -> JobId {
703        self.tracking_job.job_id
704    }
705
706    pub(crate) fn collect_fragment_progress(
707        &self,
708        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
709        mark_done_when_empty: bool,
710    ) -> Vec<FragmentBackfillProgress> {
711        let actor_progresses = self.actor_progresses();
712        if actor_progresses.is_empty() {
713            if mark_done_when_empty && self.is_finished() {
714                return collect_done_fragments(self.job_id(), fragment_infos);
715            }
716            return vec![];
717        }
718        collect_fragment_progress_from_actors(self.job_id(), fragment_infos, &actor_progresses)
719    }
720
721    /// Add a new create-mview DDL command to track.
722    ///
723    /// If the actors to track are empty, return the given command as it can be finished immediately.
724    /// For CDC sources, mark as `CdcSourceInit` instead of Finished.
725    pub fn new(info: &CreateStreamingJobCommandInfo, version_stats: &HummockVersionStats) -> Self {
726        tracing::trace!(?info, "add job to track");
727        let CreateStreamingJobCommandInfo {
728            stream_job_fragments,
729            fragment_backfill_ordering,
730            locality_fragment_state_table_mapping,
731            streaming_job,
732            ..
733        } = info;
734        let job_id = stream_job_fragments.stream_job_id();
735        let actors = stream_job_fragments.tracking_progress_actor_ids();
736        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
737        if actors.is_empty() {
738            // NOTE: This CDC source detection uses hardcoded property checks and should be replaced
739            // with a more reliable identification method in the future.
740            let is_cdc_source = matches!(
741                streaming_job,
742                crate::manager::StreamingJob::Source(source)
743                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
744                    .get_with_properties()
745                    .get("connector")
746                    .map(|connector| connector.to_lowercase().contains("-cdc"))
747                    .unwrap_or(false)
748            );
749            if is_cdc_source {
750                // Mark CDC source as CdcSourceInit, will be finished when offset is updated
751                return Self {
752                    tracking_job,
753                    status: CreateMviewStatus::CdcSourceInit,
754                };
755            }
756            // The command can be finished immediately.
757            return Self {
758                tracking_job,
759                status: CreateMviewStatus::Finished {
760                    table_ids_to_truncate: vec![],
761                },
762            };
763        }
764
765        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
766        let upstream_total_key_count: u64 =
767            calculate_total_key_count(&upstream_mv_count, version_stats);
768
769        let backfill_order_state = BackfillOrderState::new(
770            fragment_backfill_ordering,
771            stream_job_fragments,
772            locality_fragment_state_table_mapping.clone(),
773        );
774        let progress = Progress::new(
775            job_id,
776            actors,
777            upstream_mv_count,
778            upstream_total_key_count,
779            backfill_order_state,
780        );
781        let pending_backfill_nodes = progress
782            .backfill_order_state
783            .current_backfill_node_fragment_ids();
784        Self {
785            tracking_job,
786            status: CreateMviewStatus::Backfilling {
787                progress,
788                pending_backfill_nodes,
789                table_ids_to_truncate: vec![],
790            },
791        }
792    }
793}
794
795impl Progress {
796    /// Update the progress of `actor` according to the Pb struct.
797    ///
798    /// If all actors in this MV have finished, return the command.
799    fn apply(
800        &mut self,
801        progress: &CreateMviewProgress,
802        version_stats: &HummockVersionStats,
803    ) -> UpdateProgressResult {
804        tracing::trace!(?progress, "update progress");
805        let actor = progress.backfill_actor_id;
806        let job_id = self.job_id;
807
808        let new_state = if progress.done {
809            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
810        } else {
811            BackfillState::ConsumingUpstream(
812                progress.consumed_epoch.into(),
813                progress.consumed_rows,
814                progress.buffered_rows,
815            )
816        };
817
818        {
819            {
820                let progress_state = self;
821
822                let upstream_total_key_count: u64 =
823                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
824
825                tracing::trace!(%job_id, "updating progress for table");
826                let pending = progress_state.update(actor, new_state, upstream_total_key_count);
827
828                if progress_state.is_done() {
829                    tracing::debug!(
830                        %job_id,
831                        "all actors done for creating mview!",
832                    );
833
834                    let PendingBackfillFragments {
835                        next_backfill_nodes,
836                        truncate_locality_provider_state_tables,
837                    } = pending;
838
839                    assert!(next_backfill_nodes.is_empty());
840                    UpdateProgressResult::Finished {
841                        truncate_locality_provider_state_tables,
842                    }
843                } else if !pending.next_backfill_nodes.is_empty()
844                    || !pending.truncate_locality_provider_state_tables.is_empty()
845                {
846                    UpdateProgressResult::BackfillNodeFinished(pending)
847                } else {
848                    UpdateProgressResult::None
849                }
850            }
851        }
852    }
853}
854
855fn calculate_total_key_count(
856    table_count: &HashMap<TableId, usize>,
857    version_stats: &HummockVersionStats,
858) -> u64 {
859    table_count
860        .iter()
861        .map(|(table_id, count)| {
862            assert_ne!(*count, 0);
863            *count as u64
864                * version_stats
865                    .table_stats
866                    .get(table_id)
867                    .map_or(0, |stat| stat.total_key_count as u64)
868        })
869        .sum()
870}
871
872pub(crate) fn collect_fragment_progress_from_actors(
873    job_id: JobId,
874    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
875    actor_progresses: &[ActorBackfillProgress],
876) -> Vec<FragmentBackfillProgress> {
877    let mut actor_to_fragment = HashMap::new();
878    for (fragment_id, info) in fragment_infos {
879        for actor_id in info.actors.keys() {
880            actor_to_fragment.insert(*actor_id, *fragment_id);
881        }
882    }
883
884    let mut per_fragment: HashMap<FragmentId, (u64, usize, usize, BackfillUpstreamType)> =
885        HashMap::new();
886    for progress in actor_progresses {
887        let Some(fragment_id) = actor_to_fragment.get(&progress.actor_id) else {
888            continue;
889        };
890        let entry = per_fragment
891            .entry(*fragment_id)
892            .or_insert((0, 0, 0, progress.upstream_type));
893        entry.0 = entry.0.saturating_add(progress.consumed_rows);
894        entry.1 += progress.done as usize;
895        entry.2 += 1;
896    }
897
898    per_fragment
899        .into_iter()
900        .map(
901            |(fragment_id, (consumed_rows, done_cnt, total_cnt, upstream_type))| {
902                FragmentBackfillProgress {
903                    job_id,
904                    fragment_id,
905                    consumed_rows,
906                    done: total_cnt > 0 && done_cnt == total_cnt,
907                    upstream_type,
908                }
909            },
910        )
911        .collect()
912}
913
914pub(crate) fn collect_done_fragments(
915    job_id: JobId,
916    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
917) -> Vec<FragmentBackfillProgress> {
918    fragment_infos
919        .iter()
920        .filter(|(_, fragment)| {
921            fragment.fragment_type_mask.contains_any([
922                FragmentTypeFlag::StreamScan,
923                FragmentTypeFlag::SourceScan,
924                FragmentTypeFlag::LocalityProvider,
925            ])
926        })
927        .map(|(fragment_id, fragment)| FragmentBackfillProgress {
928            job_id,
929            fragment_id: *fragment_id,
930            consumed_rows: 0,
931            done: true,
932            upstream_type: BackfillUpstreamType::from_fragment_type_mask(
933                fragment.fragment_type_mask,
934            ),
935        })
936        .collect()
937}
938
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    /// Build a minimal single-distribution `InflightFragmentInfo` with the given
    /// actors and exactly one fragment type flag set, for exercising the
    /// progress-tracking helpers in isolation.
    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            // Arbitrary worker; the tests never inspect placement.
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    /// A `Progress` that tracks a single MView-backfill actor still in `Init`
    /// state, with all counters zeroed.
    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    // A progress report from an actor the tracker doesn't know about must be a
    // no-op: the tracked actor set stays unchanged.
    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }

    // After a reschedule replaces the job's actors, refreshing the tracker must
    // rebuild its state from the new fragment info: old actors (and their
    // accumulated counters) are dropped, new actors start from scratch.
    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        // Pre-reschedule state: the old actor already finished with 5 rows consumed.
        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        // Post-reschedule topology: a single StreamScan fragment owning only the
        // new actor.
        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        // Counters are reset, not carried over from the replaced actors.
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }

    // CDC sources should be initialized as CdcSourceInit
    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        // Create a CDC source with cdc_source_job = true
        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        // The "-cdc" connector suffix plus a shared source info is what the
        // tracker's hardcoded CDC detection keys on.
        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        // Create empty fragments (no actors to track)
        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
        };

        let tracker = CreateMviewProgressTracker::new(&info, &HummockVersionStats::default());

        // CDC source should be in CdcSourceInit state
        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }

    // CDC source should transition from CdcSourceInit to Finished when offset is updated
    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        // Initially in CdcSourceInit state
        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        // Mark as finished when offset is updated
        tracker.mark_cdc_source_finished();

        // Should now be in Finished state
        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
}