Skip to main content

risingwave_meta/barrier/
progress.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::take;
17
18use risingwave_common::catalog::{FragmentTypeFlag, TableId};
19use risingwave_common::id::JobId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
24
25use crate::MetaResult;
26use crate::barrier::backfill_order_control::BackfillOrderState;
27use crate::barrier::info::InflightStreamingJobInfo;
28use crate::barrier::{CreateStreamingJobCommandInfo, FragmentBackfillProgress};
29use crate::controller::fragment::InflightFragmentInfo;
30use crate::manager::MetadataManager;
31use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
32use crate::stream::{SourceChange, SourceManagerRef};
33
34type ConsumedRows = u64;
35type BufferedRows = u64;
36
37#[derive(Debug, Clone, Copy)]
38pub(crate) struct ActorBackfillProgress {
39    pub(crate) actor_id: ActorId,
40    pub(crate) upstream_type: BackfillUpstreamType,
41    pub(crate) consumed_rows: u64,
42    pub(crate) done: bool,
43}
44
45#[derive(Clone, Copy, Debug)]
46enum BackfillState {
47    Init,
48    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
49    Done(ConsumedRows, BufferedRows),
50}
51
52/// Represents the backfill nodes that need to be scheduled or cleaned up.
53#[derive(Debug, Default)]
54pub(super) struct PendingBackfillFragments {
55    /// Fragment IDs that should start backfilling in the next checkpoint
56    pub next_backfill_nodes: Vec<FragmentId>,
57    /// State tables of locality provider fragments that should be truncated
58    pub truncate_locality_provider_state_tables: Vec<TableId>,
59}
60
61/// Progress of all actors containing backfill executors while creating mview.
62#[derive(Debug)]
63pub(super) struct Progress {
64    job_id: JobId,
65    // `states` and `done_count` decides whether the progress is done. See `is_done`.
66    states: HashMap<ActorId, BackfillState>,
67    backfill_order_state: BackfillOrderState,
68    done_count: usize,
69
70    /// Tells whether the backfill is from source or mv.
71    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
72
73    // The following row counts are used to calculate the progress. See `calculate_progress`.
74    /// Upstream mv count.
75    /// Keep track of how many times each upstream MV
76    /// appears in this stream job.
77    upstream_mv_count: HashMap<TableId, usize>,
78    /// Total key count of all the upstream materialized views
79    upstream_mvs_total_key_count: u64,
80    mv_backfill_consumed_rows: u64,
81    source_backfill_consumed_rows: u64,
82    /// Buffered rows (for locality backfill) that are yet to be consumed
83    /// This is used to calculate precise progress: consumed / (`upstream_total` + buffered)
84    mv_backfill_buffered_rows: u64,
85}
86
87impl Progress {
88    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
89    fn new(
90        job_id: JobId,
91        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
92        upstream_mv_count: HashMap<TableId, usize>,
93        upstream_total_key_count: u64,
94        backfill_order_state: BackfillOrderState,
95    ) -> Self {
96        let mut states = HashMap::new();
97        let mut backfill_upstream_types = HashMap::new();
98        for (actor, backfill_upstream_type) in actors {
99            states.insert(actor, BackfillState::Init);
100            backfill_upstream_types.insert(actor, backfill_upstream_type);
101        }
102        assert!(!states.is_empty());
103
104        Self {
105            job_id,
106            states,
107            backfill_upstream_types,
108            done_count: 0,
109            upstream_mv_count,
110            upstream_mvs_total_key_count: upstream_total_key_count,
111            mv_backfill_consumed_rows: 0,
112            source_backfill_consumed_rows: 0,
113            mv_backfill_buffered_rows: 0,
114            backfill_order_state,
115        }
116    }
117
118    /// Update the progress of `actor`.
119    /// Returns the backfill fragments that need to be scheduled or cleaned up.
120    fn update(
121        &mut self,
122        actor: ActorId,
123        new_state: BackfillState,
124        upstream_total_key_count: u64,
125    ) -> PendingBackfillFragments {
126        let mut result = PendingBackfillFragments::default();
127        self.upstream_mvs_total_key_count = upstream_total_key_count;
128        let total_actors = self.states.len();
129        let Some(backfill_upstream_type) = self.backfill_upstream_types.get(&actor) else {
130            tracing::warn!(%actor, "receive progress from unknown actor, likely removed after reschedule");
131            return result;
132        };
133
134        let mut old_consumed_row = 0;
135        let mut new_consumed_row = 0;
136        let mut old_buffered_row = 0;
137        let mut new_buffered_row = 0;
138        let Some(prev_state) = self.states.remove(&actor) else {
139            tracing::warn!(%actor, "receive progress for actor not in state map");
140            return result;
141        };
142        match prev_state {
143            BackfillState::Init => {}
144            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
145                old_consumed_row = consumed_rows;
146                old_buffered_row = buffered_rows;
147            }
148            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
149        };
150        match &new_state {
151            BackfillState::Init => {}
152            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
153                new_consumed_row = *consumed_rows;
154                new_buffered_row = *buffered_rows;
155            }
156            BackfillState::Done(consumed_rows, buffered_rows) => {
157                tracing::debug!("actor {} done", actor);
158                new_consumed_row = *consumed_rows;
159                new_buffered_row = *buffered_rows;
160                self.done_count += 1;
161                let before_backfill_nodes = self
162                    .backfill_order_state
163                    .current_backfill_node_fragment_ids();
164                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
165                let after_backfill_nodes = self
166                    .backfill_order_state
167                    .current_backfill_node_fragment_ids();
168                // last_backfill_nodes = before_backfill_nodes - after_backfill_nodes
169                let last_backfill_nodes_iter = before_backfill_nodes
170                    .into_iter()
171                    .filter(|x| !after_backfill_nodes.contains(x));
172                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
173                    .filter_map(|fragment_id| {
174                        self.backfill_order_state
175                            .get_locality_fragment_state_table_mapping()
176                            .get(&fragment_id)
177                    })
178                    .flatten()
179                    .copied()
180                    .collect();
181                tracing::debug!(
182                    "{} actors out of {} complete",
183                    self.done_count,
184                    total_actors,
185                );
186            }
187        };
188        debug_assert!(
189            new_consumed_row >= old_consumed_row,
190            "backfill progress should not go backward"
191        );
192        debug_assert!(
193            new_buffered_row >= old_buffered_row,
194            "backfill progress should not go backward"
195        );
196        match backfill_upstream_type {
197            BackfillUpstreamType::MView => {
198                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
199            }
200            BackfillUpstreamType::Source => {
201                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
202            }
203            BackfillUpstreamType::Values => {
204                // do not consider progress for values
205            }
206            BackfillUpstreamType::LocalityProvider => {
207                // Track LocalityProvider progress similar to MView
208                // Update buffered rows for precise progress calculation
209                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
210                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
211            }
212        }
213        self.states.insert(actor, new_state);
214        result
215    }
216
217    fn iter_actor_progress(&self) -> impl Iterator<Item = ActorBackfillProgress> + '_ {
218        self.states.iter().filter_map(|(actor_id, state)| {
219            let upstream_type = *self.backfill_upstream_types.get(actor_id)?;
220            let (consumed_rows, done) = match *state {
221                BackfillState::Init => (0, false),
222                BackfillState::ConsumingUpstream(_, consumed_rows, _) => (consumed_rows, false),
223                BackfillState::Done(consumed_rows, _) => (consumed_rows, true),
224            };
225            Some(ActorBackfillProgress {
226                actor_id: *actor_id,
227                upstream_type,
228                consumed_rows,
229                done,
230            })
231        })
232    }
233
234    /// Returns whether all backfill executors are done.
235    fn is_done(&self) -> bool {
236        tracing::trace!(
237            "Progress::is_done? {}, {}, {:?}",
238            self.done_count,
239            self.states.len(),
240            self.states
241        );
242        self.done_count == self.states.len()
243    }
244
245    /// `progress` = `consumed_rows` / `upstream_total_key_count`
246    fn calculate_progress(&self) -> String {
247        if self.is_done() || self.states.is_empty() {
248            return "100%".to_owned();
249        }
250        let mut mv_count = 0;
251        let mut source_count = 0;
252        for backfill_upstream_type in self.backfill_upstream_types.values() {
253            match backfill_upstream_type {
254                BackfillUpstreamType::MView => mv_count += 1,
255                BackfillUpstreamType::Source => source_count += 1,
256                BackfillUpstreamType::Values => (),
257                BackfillUpstreamType::LocalityProvider => mv_count += 1, /* Count LocalityProvider as an MView for progress */
258            }
259        }
260
261        let mv_progress = (mv_count > 0).then_some({
262            // Include buffered rows in total for precise progress calculation
263            // Progress = consumed / (upstream_total + buffered)
264            let total_rows_to_consume =
265                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
266            if total_rows_to_consume == 0 {
267                "99.99%".to_owned()
268            } else {
269                let mut progress =
270                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
271                if progress > 1.0 {
272                    progress = 0.9999;
273                }
274                format!(
275                    "{:.2}% ({}/{})",
276                    progress * 100.0,
277                    self.mv_backfill_consumed_rows,
278                    total_rows_to_consume
279                )
280            }
281        });
282        let source_progress = (source_count > 0).then_some(format!(
283            "{} rows consumed",
284            self.source_backfill_consumed_rows
285        ));
286        match (mv_progress, source_progress) {
287            (Some(mv_progress), Some(source_progress)) => {
288                format!(
289                    "MView Backfill: {}, Source Backfill: {}",
290                    mv_progress, source_progress
291                )
292            }
293            (Some(mv_progress), None) => mv_progress,
294            (None, Some(source_progress)) => source_progress,
295            (None, None) => "Unknown".to_owned(),
296        }
297    }
298}
299
300/// There are two kinds of `TrackingJobs`:
301/// 1. if `is_recovered` is false, it is a "New" tracking job.
302///    It is instantiated and managed by the stream manager.
303///    On recovery, the stream manager will stop managing the job.
304/// 2. if `is_recovered` is true, it is a "Recovered" tracking job.
305///    On recovery, the barrier manager will recover and start managing the job.
306pub struct TrackingJob {
307    job_id: JobId,
308    is_recovered: bool,
309    source_change: Option<SourceChange>,
310}
311
312impl std::fmt::Display for TrackingJob {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        write!(
315            f,
316            "{}{}",
317            self.job_id,
318            if self.is_recovered { "<recovered>" } else { "" }
319        )
320    }
321}
322
323impl TrackingJob {
324    /// Create a new tracking job.
325    pub(crate) fn new(stream_job_fragments: &StreamJobFragments) -> Self {
326        Self {
327            job_id: stream_job_fragments.stream_job_id,
328            is_recovered: false,
329            source_change: Some(SourceChange::CreateJobFinished {
330                finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
331            }),
332        }
333    }
334
335    /// Create a recovered tracking job.
336    pub(crate) fn recovered(
337        job_id: JobId,
338        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
339    ) -> Self {
340        Self::recovered_from_fragment_nodes(
341            job_id,
342            fragment_infos
343                .iter()
344                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
345        )
346    }
347
348    pub(crate) fn recovered_from_fragment_nodes<'a>(
349        job_id: JobId,
350        fragment_nodes: impl Iterator<Item = (FragmentId, &'a StreamNode)>,
351    ) -> Self {
352        let source_backfill_fragments =
353            StreamJobFragments::source_backfill_fragments_impl(fragment_nodes);
354        let source_change = if source_backfill_fragments.is_empty() {
355            None
356        } else {
357            Some(SourceChange::CreateJobFinished {
358                finished_backfill_fragments: source_backfill_fragments,
359            })
360        };
361        Self {
362            job_id,
363            is_recovered: true,
364            source_change,
365        }
366    }
367
368    pub(crate) fn job_id(&self) -> JobId {
369        self.job_id
370    }
371
372    /// Notify the metadata manager that the job is finished.
373    pub(crate) async fn finish(
374        self,
375        metadata_manager: &MetadataManager,
376        source_manager: &SourceManagerRef,
377    ) -> MetaResult<()> {
378        metadata_manager
379            .catalog_controller
380            .finish_streaming_job(self.job_id)
381            .await?;
382        if let Some(source_change) = self.source_change {
383            source_manager.apply_source_change(source_change).await;
384        }
385        Ok(())
386    }
387}
388
389impl std::fmt::Debug for TrackingJob {
390    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
391        if !self.is_recovered {
392            write!(f, "TrackingJob::New({})", self.job_id)
393        } else {
394            write!(f, "TrackingJob::Recovered({})", self.job_id)
395        }
396    }
397}
398
399/// Information collected during barrier completion that needs to be committed.
400#[derive(Debug, Default)]
401pub(super) struct StagingCommitInfo {
402    /// Finished jobs that should be committed
403    pub finished_jobs: Vec<TrackingJob>,
404    /// Table IDs whose locality provider state tables need to be truncated
405    pub table_ids_to_truncate: Vec<TableId>,
406    pub finished_cdc_table_backfill: Vec<JobId>,
407}
408
409pub(super) enum UpdateProgressResult {
410    None,
411    /// The finished job, along with its pending backfill fragments for cleanup.
412    Finished {
413        truncate_locality_provider_state_tables: Vec<TableId>,
414    },
415    /// Backfill nodes have finished and new ones need to be scheduled.
416    BackfillNodeFinished(PendingBackfillFragments),
417}
418
419#[derive(Debug)]
420pub(super) struct CreateMviewProgressTracker {
421    tracking_job: TrackingJob,
422    status: CreateMviewStatus,
423}
424
425#[derive(Debug)]
426enum CreateMviewStatus {
427    Backfilling {
428        /// Progress of the create-mview DDL.
429        progress: Progress,
430
431        /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
432        pending_backfill_nodes: Vec<FragmentId>,
433
434        /// Table IDs whose locality provider state tables need to be truncated
435        table_ids_to_truncate: Vec<TableId>,
436    },
437    CdcSourceInit,
438    Finished {
439        table_ids_to_truncate: Vec<TableId>,
440    },
441}
442
443impl CreateMviewProgressTracker {
444    pub fn recover(
445        creating_job_id: JobId,
446        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
447        backfill_order_state: BackfillOrderState,
448        version_stats: &HummockVersionStats,
449    ) -> Self {
450        {
451            let tracking_job = TrackingJob::recovered(creating_job_id, fragment_infos);
452            let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
453            let status = if actors.is_empty() {
454                CreateMviewStatus::Finished {
455                    table_ids_to_truncate: vec![],
456                }
457            } else {
458                let mut states = HashMap::new();
459                let mut backfill_upstream_types = HashMap::new();
460
461                for (actor, backfill_upstream_type) in actors {
462                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
463                    backfill_upstream_types.insert(actor, backfill_upstream_type);
464                }
465
466                let progress = Self::recover_progress(
467                    creating_job_id,
468                    states,
469                    backfill_upstream_types,
470                    StreamJobFragments::upstream_table_counts_impl(
471                        fragment_infos.values().map(|fragment| &fragment.nodes),
472                    ),
473                    version_stats,
474                    backfill_order_state,
475                );
476                let pending_backfill_nodes = progress
477                    .backfill_order_state
478                    .current_backfill_node_fragment_ids();
479                CreateMviewStatus::Backfilling {
480                    progress,
481                    pending_backfill_nodes,
482                    table_ids_to_truncate: vec![],
483                }
484            };
485            Self {
486                tracking_job,
487                status,
488            }
489        }
490    }
491
492    /// ## How recovery works
493    ///
494    /// The progress (number of rows consumed) is persisted in state tables.
495    /// During recovery, the backfill executor will restore the number of rows consumed,
496    /// and then it will just report progress like newly created executors.
497    fn recover_progress(
498        job_id: JobId,
499        states: HashMap<ActorId, BackfillState>,
500        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
501        upstream_mv_count: HashMap<TableId, usize>,
502        version_stats: &HummockVersionStats,
503        backfill_order_state: BackfillOrderState,
504    ) -> Progress {
505        let upstream_mvs_total_key_count =
506            calculate_total_key_count(&upstream_mv_count, version_stats);
507        Progress {
508            job_id,
509            states,
510            backfill_order_state,
511            backfill_upstream_types,
512            done_count: 0, // Fill only after first barrier pass
513            upstream_mv_count,
514            upstream_mvs_total_key_count,
515            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
516            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
517            mv_backfill_buffered_rows: 0, // Fill only after first barrier pass
518        }
519    }
520
521    pub fn gen_backfill_progress(&self) -> String {
522        match &self.status {
523            CreateMviewStatus::Backfilling { progress, .. } => progress.calculate_progress(),
524            CreateMviewStatus::CdcSourceInit => "Initializing CDC source...".to_owned(),
525            CreateMviewStatus::Finished { .. } => "100%".to_owned(),
526        }
527    }
528
529    pub(crate) fn actor_progresses(&self) -> Vec<ActorBackfillProgress> {
530        match &self.status {
531            CreateMviewStatus::Backfilling { progress, .. } => {
532                progress.iter_actor_progress().collect()
533            }
534            CreateMviewStatus::CdcSourceInit | CreateMviewStatus::Finished { .. } => vec![],
535        }
536    }
537
538    /// Update the progress of tracked jobs, and add a new job to track if `info` is `Some`.
539    /// Return the table ids whose locality provider state tables need to be truncated.
540    pub(super) fn apply_progress(
541        &mut self,
542        create_mview_progress: &CreateMviewProgress,
543        version_stats: &HummockVersionStats,
544    ) {
545        let CreateMviewStatus::Backfilling {
546            progress,
547            pending_backfill_nodes,
548            table_ids_to_truncate,
549        } = &mut self.status
550        else {
551            tracing::warn!(
552                "update the progress of an backfill finished streaming job: {create_mview_progress:?}"
553            );
554            return;
555        };
556        {
557            // Update the progress of all commands.
558            {
559                // Those with actors complete can be finished immediately.
560                match progress.apply(create_mview_progress, version_stats) {
561                    UpdateProgressResult::None => {
562                        tracing::trace!(?progress, "update progress");
563                    }
564                    UpdateProgressResult::Finished {
565                        truncate_locality_provider_state_tables,
566                    } => {
567                        let mut table_ids_to_truncate = take(table_ids_to_truncate);
568                        table_ids_to_truncate.extend(truncate_locality_provider_state_tables);
569                        tracing::trace!(?progress, "finish progress");
570                        self.status = CreateMviewStatus::Finished {
571                            table_ids_to_truncate,
572                        };
573                    }
574                    UpdateProgressResult::BackfillNodeFinished(pending) => {
575                        table_ids_to_truncate
576                            .extend(pending.truncate_locality_provider_state_tables.clone());
577                        tracing::trace!(
578                            ?progress,
579                            next_backfill_nodes = ?pending.next_backfill_nodes,
580                            "start next backfill node"
581                        );
582                        pending_backfill_nodes.extend(pending.next_backfill_nodes);
583                    }
584                }
585            }
586        }
587    }
588
589    /// Refresh tracker state after reschedule so new actors can report progress correctly.
590    pub fn refresh_after_reschedule(
591        &mut self,
592        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
593        version_stats: &HummockVersionStats,
594    ) {
595        let CreateMviewStatus::Backfilling {
596            progress,
597            pending_backfill_nodes,
598            ..
599        } = &mut self.status
600        else {
601            return;
602        };
603
604        let new_tracking_actors = StreamJobFragments::tracking_progress_actor_ids_impl(
605            fragment_infos
606                .values()
607                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
608        );
609
610        #[cfg(debug_assertions)]
611        {
612            use std::collections::HashSet;
613            let old_actor_ids: HashSet<_> = progress.states.keys().copied().collect();
614            let new_actor_ids: HashSet<_> = new_tracking_actors
615                .iter()
616                .map(|(actor_id, _)| *actor_id)
617                .collect();
618            debug_assert!(
619                old_actor_ids.is_disjoint(&new_actor_ids),
620                "reschedule should rebuild backfill actors; old={old_actor_ids:?}, new={new_actor_ids:?}"
621            );
622        }
623
624        let mut new_states = HashMap::new();
625        let mut new_backfill_types = HashMap::new();
626        for (actor_id, upstream_type) in new_tracking_actors {
627            new_states.insert(actor_id, BackfillState::Init);
628            new_backfill_types.insert(actor_id, upstream_type);
629        }
630
631        let fragment_actors: HashMap<_, _> = fragment_infos
632            .iter()
633            .map(|(fragment_id, info)| (*fragment_id, info.actors.keys().copied().collect()))
634            .collect();
635
636        let newly_scheduled = progress
637            .backfill_order_state
638            .refresh_actors(&fragment_actors);
639
640        progress.backfill_upstream_types = new_backfill_types;
641        progress.states = new_states;
642        progress.done_count = 0;
643
644        progress.upstream_mv_count = StreamJobFragments::upstream_table_counts_impl(
645            fragment_infos.values().map(|fragment| &fragment.nodes),
646        );
647        progress.upstream_mvs_total_key_count =
648            calculate_total_key_count(&progress.upstream_mv_count, version_stats);
649
650        progress.mv_backfill_consumed_rows = 0;
651        progress.source_backfill_consumed_rows = 0;
652        progress.mv_backfill_buffered_rows = 0;
653
654        let mut pending = progress
655            .backfill_order_state
656            .current_backfill_node_fragment_ids();
657        pending.extend(newly_scheduled);
658        pending.sort_unstable();
659        pending.dedup();
660        *pending_backfill_nodes = pending;
661    }
662
663    pub(super) fn take_pending_backfill_nodes(&mut self) -> impl Iterator<Item = FragmentId> + '_ {
664        match &mut self.status {
665            CreateMviewStatus::Backfilling {
666                pending_backfill_nodes,
667                ..
668            } => Some(pending_backfill_nodes.drain(..)),
669            CreateMviewStatus::CdcSourceInit => None,
670            CreateMviewStatus::Finished { .. } => None,
671        }
672        .into_iter()
673        .flatten()
674    }
675
676    pub(super) fn collect_staging_commit_info(
677        &mut self,
678    ) -> (bool, Box<dyn Iterator<Item = TableId> + '_>) {
679        match &mut self.status {
680            CreateMviewStatus::Backfilling {
681                table_ids_to_truncate,
682                ..
683            } => (false, Box::new(table_ids_to_truncate.drain(..))),
684            CreateMviewStatus::CdcSourceInit => (false, Box::new(std::iter::empty())),
685            CreateMviewStatus::Finished {
686                table_ids_to_truncate,
687                ..
688            } => (true, Box::new(table_ids_to_truncate.drain(..))),
689        }
690    }
691
692    pub(super) fn is_finished(&self) -> bool {
693        matches!(self.status, CreateMviewStatus::Finished { .. })
694    }
695
696    /// Mark CDC source as finished when offset is updated.
697    pub(super) fn mark_cdc_source_finished(&mut self) {
698        if matches!(self.status, CreateMviewStatus::CdcSourceInit) {
699            self.status = CreateMviewStatus::Finished {
700                table_ids_to_truncate: vec![],
701            };
702        }
703    }
704
705    pub(super) fn into_tracking_job(self) -> TrackingJob {
706        let CreateMviewStatus::Finished { .. } = self.status else {
707            panic!("should be called when finished");
708        };
709        self.tracking_job
710    }
711
712    pub(crate) fn job_id(&self) -> JobId {
713        self.tracking_job.job_id
714    }
715
716    pub(crate) fn collect_fragment_progress(
717        &self,
718        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
719        mark_done_when_empty: bool,
720    ) -> Vec<FragmentBackfillProgress> {
721        let actor_progresses = self.actor_progresses();
722        if actor_progresses.is_empty() {
723            if mark_done_when_empty && self.is_finished() {
724                return collect_done_fragments(self.job_id(), fragment_infos);
725            }
726            return vec![];
727        }
728        collect_fragment_progress_from_actors(self.job_id(), fragment_infos, &actor_progresses)
729    }
730
731    /// Add a new create-mview DDL command to track.
732    ///
733    /// If the actors to track are empty, return the given command as it can be finished immediately.
734    /// For CDC sources, mark as `CdcSourceInit` instead of Finished.
735    pub fn new(
736        info: &CreateStreamingJobCommandInfo,
737        version_stats: &HummockVersionStats,
738        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
739    ) -> Self {
740        tracing::trace!(?info, "add job to track");
741        let CreateStreamingJobCommandInfo {
742            stream_job_fragments,
743            fragment_backfill_ordering,
744            locality_fragment_state_table_mapping,
745            streaming_job,
746            ..
747        } = info;
748        let job_id = stream_job_fragments.stream_job_id();
749        let actors = InflightStreamingJobInfo::tracking_progress_actor_ids(fragment_infos);
750        let tracking_job = TrackingJob::new(&info.stream_job_fragments);
751        if actors.is_empty() {
752            // NOTE: This CDC source detection uses hardcoded property checks and should be replaced
753            // with a more reliable identification method in the future.
754            let is_cdc_source = matches!(
755                streaming_job,
756                crate::manager::StreamingJob::Source(source)
757                    if source.info.as_ref().map(|info| info.is_shared()).unwrap_or(false) && source
758                    .get_with_properties()
759                    .get("connector")
760                    .map(|connector| connector.to_lowercase().contains("-cdc"))
761                    .unwrap_or(false)
762            );
763            if is_cdc_source {
764                // Mark CDC source as CdcSourceInit, will be finished when offset is updated
765                return Self {
766                    tracking_job,
767                    status: CreateMviewStatus::CdcSourceInit,
768                };
769            }
770            // The command can be finished immediately.
771            return Self {
772                tracking_job,
773                status: CreateMviewStatus::Finished {
774                    table_ids_to_truncate: vec![],
775                },
776            };
777        }
778
779        let upstream_mv_count = stream_job_fragments.upstream_table_counts();
780        let upstream_total_key_count: u64 =
781            calculate_total_key_count(&upstream_mv_count, version_stats);
782
783        let backfill_order_state = BackfillOrderState::new(
784            fragment_backfill_ordering,
785            fragment_infos,
786            locality_fragment_state_table_mapping.clone(),
787        );
788        let progress = Progress::new(
789            job_id,
790            actors,
791            upstream_mv_count,
792            upstream_total_key_count,
793            backfill_order_state,
794        );
795        let pending_backfill_nodes = progress
796            .backfill_order_state
797            .current_backfill_node_fragment_ids();
798        Self {
799            tracking_job,
800            status: CreateMviewStatus::Backfilling {
801                progress,
802                pending_backfill_nodes,
803                table_ids_to_truncate: vec![],
804            },
805        }
806    }
807}
808
809impl Progress {
810    /// Update the progress of `actor` according to the Pb struct.
811    ///
812    /// If all actors in this MV have finished, return the command.
813    fn apply(
814        &mut self,
815        progress: &CreateMviewProgress,
816        version_stats: &HummockVersionStats,
817    ) -> UpdateProgressResult {
818        tracing::trace!(?progress, "update progress");
819        let actor = progress.backfill_actor_id;
820        let job_id = self.job_id;
821
822        let new_state = if progress.done {
823            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
824        } else {
825            BackfillState::ConsumingUpstream(
826                progress.consumed_epoch.into(),
827                progress.consumed_rows,
828                progress.buffered_rows,
829            )
830        };
831
832        {
833            {
834                let progress_state = self;
835
836                let upstream_total_key_count: u64 =
837                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);
838
839                tracing::trace!(%job_id, "updating progress for table");
840                let pending = progress_state.update(actor, new_state, upstream_total_key_count);
841
842                if progress_state.is_done() {
843                    tracing::debug!(
844                        %job_id,
845                        "all actors done for creating mview!",
846                    );
847
848                    let PendingBackfillFragments {
849                        next_backfill_nodes,
850                        truncate_locality_provider_state_tables,
851                    } = pending;
852
853                    assert!(next_backfill_nodes.is_empty());
854                    UpdateProgressResult::Finished {
855                        truncate_locality_provider_state_tables,
856                    }
857                } else if !pending.next_backfill_nodes.is_empty()
858                    || !pending.truncate_locality_provider_state_tables.is_empty()
859                {
860                    UpdateProgressResult::BackfillNodeFinished(pending)
861                } else {
862                    UpdateProgressResult::None
863                }
864            }
865        }
866    }
867}
868
869fn calculate_total_key_count(
870    table_count: &HashMap<TableId, usize>,
871    version_stats: &HummockVersionStats,
872) -> u64 {
873    table_count
874        .iter()
875        .map(|(table_id, count)| {
876            assert_ne!(*count, 0);
877            *count as u64
878                * version_stats
879                    .table_stats
880                    .get(table_id)
881                    .map_or(0, |stat| stat.total_key_count as u64)
882        })
883        .sum()
884}
885
886pub(crate) fn collect_fragment_progress_from_actors(
887    job_id: JobId,
888    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
889    actor_progresses: &[ActorBackfillProgress],
890) -> Vec<FragmentBackfillProgress> {
891    let mut actor_to_fragment = HashMap::new();
892    for (fragment_id, info) in fragment_infos {
893        for actor_id in info.actors.keys() {
894            actor_to_fragment.insert(*actor_id, *fragment_id);
895        }
896    }
897
898    let mut per_fragment: HashMap<FragmentId, (u64, usize, usize, BackfillUpstreamType)> =
899        HashMap::new();
900    for progress in actor_progresses {
901        let Some(fragment_id) = actor_to_fragment.get(&progress.actor_id) else {
902            continue;
903        };
904        let entry = per_fragment
905            .entry(*fragment_id)
906            .or_insert((0, 0, 0, progress.upstream_type));
907        entry.0 = entry.0.saturating_add(progress.consumed_rows);
908        entry.1 += progress.done as usize;
909        entry.2 += 1;
910    }
911
912    per_fragment
913        .into_iter()
914        .map(
915            |(fragment_id, (consumed_rows, done_cnt, total_cnt, upstream_type))| {
916                FragmentBackfillProgress {
917                    job_id,
918                    fragment_id,
919                    consumed_rows,
920                    done: total_cnt > 0 && done_cnt == total_cnt,
921                    upstream_type,
922                }
923            },
924        )
925        .collect()
926}
927
928pub(crate) fn collect_done_fragments(
929    job_id: JobId,
930    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
931) -> Vec<FragmentBackfillProgress> {
932    fragment_infos
933        .iter()
934        .filter(|(_, fragment)| {
935            fragment.fragment_type_mask.contains_any([
936                FragmentTypeFlag::StreamScan,
937                FragmentTypeFlag::SourceScan,
938                FragmentTypeFlag::LocalityProvider,
939            ])
940        })
941        .map(|(fragment_id, fragment)| FragmentBackfillProgress {
942            job_id,
943            fragment_id: *fragment_id,
944            consumed_rows: 0,
945            done: true,
946            upstream_type: BackfillUpstreamType::from_fragment_type_mask(
947                fragment.fragment_type_mask,
948            ),
949        })
950        .collect()
951}
952
#[cfg(test)]
mod tests {
    // Unit tests for progress tracking: unknown-actor updates, rebuilding after a reschedule,
    // and the `CdcSourceInit` -> `Finished` lifecycle of shared CDC sources.
    use std::collections::HashSet;

    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::StreamNode as PbStreamNode;

