risingwave_meta/barrier/checkpoint/creating_job/
status.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18use std::time::Duration;
19
20use risingwave_common::hash::ActorId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::stream_plan::barrier::PbBarrierKind;
24use risingwave_pb::stream_service::barrier_complete_response::{
25    CreateMviewProgress, PbCreateMviewProgress,
26};
27use tracing::warn;
28
29use crate::barrier::progress::CreateMviewProgressTracker;
30use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
31
/// Tracks, per backfill actor, the progress reported while a creating job is
/// consuming the log store.
///
/// An actor starts in `ongoing_actors` and is moved to `finished_actors` once a
/// progress report for it is marked as `done`.
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// `actor_id` -> `pending_epoch_lag`
    ///
    /// Seeded with the same initial lag for every actor on construction, and
    /// overwritten by the `pending_epoch_lag` of each subsequent progress report.
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have reported `done` and were removed from `ongoing_actors`.
    finished_actors: HashSet<ActorId>,
}
38
39impl CreateMviewLogStoreProgressTracker {
40    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
41        Self {
42            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
43            finished_actors: HashSet::new(),
44        }
45    }
46
47    pub(super) fn gen_ddl_progress(&self) -> String {
48        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
49        let count = if self.ongoing_actors.is_empty() {
50            1
51        } else {
52            self.ongoing_actors.len()
53        } as f64;
54        let avg = sum / count;
55        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
56        format!(
57            "actor: {}/{}, avg lag {:?}",
58            self.finished_actors.len(),
59            self.ongoing_actors.len() + self.finished_actors.len(),
60            avg_lag_time
61        )
62    }
63
64    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
65        for progress in progress {
66            match self.ongoing_actors.entry(progress.backfill_actor_id) {
67                Entry::Occupied(mut entry) => {
68                    if progress.done {
69                        entry.remove_entry();
70                        assert!(
71                            self.finished_actors.insert(progress.backfill_actor_id),
72                            "non-duplicate"
73                        );
74                    } else {
75                        *entry.get_mut() = progress.pending_epoch_lag as _;
76                    }
77                }
78                Entry::Vacant(_) => {
79                    if cfg!(debug_assertions) {
80                        panic!(
81                            "reporting progress on non-inflight actor: {:?} {:?}",
82                            progress, self
83                        );
84                    } else {
85                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
86                    }
87                }
88            }
89        }
90    }
91
92    pub(super) fn is_finished(&self) -> bool {
93        self.ongoing_actors.is_empty()
94    }
95}
96
/// The state of a creating streaming job, advancing from `ConsumingSnapshot` to
/// `ConsumingLogStore` and then to `Finishing`.
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The creating job is consuming upstream snapshot.
    ///
    /// Will transit to `ConsumingLogStore` in `update_progress` once
    /// `create_mview_tracker` reports pending finished jobs.
    ConsumingSnapshot {
        /// Physical time of the `prev_epoch` of the next fake barrier. It is
        /// incremented by one each time a fake barrier is generated.
        prev_epoch_fake_physical_time: u64,
        /// Upstream barriers buffered by `on_new_upstream_epoch` in this state. They are
        /// drained into `barriers_to_inject` when transiting to `ConsumingLogStore`.
        pending_upstream_barriers: Vec<BarrierInfo>,
        /// Passed to `create_mview_tracker` whenever progress is updated.
        version_stats: HummockVersionStats,
        /// Tracks the reported snapshot backfill progress of this job.
        create_mview_tracker: CreateMviewProgressTracker,
        /// The actors used to seed the log store progress tracker on transition.
        snapshot_backfill_actors: HashSet<ActorId>,
        /// Used as the `curr_epoch` of the checkpoint barrier generated when transiting
        /// to `ConsumingLogStore`.
        backfill_epoch: u64,
        /// The `prev_epoch` of pending non checkpoint barriers
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The creating job is consuming log store.
    ///
    /// Will transit to `Finishing` when `start_consume_upstream` is called.
    ConsumingLogStore {
        /// Progress of the backfill actors in consuming the log store.
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        /// Barriers prepared on entering this state. They are taken, leaving `None`,
        /// by the next call to `on_new_upstream_epoch`.
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// All backfill actors have started consuming upstream, and the job
    /// will be finished when all previously injected barriers have been collected.
    ///
    /// Stores the `prev_epoch` that the job will finish at.
    Finishing(u64),
}
124
125impl CreatingStreamingJobStatus {
126    pub(super) fn update_progress(
127        &mut self,
128        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
129    ) {
130        match self {
131            &mut Self::ConsumingSnapshot {
132                ref mut create_mview_tracker,
133                ref version_stats,
134                ref mut prev_epoch_fake_physical_time,
135                ref mut pending_upstream_barriers,
136                ref mut pending_non_checkpoint_barriers,
137                ref backfill_epoch,
138                ref snapshot_backfill_actors,
139                ..
140            } => {
141                create_mview_tracker.update_tracking_jobs(
142                    None,
143                    create_mview_progress,
144                    version_stats,
145                );
146                if create_mview_tracker.has_pending_finished_jobs() {
147                    pending_non_checkpoint_barriers.push(*backfill_epoch);
148
149                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
150                    let barriers_to_inject: Vec<_> = [BarrierInfo {
151                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
152                        prev_epoch: TracedEpoch::new(prev_epoch),
153                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
154                    }]
155                    .into_iter()
156                    .chain(pending_upstream_barriers.drain(..))
157                    .collect();
158
159                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
160                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
161                            snapshot_backfill_actors.iter().cloned(),
162                            barriers_to_inject
163                                .last()
164                                .map(|barrier_info| {
165                                    barrier_info.prev_epoch().saturating_sub(*backfill_epoch)
166                                })
167                                .unwrap_or(0),
168                        ),
169                        barriers_to_inject: Some(barriers_to_inject),
170                    };
171                }
172            }
173            CreatingStreamingJobStatus::ConsumingLogStore {
174                log_store_progress_tracker,
175                ..
176            } => {
177                log_store_progress_tracker.update(create_mview_progress);
178            }
179            CreatingStreamingJobStatus::Finishing(_) => {}
180        }
181    }
182
183    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
184        match self {
185            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
186                unreachable!(
187                    "should not start consuming upstream for a job that are consuming snapshot"
188                )
189            }
190            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
191                let prev_epoch = barrier_info.prev_epoch();
192                {
193                    assert!(barrier_info.kind.is_checkpoint());
194                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch);
195                }
196            }
197            CreatingStreamingJobStatus::Finishing { .. } => {
198                unreachable!("should not start consuming upstream for a job again")
199            }
200        }
201    }
202
203    pub(super) fn on_new_upstream_epoch(&mut self, barrier_info: &BarrierInfo) -> Vec<BarrierInfo> {
204        match self {
205            CreatingStreamingJobStatus::ConsumingSnapshot {
206                pending_upstream_barriers,
207                prev_epoch_fake_physical_time,
208                pending_non_checkpoint_barriers,
209                ..
210            } => {
211                pending_upstream_barriers.push(barrier_info.clone());
212                vec![CreatingStreamingJobStatus::new_fake_barrier(
213                    prev_epoch_fake_physical_time,
214                    pending_non_checkpoint_barriers,
215                    match barrier_info.kind {
216                        BarrierKind::Barrier => PbBarrierKind::Barrier,
217                        BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
218                        BarrierKind::Initial => {
219                            unreachable!("upstream new epoch should not be initial")
220                        }
221                    },
222                )]
223            }
224            CreatingStreamingJobStatus::ConsumingLogStore {
225                barriers_to_inject, ..
226            } => barriers_to_inject
227                .take()
228                .into_iter()
229                .flatten()
230                .chain([barrier_info.clone()])
231                .collect(),
232            CreatingStreamingJobStatus::Finishing { .. } => {
233                vec![]
234            }
235        }
236    }
237
238    pub(super) fn new_fake_barrier(
239        prev_epoch_fake_physical_time: &mut u64,
240        pending_non_checkpoint_barriers: &mut Vec<u64>,
241        kind: PbBarrierKind,
242    ) -> BarrierInfo {
243        {
244            {
245                let prev_epoch =
246                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
247                *prev_epoch_fake_physical_time += 1;
248                let curr_epoch =
249                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
250                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
251                let kind = match kind {
252                    PbBarrierKind::Unspecified => {
253                        unreachable!()
254                    }
255                    PbBarrierKind::Initial => {
256                        pending_non_checkpoint_barriers.clear();
257                        BarrierKind::Initial
258                    }
259                    PbBarrierKind::Barrier => BarrierKind::Barrier,
260                    PbBarrierKind::Checkpoint => {
261                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
262                    }
263                };
264                BarrierInfo {
265                    prev_epoch,
266                    curr_epoch,
267                    kind,
268                }
269            }
270        }
271    }
272
273    pub(super) fn is_finishing(&self) -> bool {
274        matches!(self, Self::Finishing(_))
275    }
276}