risingwave_meta/barrier/
progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::take;
18
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::{CreateType, ObjectId};
22use risingwave_pb::ddl_service::DdlProgress;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::stream_service::PbBarrierCompleteResponse;
25use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
26
27use crate::MetaResult;
28use crate::barrier::backfill_order_control::BackfillOrderState;
29use crate::barrier::info::BarrierInfo;
30use crate::barrier::{Command, CreateStreamingJobCommandInfo, CreateStreamingJobType};
31use crate::manager::MetadataManager;
32use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
33
/// Number of rows a backfill actor has reported as consumed.
type ConsumedRows = u64;
35
/// Backfill state last recorded for a single tracked actor.
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    /// State assigned by `Progress::new` before any report has been applied.
    Init,
    /// The actor reported that it is not done yet, together with the reported epoch and the
    /// number of rows consumed so far.
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    /// The actor reported that it is done, together with its consumed row count.
    Done(ConsumedRows),
}
42
/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    /// Last recorded backfill state of every tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Backfill ordering state. `update` calls `finish_actor` on it when an actor reports
    /// done and returns the fragment ids it yields.
    backfill_order_state: BackfillOrderState,
    /// Number of actors that have reported done.
    done_count: usize,

    /// Upstream type (mv, source or values) of the backfill run by each tracked actor.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views.
    /// Overwritten on every call to `update`.
    upstream_mvs_total_key_count: u64,
    /// Rows consumed so far, summed over the actors whose upstream type is `MView`.
    mv_backfill_consumed_rows: u64,
    /// Rows consumed so far, summed over the actors whose upstream type is `Source`.
    source_backfill_consumed_rows: u64,

    /// DDL definition
    definition: String,
    /// Create type
    create_type: CreateType,
}
69
70impl Progress {
71    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
72    fn new(
73        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
74        upstream_mv_count: HashMap<TableId, usize>,
75        upstream_total_key_count: u64,
76        definition: String,
77        create_type: CreateType,
78        backfill_order_state: BackfillOrderState,
79    ) -> Self {
80        let mut states = HashMap::new();
81        let mut backfill_upstream_types = HashMap::new();
82        for (actor, backfill_upstream_type) in actors {
83            states.insert(actor, BackfillState::Init);
84            backfill_upstream_types.insert(actor, backfill_upstream_type);
85        }
86        assert!(!states.is_empty());
87
88        Self {
89            states,
90            backfill_upstream_types,
91            done_count: 0,
92            upstream_mv_count,
93            upstream_mvs_total_key_count: upstream_total_key_count,
94            mv_backfill_consumed_rows: 0,
95            source_backfill_consumed_rows: 0,
96            definition,
97            create_type,
98            backfill_order_state,
99        }
100    }
101
102    /// Update the progress of `actor`.
103    fn update(
104        &mut self,
105        actor: ActorId,
106        new_state: BackfillState,
107        upstream_total_key_count: u64,
108    ) -> Vec<FragmentId> {
109        let mut next_backfill_nodes = vec![];
110        self.upstream_mvs_total_key_count = upstream_total_key_count;
111        let total_actors = self.states.len();
112        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
113        tracing::debug!(?actor, states = ?self.states, "update progress for actor");
114
115        let mut old = 0;
116        let mut new = 0;
117        match self.states.remove(&actor).unwrap() {
118            BackfillState::Init => {}
119            BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
120                old = old_consumed_rows;
121            }
122            BackfillState::Done(_) => panic!("should not report done multiple times"),
123        };
124        match &new_state {
125            BackfillState::Init => {}
126            BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
127                new = *new_consumed_rows;
128            }
129            BackfillState::Done(new_consumed_rows) => {
130                tracing::debug!("actor {} done", actor);
131                new = *new_consumed_rows;
132                self.done_count += 1;
133                next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
134                tracing::debug!(
135                    "{} actors out of {} complete",
136                    self.done_count,
137                    total_actors,
138                );
139            }
140        };
141        debug_assert!(new >= old, "backfill progress should not go backward");
142        match backfill_upstream_type {
143            BackfillUpstreamType::MView => {
144                self.mv_backfill_consumed_rows += new - old;
145            }
146            BackfillUpstreamType::Source => {
147                self.source_backfill_consumed_rows += new - old;
148            }
149            BackfillUpstreamType::Values => {
150                // do not consider progress for values
151            }
152        }
153        self.states.insert(actor, new_state);
154        next_backfill_nodes
155    }
156
157    /// Returns whether all backfill executors are done.
158    fn is_done(&self) -> bool {
159        tracing::trace!(
160            "Progress::is_done? {}, {}, {:?}",
161            self.done_count,
162            self.states.len(),
163            self.states
164        );
165        self.done_count == self.states.len()
166    }
167
168    /// Returns the ids of all actors containing the backfill executors for the mview tracked by this
169    /// [`Progress`].
170    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
171        self.states.keys().cloned()
172    }
173
174    /// `progress` = `consumed_rows` / `upstream_total_key_count`
175    fn calculate_progress(&self) -> String {
176        if self.is_done() || self.states.is_empty() {
177            return "100%".to_owned();
178        }
179        let mut mv_count = 0;
180        let mut source_count = 0;
181        for backfill_upstream_type in self.backfill_upstream_types.values() {
182            match backfill_upstream_type {
183                BackfillUpstreamType::MView => mv_count += 1,
184                BackfillUpstreamType::Source => source_count += 1,
185                BackfillUpstreamType::Values => (),
186            }
187        }
188
189        let mv_progress = (mv_count > 0).then_some({
190            if self.upstream_mvs_total_key_count == 0 {
191                "99.99%".to_owned()
192            } else {
193                let mut progress = self.mv_backfill_consumed_rows as f64
194                    / (self.upstream_mvs_total_key_count as f64);
195                if progress > 1.0 {
196                    progress = 0.9999;
197                }
198                format!(
199                    "{:.2}% ({}/{})",
200                    progress * 100.0,
201                    self.mv_backfill_consumed_rows,
202                    self.upstream_mvs_total_key_count
203                )
204            }
205        });
206        let source_progress = (source_count > 0).then_some(format!(
207            "{} rows consumed",
208            self.source_backfill_consumed_rows
209        ));
210        match (mv_progress, source_progress) {
211            (Some(mv_progress), Some(source_progress)) => {
212                format!(
213                    "MView Backfill: {}, Source Backfill: {}",
214                    mv_progress, source_progress
215                )
216            }
217            (Some(mv_progress), None) => mv_progress,
218            (None, Some(source_progress)) => source_progress,
219            (None, None) => "Unknown".to_owned(),
220        }
221    }
222}
223
/// There are two kinds of `TrackingJobs`:
/// 1. `New`. A job that starts being tracked when its create command is added to the tracker.
///    It is instantiated and managed by the stream manager.
///    On recovery, the stream manager will stop managing the job.
/// 2. `Recovered`. A job that is picked up again when the tracker is rebuilt on recovery.
///    On recovery, the barrier manager will recover and start managing the job.
pub enum TrackingJob {
    /// Created by `CreateMviewProgressTracker::add`.
    New(TrackingCommand),
    /// Created by `CreateMviewProgressTracker::recover`.
    Recovered(RecoveredTrackingJob),
}
234
235impl std::fmt::Display for TrackingJob {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        match self {
238            TrackingJob::New(command) => write!(f, "{}", command.job_id),
239            TrackingJob::Recovered(recovered) => write!(f, "{}<recovered>", recovered.id),
240        }
241    }
242}
243
244impl TrackingJob {
245    /// Notify the metadata manager that the job is finished.
246    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
247        match self {
248            TrackingJob::New(command) => {
249                metadata_manager
250                    .catalog_controller
251                    .finish_streaming_job(command.job_id.table_id as i32)
252                    .await?;
253                Ok(())
254            }
255            TrackingJob::Recovered(recovered) => {
256                metadata_manager
257                    .catalog_controller
258                    .finish_streaming_job(recovered.id)
259                    .await?;
260                Ok(())
261            }
262        }
263    }
264
265    pub(crate) fn table_to_create(&self) -> TableId {
266        match self {
267            TrackingJob::New(command) => command.job_id,
268            TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
269        }
270    }
271}
272
273impl std::fmt::Debug for TrackingJob {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        match self {
276            TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
277            TrackingJob::Recovered(recovered) => {
278                write!(f, "TrackingJob::Recovered({:?})", recovered.id)
279            }
280        }
281    }
282}
283
/// Payload of [`TrackingJob::Recovered`]: a job whose tracking was rebuilt on recovery.
pub struct RecoveredTrackingJob {
    /// Id of the job; passed to `finish_streaming_job` when the job finishes.
    pub id: ObjectId,
}
287
/// The command tracked by the [`CreateMviewProgressTracker`].
pub(super) struct TrackingCommand {
    /// Id of the streaming job being created.
    pub job_id: TableId,
}
292
/// Outcome of applying one actor's progress report to the tracker.
pub(super) enum UpdateProgressResult {
    /// Nothing to act on: the job is still in progress, or the report was ignored.
    None,
    /// All tracked actors of the job are done; carries the job to finish.
    Finished(TrackingJob),
    /// The job is not done yet, but the report produced a non-empty list of fragment ids
    /// whose backfill should be started next.
    BackfillNodeFinished(Vec<FragmentId>),
}
298
/// Tracking is done as follows:
/// 1. We identify a `StreamJob` by the `TableId` of its `Materialized` table.
/// 2. For each stream job, there are several actors which run its tasks.
/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL indicated by the `TableId`.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    /// Maps each tracked actor to the job it belongs to.
    actor_map: HashMap<ActorId, TableId>,

    /// Stash of finished jobs. They will be finally finished on checkpoint.
    pending_finished_jobs: Vec<TrackingJob>,

    /// Stash of pending backfill nodes. They will start backfilling on checkpoint.
    pending_backfill_nodes: Vec<FragmentId>,
}
317
318impl CreateMviewProgressTracker {
319    /// This step recovers state from the meta side:
320    /// 1. `Tables`.
321    /// 2. `TableFragments`.
322    ///
323    /// Other state are persisted by the `BackfillExecutor`, such as:
324    /// 1. `CreateMviewProgress`.
325    /// 2. `Backfill` position.
326    pub fn recover(
327        jobs: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments, BackfillOrderState))>,
328        version_stats: &HummockVersionStats,
329    ) -> Self {
330        let mut actor_map = HashMap::new();
331        let mut progress_map = HashMap::new();
332        for (creating_table_id, (definition, table_fragments, backfill_order_state)) in jobs {
333            let mut states = HashMap::new();
334            let mut backfill_upstream_types = HashMap::new();
335            let actors = table_fragments.tracking_progress_actor_ids();
336            for (actor, backfill_upstream_type) in actors {
337                actor_map.insert(actor, creating_table_id);
338                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
339                backfill_upstream_types.insert(actor, backfill_upstream_type);
340            }
341
342            let progress = Self::recover_progress(
343                states,
344                backfill_upstream_types,
345                table_fragments.upstream_table_counts(),
346                definition,
347                version_stats,
348                backfill_order_state,
349            );
350            let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
351                id: creating_table_id.table_id as i32,
352            });
353            progress_map.insert(creating_table_id, (progress, tracking_job));
354        }
355        Self {
356            progress_map,
357            actor_map,
358            pending_finished_jobs: Vec::new(),
359            pending_backfill_nodes: Vec::new(),
360        }
361    }
362
363    /// ## How recovery works
364    ///
365    /// The progress (number of rows consumed) is persisted in state tables.
366    /// During recovery, the backfill executor will restore the number of rows consumed,
367    /// and then it will just report progress like newly created executors.
368    fn recover_progress(
369        states: HashMap<ActorId, BackfillState>,
370        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
371        upstream_mv_count: HashMap<TableId, usize>,
372        definition: String,
373        version_stats: &HummockVersionStats,
374        backfill_order_state: BackfillOrderState,
375    ) -> Progress {
376        let upstream_mvs_total_key_count =
377            calculate_total_key_count(&upstream_mv_count, version_stats);
378        Progress {
379            states,
380            backfill_order_state,
381            backfill_upstream_types,
382            done_count: 0, // Fill only after first barrier pass
383            upstream_mv_count,
384            upstream_mvs_total_key_count,
385            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
386            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
387            definition,
388            create_type: CreateType::Background,
389        }
390    }
391
392    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
393        self.progress_map
394            .iter()
395            .map(|(table_id, (x, _))| {
396                let table_id = table_id.table_id;
397                let ddl_progress = DdlProgress {
398                    id: table_id as u64,
399                    statement: x.definition.clone(),
400                    create_type: x.create_type.as_str().to_owned(),
401                    progress: x.calculate_progress(),
402                };
403                (table_id, ddl_progress)
404            })
405            .collect()
406    }
407
408    pub(super) fn update_tracking_jobs<'a>(
409        &mut self,
410        info: Option<&CreateStreamingJobCommandInfo>,
411        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
412        version_stats: &HummockVersionStats,
413    ) {
414        {
415            {
416                // Save `finished_commands` for Create MVs.
417                let finished_commands = {
418                    let mut commands = vec![];
419                    // Add the command to tracker.
420                    if let Some(create_job_info) = info
421                        && let Some(command) = self.add(create_job_info, version_stats)
422                    {
423                        // Those with no actors to track can be finished immediately.
424                        commands.push(command);
425                    }
426                    // Update the progress of all commands.
427                    for progress in create_mview_progress {
428                        // Those with actors complete can be finished immediately.
429                        match self.update(progress, version_stats) {
430                            UpdateProgressResult::None => {
431                                tracing::trace!(?progress, "update progress");
432                            }
433                            UpdateProgressResult::Finished(command) => {
434                                tracing::trace!(?progress, "finish progress");
435                                commands.push(command);
436                            }
437                            UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes) => {
438                                tracing::trace!(
439                                    ?progress,
440                                    ?next_backfill_nodes,
441                                    "start next backfill node"
442                                );
443                                self.queue_backfill(next_backfill_nodes);
444                            }
445                        }
446                    }
447                    commands
448                };
449
450                for command in finished_commands {
451                    self.stash_command_to_finish(command);
452                }
453            }
454        }
455    }
456
457    /// Apply a collected epoch node command to the tracker
458    /// Return the finished jobs when the barrier kind is `Checkpoint`
459    pub(super) fn apply_collected_command(
460        &mut self,
461        command: Option<&Command>,
462        barrier_info: &BarrierInfo,
463        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
464        version_stats: &HummockVersionStats,
465    ) -> Vec<TrackingJob> {
466        let new_tracking_job_info =
467            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
468                match job_type {
469                    CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
470                        Some(info)
471                    }
472                    CreateStreamingJobType::SnapshotBackfill(_) => {
473                        // The progress of SnapshotBackfill won't be tracked here
474                        None
475                    }
476                }
477            } else {
478                None
479            };
480        self.update_tracking_jobs(
481            new_tracking_job_info,
482            resps
483                .into_iter()
484                .flat_map(|resp| resp.create_mview_progress.iter()),
485            version_stats,
486        );
487        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
488            // the cancelled command is possibly stashed in `finished_commands` and waiting
489            // for checkpoint, we should also clear it.
490            self.cancel_command(table_id);
491        }
492        if barrier_info.kind.is_checkpoint() {
493            self.take_finished_jobs()
494        } else {
495            vec![]
496        }
497    }
498
    /// Stash a finished job so it can be handed out later by `take_finished_jobs`.
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }
503
    /// Append `backfill_nodes` to the stash of pending backfill nodes, which is drained by
    /// `take_pending_backfill_nodes`.
    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }
507
    /// Drain and return the stashed finished jobs, leaving the stash empty.
    ///
    /// `apply_collected_command` only calls this for checkpoint barriers.
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
        take(&mut self.pending_finished_jobs)
    }
513
    /// Drain and return the stashed pending backfill nodes, leaving the stash empty.
    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }
517
    /// Whether at least one finished job is stashed and not yet taken.
    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }
521
522    pub(super) fn cancel_command(&mut self, id: TableId) {
523        let _ = self.progress_map.remove(&id);
524        self.pending_finished_jobs
525            .retain(|x| x.table_to_create() != id);
526        self.actor_map.retain(|_, table_id| *table_id != id);
527    }
528
529    /// Notify all tracked commands that error encountered and clear them.
530    pub fn abort_all(&mut self) {
531        self.actor_map.clear();
532        self.pending_finished_jobs.clear();
533        self.progress_map.clear();
534    }
535
536    /// Add a new create-mview DDL command to track.
537    ///
538    /// If the actors to track are empty, return the given command as it can be finished immediately.
539    pub fn add(
540        &mut self,
541        info: &CreateStreamingJobCommandInfo,
542        version_stats: &HummockVersionStats,
543    ) -> Option<TrackingJob> {
544        tracing::trace!(?info, "add job to track");
545        let (info, actors) = {
546            let CreateStreamingJobCommandInfo {
547                stream_job_fragments,
548                ..
549            } = info;
550            let actors = stream_job_fragments.tracking_progress_actor_ids();
551            if actors.is_empty() {
552                // The command can be finished immediately.
553                return Some(TrackingJob::New(TrackingCommand {
554                    job_id: info.stream_job_fragments.stream_job_id,
555                }));
556            }
557            (info.clone(), actors)
558        };
559
560        let CreateStreamingJobCommandInfo {
561            stream_job_fragments: table_fragments,
562            definition,
563            create_type,
564            fragment_backfill_ordering,
565            ..
566        } = info;
567
568        let creating_job_id = table_fragments.stream_job_id();
569        let upstream_mv_count = table_fragments.upstream_table_counts();
570        let upstream_total_key_count: u64 =
571            calculate_total_key_count(&upstream_mv_count, version_stats);
572
573        for (actor, _backfill_upstream_type) in &actors {
574            self.actor_map.insert(*actor, creating_job_id);
575        }
576
577        let backfill_order_state =
578            BackfillOrderState::new(fragment_backfill_ordering, &table_fragments);
579        let progress = Progress::new(
580            actors,
581            upstream_mv_count,
582            upstream_total_key_count,
583            definition.clone(),
584            create_type.into(),
585            backfill_order_state,
586        );
587        let old = self.progress_map.insert(
588            creating_job_id,
589            (
590                progress,
591                TrackingJob::New(TrackingCommand {
592                    job_id: creating_job_id,
593                }),
594            ),
595        );
596        assert!(old.is_none());
597        None
598    }
599
600    /// Update the progress of `actor` according to the Pb struct.
601    ///
602    /// If all actors in this MV have finished, return the command.
603    pub fn update(
604        &mut self,
605        progress: &CreateMviewProgress,
606        version_stats: &HummockVersionStats,
607    ) -> UpdateProgressResult {
608        tracing::trace!(?progress, "update progress");
609        let actor = progress.backfill_actor_id;
610        let Some(table_id) = self.actor_map.get(&actor).copied() else {
611            // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
612            // even if backfill is finished on recovery.
613            // This is because we don't know if only this actor is finished,
614            // OR the entire stream job is finished.
615            // For the first case, we must notify meta.
616            // For the second case, we can still notify meta, but ignore it here.
617            tracing::info!(
618                "no tracked progress for actor {}, the stream job could already be finished",
619                actor
620            );
621            return UpdateProgressResult::None;
622        };
623
624        let new_state = if progress.done {
625            BackfillState::Done(progress.consumed_rows)
626        } else {
627            BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
628        };
629
630        match self.progress_map.entry(table_id) {
631            Entry::Occupied(mut o) => {
632                let progress = &mut o.get_mut().0;
633
634                let upstream_total_key_count: u64 =
635                    calculate_total_key_count(&progress.upstream_mv_count, version_stats);
636
637                tracing::debug!(?table_id, "updating progress for table");
638                let next_backfill_nodes =
639                    progress.update(actor, new_state, upstream_total_key_count);
640
641                if progress.is_done() {
642                    tracing::debug!(
643                        "all actors done for creating mview with table_id {}!",
644                        table_id
645                    );
646
647                    // Clean-up the mapping from actors to DDL table_id.
648                    for actor in o.get().0.actors() {
649                        self.actor_map.remove(&actor);
650                    }
651                    assert!(next_backfill_nodes.is_empty());
652                    UpdateProgressResult::Finished(o.remove().1)
653                } else if !next_backfill_nodes.is_empty() {
654                    tracing::debug!("scheduling next backfill nodes: {:?}", next_backfill_nodes);
655                    UpdateProgressResult::BackfillNodeFinished(next_backfill_nodes)
656                } else {
657                    UpdateProgressResult::None
658                }
659            }
660            Entry::Vacant(_) => {
661                tracing::warn!(
662                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
663                );
664                UpdateProgressResult::None
665            }
666        }
667    }
668}
669
670fn calculate_total_key_count(
671    table_count: &HashMap<TableId, usize>,
672    version_stats: &HummockVersionStats,
673) -> u64 {
674    table_count
675        .iter()
676        .map(|(table_id, count)| {
677            assert_ne!(*count, 0);
678            *count as u64
679                * version_stats
680                    .table_stats
681                    .get(&table_id.table_id)
682                    .map_or(0, |stat| stat.total_key_count as u64)
683        })
684        .sum()
685}