    use super::*;
    use crate::controller::fragment::InflightActorInfo;

    /// Builds a `Single`-distribution fragment whose type mask contains only `flag`.
    ///
    /// The fragment owns exactly `actor_ids`, each placed on worker 1 with no vnode bitmap and
    /// no splits.
    fn sample_inflight_fragment(
        fragment_id: FragmentId,
        actor_ids: &[ActorId],
        flag: FragmentTypeFlag,
    ) -> InflightFragmentInfo {
        let mut fragment_type_mask = FragmentTypeMask::empty();
        fragment_type_mask.add(flag);
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Single,
            fragment_type_mask,
            vnode_count: 0,
            nodes: PbStreamNode::default(),
            actors: actor_ids
                .iter()
                .map(|actor_id| {
                    (
                        *actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: HashSet::new(),
        }
    }

    /// Builds a `Progress` for job 1 that tracks the single actor `actor_id`.
    ///
    /// The actor is in the `Init` state with an `MView` upstream, and every counter starts at 0.
    fn sample_progress(actor_id: ActorId) -> Progress {
        Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_id, BackfillState::Init)]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 0,
            backfill_upstream_types: HashMap::from([(actor_id, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        }
    }

    /// Updating an actor that is not tracked must not insert it into `states`, and must not
    /// yield any next backfill nodes.
    #[test]
    fn update_ignores_unknown_actor() {
        let actor_known = ActorId::new(1);
        let actor_unknown = ActorId::new(2);
        let mut progress = sample_progress(actor_known);

        let pending = progress.update(
            actor_unknown,
            BackfillState::Done(0, 0),
            progress.upstream_mvs_total_key_count,
        );

        assert!(pending.next_backfill_nodes.is_empty());
        assert_eq!(progress.states.len(), 1);
        assert!(progress.states.contains_key(&actor_known));
    }

    /// After `refresh_after_reschedule`, only the new fragment's actors are tracked.
    ///
    /// The done count and the consumed-row counters of the replaced actor are also reset.
    #[test]
    fn refresh_rebuilds_tracking_after_reschedule() {
        let actor_old = ActorId::new(1);
        let actor_new = ActorId::new(2);

        let progress = Progress {
            job_id: JobId::new(1),
            states: HashMap::from([(actor_old, BackfillState::Done(5, 0))]),
            backfill_order_state: BackfillOrderState::default(),
            done_count: 1,
            backfill_upstream_types: HashMap::from([(actor_old, BackfillUpstreamType::MView)]),
            upstream_mv_count: HashMap::new(),
            upstream_mvs_total_key_count: 0,
            mv_backfill_consumed_rows: 5,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
        };

        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(1),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::Backfilling {
                progress,
                pending_backfill_nodes: vec![],
                table_ids_to_truncate: vec![],
            },
        };

        // After the reschedule, the job consists of a single StreamScan fragment owning only
        // `actor_new`.
        let fragment_infos = HashMap::from([(
            FragmentId::new(10),
            sample_inflight_fragment(
                FragmentId::new(10),
                &[actor_new],
                FragmentTypeFlag::StreamScan,
            ),
        )]);

        tracker.refresh_after_reschedule(&fragment_infos, &HummockVersionStats::default());

        let CreateMviewStatus::Backfilling { progress, .. } = tracker.status else {
            panic!("expected backfilling status");
        };
        assert!(progress.states.contains_key(&actor_new));
        assert!(!progress.states.contains_key(&actor_old));
        assert_eq!(progress.done_count, 0);
        assert_eq!(progress.mv_backfill_consumed_rows, 0);
        assert_eq!(progress.source_backfill_consumed_rows, 0);
    }

    // CDC sources should be initialized as CdcSourceInit
    // (a shared source with a "*-cdc" connector and no actors to track must not be reported
    // as finished right away).
    #[test]
    fn test_cdc_source_initialized_as_cdc_source_init() {
        use std::collections::BTreeMap;

        use risingwave_meta_model::streaming_job;
        use risingwave_pb::catalog::{CreateType, PbSource, StreamSourceInfo};

        use crate::barrier::command::CreateStreamingJobCommandInfo;
        use crate::manager::{StreamingJob, StreamingJobType};
        use crate::model::StreamJobFragmentsToCreate;

        // Create a CDC source with cdc_source_job = true
        let source_info = StreamSourceInfo {
            cdc_source_job: true,
            ..Default::default()
        };

        let source = PbSource {
            id: risingwave_common::id::SourceId::new(100),
            info: Some(source_info),
            with_properties: BTreeMap::from([("connector".to_owned(), "fake-cdc".to_owned())]),
            ..Default::default()
        };

        // Create empty fragments (no actors to track)
        let fragments = StreamJobFragments::for_test(JobId::new(100), BTreeMap::new());
        let stream_job_fragments = StreamJobFragmentsToCreate {
            inner: fragments,
            downstreams: Default::default(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams: Default::default(),
            init_split_assignment: Default::default(),
            definition: "CREATE SOURCE ...".to_owned(),
            job_type: StreamingJobType::Source,
            create_type: CreateType::Foreground,
            streaming_job: StreamingJob::Source(source),
            database_resource_group: risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP
                .to_owned(),
            fragment_backfill_ordering: Default::default(),
            cdc_table_snapshot_splits: None,
            locality_fragment_state_table_mapping: Default::default(),
            is_serverless: false,
            refresh_interval_sec: None,
            streaming_job_model: streaming_job::Model {
                job_id: JobId::new(100),
                job_status: risingwave_meta_model::JobStatus::Creating,
                create_type: risingwave_meta_model::CreateType::Foreground,
                timezone: None,
                config_override: None,
                adaptive_parallelism_strategy: None,
                parallelism: risingwave_meta_model::StreamingParallelism::Adaptive,
                backfill_parallelism: None,
                backfill_adaptive_parallelism_strategy: None,
                backfill_orders: None,
                max_parallelism: 256,
                specific_resource_group: None,
                is_serverless_backfill: false,
                refresh_interval_sec: None,
            },
        };

        // An empty `fragment_infos` map means there are no actors to track.
        let tracker = CreateMviewProgressTracker::new(
            &info,
            &HummockVersionStats::default(),
            &HashMap::new(),
        );

        // CDC source should be in CdcSourceInit state
        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());
    }

    // CDC source should transition from CdcSourceInit to Finished when offset is updated
    // (driven here directly through `mark_cdc_source_finished`).
    #[test]
    fn test_cdc_source_transitions_to_finished_on_offset_update() {
        let mut tracker = CreateMviewProgressTracker {
            tracking_job: TrackingJob {
                job_id: JobId::new(300),
                is_recovered: false,
                source_change: None,
            },
            status: CreateMviewStatus::CdcSourceInit,
        };

        // Initially in CdcSourceInit state
        assert!(matches!(tracker.status, CreateMviewStatus::CdcSourceInit));
        assert!(!tracker.is_finished());

        // Mark as finished when offset is updated
        tracker.mark_cdc_source_finished();

        // Should now be in Finished state
        assert!(matches!(tracker.status, CreateMviewStatus::Finished { .. }));
        assert!(tracker.is_finished());
    }
}