risingwave_meta/barrier/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::info::BarrierInfo;
use crate::barrier::{
    Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::{MetadataManager, StreamingJobType};
use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};

type ConsumedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
    Done(ConsumedRows),
}
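
// Typical lifecycle of a single backfill actor, as enforced by `Progress::update`
// (an illustrative sketch, not an exhaustive state machine):
//
//   Init
//     -> ConsumingUpstream(epoch, consumed_rows)   // reported repeatedly; rows must not go backward
//     -> Done(consumed_rows)                       // reported once; reporting `Done` again panics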

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
    // `states` and `done_count` decide whether the progress is done. See `is_done`.
    states: HashMap<ActorId, BackfillState>,
    done_count: usize,

    /// Tells whether the backfill is from source or mv.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    // The following row counts are used to calculate the progress. See `calculate_progress`.
    /// Upstream mv count.
    /// Keep track of how many times each upstream MV
    /// appears in this stream job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all the upstream materialized views
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,

    /// DDL definition
    definition: String,
}

impl Progress {
    /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            definition,
        }
    }

    /// Update the progress of `actor`.
    fn update(&mut self, actor: ActorId, new_state: BackfillState, upstream_total_key_count: u64) {
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
        tracing::debug!(?actor, states = ?self.states, "update progress for actor");

        let mut old = 0;
        let mut new = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
                old = old_consumed_rows;
            }
            BackfillState::Done(_) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
                new = *new_consumed_rows;
            }
            BackfillState::Done(new_consumed_rows) => {
                tracing::debug!("actor {} done", actor);
                new = *new_consumed_rows;
                self.done_count += 1;
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(new >= old, "backfill progress should not go backward");
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new - old;
            }
            BackfillUpstreamType::Values => {
                // do not consider progress for values
            }
        }
        self.states.insert(actor, new_state);
    }

    /// Returns whether all backfill executors are done.
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    /// Returns the ids of all actors containing the backfill executors for the mview tracked by this
    /// [`Progress`].
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

    /// `progress` = `consumed_rows` / `upstream_total_key_count`
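    ///
    /// For example (hypothetical numbers): with `mv_backfill_consumed_rows = 50` and
    /// `upstream_mvs_total_key_count = 200`, the MView part renders as `"25.00% (50/200)"`.
    /// A ratio above 1.0 is clamped to 99.99%; 100% is reported only once `is_done` holds.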
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            if self.upstream_mvs_total_key_count == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress = self.mv_backfill_consumed_rows as f64
                    / (self.upstream_mvs_total_key_count as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    self.upstream_mvs_total_key_count
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

/// There are two kinds of `TrackingJob`s:
/// 1. `New`: instantiated and managed by the stream manager.
///    On recovery, the stream manager stops managing the job.
/// 2. `Recovered`: on recovery, the barrier manager recovers the job and starts managing it.
pub enum TrackingJob {
    New(TrackingCommand),
    Recovered(RecoveredTrackingJob),
}

impl TrackingJob {
    /// Notify the metadata manager that the job is finished.
    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
        match self {
            TrackingJob::New(command) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(
                        command.job_id.table_id as i32,
                        command.replace_stream_job.clone(),
                    )
                    .await?;
                Ok(())
            }
            TrackingJob::Recovered(recovered) => {
                metadata_manager
                    .catalog_controller
                    .finish_streaming_job(recovered.id, None)
                    .await?;
                Ok(())
            }
        }
    }

    pub(crate) fn table_to_create(&self) -> TableId {
        match self {
            TrackingJob::New(command) => command.job_id,
            TrackingJob::Recovered(recovered) => (recovered.id as u32).into(),
        }
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TrackingJob::New(command) => write!(f, "TrackingJob::New({:?})", command.job_id),
            TrackingJob::Recovered(recovered) => {
                write!(f, "TrackingJob::RecoveredV2({:?})", recovered.id)
            }
        }
    }
}

pub struct RecoveredTrackingJob {
    pub id: ObjectId,
}

/// The command tracked by the [`CreateMviewProgressTracker`].
pub(super) struct TrackingCommand {
    pub job_id: TableId,
    pub replace_stream_job: Option<ReplaceStreamJobPlan>,
}

/// Tracking is done as follows:
/// 1. We identify a `StreamJob` by the `TableId` of its `Materialized` table.
/// 2. For each stream job, there are several actors which run its tasks.
/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
///
/// See the illustrative sketch after this struct definition.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-mview DDL indicated by the `TableId`.
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    actor_map: HashMap<ActorId, TableId>,

    /// Stash of finished jobs. They will be finalized on the next checkpoint.
    pending_finished_jobs: Vec<TrackingJob>,
}
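
// An illustrative snapshot of the two maps (hypothetical IDs):
//   actor_map:    { actor 1 => table 42, actor 2 => table 42 }
//   progress_map: { table 42 => (Progress { states: {actor 1, actor 2}, .. }, TrackingJob::New(..)) }
// When a job finishes, its `TrackingJob` moves from `progress_map` into `pending_finished_jobs`
// and is finally finished on the next checkpoint.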

impl CreateMviewProgressTracker {
    /// This step recovers state from the meta side:
    /// 1. `Tables`.
    /// 2. `TableFragments`.
    ///
    /// Other states are persisted by the `BackfillExecutor`, such as:
    /// 1. `CreateMviewProgress`.
    /// 2. `Backfill` position.
    pub fn recover(
        mviews: impl IntoIterator<Item = (TableId, (String, &StreamJobFragments))>,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        for (creating_table_id, (definition, table_fragments)) in mviews {
            let mut states = HashMap::new();
            let mut backfill_upstream_types = HashMap::new();
            let actors = table_fragments.tracking_progress_actor_ids();
            for (actor, backfill_upstream_type) in actors {
                actor_map.insert(actor, creating_table_id);
                states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
                backfill_upstream_types.insert(actor, backfill_upstream_type);
            }

            let progress = Self::recover_progress(
                states,
                backfill_upstream_types,
                table_fragments.upstream_table_counts(),
                definition,
                version_stats,
            );
            let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob {
                id: creating_table_id.table_id as i32,
            });
            progress_map.insert(creating_table_id, (progress, tracking_job));
        }
        Self {
            progress_map,
            actor_map,
            pending_finished_jobs: Vec::new(),
        }
    }

    /// ## How recovery works
    ///
    /// The progress (number of rows consumed) is persisted in state tables.
    /// During recovery, the backfill executor will restore the number of rows consumed,
    /// and then it will just report progress like newly created executors.
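    ///
    /// Note that on the meta side, recovered actors start in `ConsumingUpstream(Epoch(0), 0)`
    /// (see [`Self::recover`]), so the consumed-row counters here start from zero and are
    /// rebuilt from subsequent progress reports.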
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_upstream_types,
            done_count: 0, // Fill only after first barrier pass
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
            source_backfill_consumed_rows: 0, // Fill only after first barrier pass
            definition,
        }
    }

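    /// Generate the DDL progress report for every tracked job, keyed by the job's table id.
    ///
    /// An illustrative entry (hypothetical values):
    /// `42 => DdlProgress { id: 42, statement: "CREATE MATERIALIZED VIEW ...", progress: "25.00% (50/200)" }`.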
    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(table_id, (x, _))| {
                let table_id = table_id.table_id;
                let ddl_progress = DdlProgress {
                    id: table_id as u64,
                    statement: x.definition.clone(),
                    progress: x.calculate_progress(),
                };
                (table_id, ddl_progress)
            })
            .collect()
    }

    /// Add the new tracking job (if any) and apply the collected progress reports.
    /// Jobs that finish as a result are stashed until the next checkpoint.
    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<(
            &CreateStreamingJobCommandInfo,
            Option<&ReplaceStreamJobPlan>,
        )>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        // Save `finished_commands` for Create MVs.
        let finished_commands = {
            let mut commands = vec![];
            // Add the command to tracker.
            if let Some((create_job_info, replace_stream_job)) = info
                && let Some(command) =
                    self.add(create_job_info, replace_stream_job, version_stats)
            {
                // Those with no actors to track can be finished immediately.
                commands.push(command);
            }
            // Update the progress of all commands.
            for progress in create_mview_progress {
                // Those with actors complete can be finished immediately.
                match self.update(progress, version_stats) {
                    Some(command) => {
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    _ => {
                        tracing::trace!(?progress, "update progress");
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.stash_command_to_finish(command);
        }
    }

    /// Apply the command of a collected epoch node (barrier) to the tracker.
    /// Returns the finished jobs when the barrier kind is `Checkpoint`, and an empty vec otherwise.
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Vec<TrackingJob> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal => Some((info, None)),
                    CreateStreamingJobType::SinkIntoTable(replace_stream_job) => {
                        Some((info, Some(replace_stream_job)))
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // The progress of SnapshotBackfill won't be tracked here
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
            // The cancelled command may have been stashed in `pending_finished_jobs` and be
            // waiting for the next checkpoint; we should clear it from there as well.
            self.cancel_command(table_id);
        }
        if barrier_info.kind.is_checkpoint() {
            self.take_finished_jobs()
        } else {
            vec![]
        }
    }

    /// Stash a command to finish later.
    pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }

    /// Finish stashed jobs on checkpoint.
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
        take(&mut self.pending_finished_jobs)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }

    pub(super) fn cancel_command(&mut self, id: TableId) {
        let _ = self.progress_map.remove(&id);
        self.pending_finished_jobs
            .retain(|x| x.table_to_create() != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }

    /// Clear all tracked commands and stashed finished jobs after an error is encountered.
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.pending_finished_jobs.clear();
        self.progress_map.clear();
    }

    /// Add a new create-mview DDL command to track.
    ///
    /// If there are no actors to track, return the given command as it can be finished immediately.
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        replace_stream_job: Option<&ReplaceStreamJobPlan>,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors, replace_table_info) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The command can be finished immediately.
                return Some(TrackingJob::New(TrackingCommand {
                    job_id: info.stream_job_fragments.stream_job_id,
                    replace_stream_job: replace_stream_job.cloned(),
                }));
            }
            (info.clone(), actors, replace_stream_job.cloned())
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            job_type,
            create_type,
            ..
        } = &info;

        let creating_mv_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_mv_id);
        }

        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition.clone(),
        );
        if *job_type == StreamingJobType::Sink && *create_type == CreateType::Background {
            // We return the original tracking job immediately.
            // This is because sink can be decoupled from backfill progress.
            // We don't need to wait for sink to finish backfill.
            // This still contains the notifiers, so we can tell listeners
            // that the sink job has been created.
            Some(TrackingJob::New(TrackingCommand {
                job_id: creating_mv_id,
                replace_stream_job: replace_table_info,
            }))
        } else {
            let old = self.progress_map.insert(
                creating_mv_id,
                (
                    progress,
                    TrackingJob::New(TrackingCommand {
                        job_id: creating_mv_id,
                        replace_stream_job: replace_table_info,
                    }),
                ),
            );
            assert!(old.is_none());
            None
        }
    }

    /// Update the progress of `actor` according to the Pb struct.
    ///
    /// If all actors in this MV have finished, returns the command.
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
            // even if backfill is finished on recovery.
            // This is because we don't know if only this actor is finished,
            // OR the entire stream job is finished.
            // For the first case, we must notify meta.
            // For the second case, we can still notify meta, but ignore it here.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows)
        } else {
            BackfillState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows)
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                progress.update(actor, new_state, upstream_total_key_count);

                if progress.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean-up the mapping from actors to DDL table_id.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    Some(o.remove().1)
                } else {
                    None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                None
            }
        }
    }
}

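/// Total key count of the upstream tables, weighted by how many times each table appears in
/// the stream job.
///
/// A worked example with hypothetical numbers: if table 1 appears twice and has 100 keys in
/// `version_stats`, and table 2 appears once with 30 keys, the result is
/// `2 * 100 + 1 * 30 = 230`. Tables missing from `version_stats` contribute 0.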
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}