// risingwave_meta/barrier/checkpoint/creating_job/status.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18use std::time::Duration;
19
20use risingwave_common::hash::ActorId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::stream_service::barrier_complete_response::{
24    CreateMviewProgress, PbCreateMviewProgress,
25};
26use tracing::warn;
27
28use crate::barrier::progress::CreateMviewProgressTracker;
29use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
30
31#[derive(Debug)]
32pub(super) struct CreateMviewLogStoreProgressTracker {
33    /// `actor_id` -> `pending_epoch_lag`
34    ongoing_actors: HashMap<ActorId, u64>,
35    finished_actors: HashSet<ActorId>,
36}
37
38impl CreateMviewLogStoreProgressTracker {
39    fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
40        Self {
41            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
42            finished_actors: HashSet::new(),
43        }
44    }
45
46    pub(super) fn gen_ddl_progress(&self) -> String {
47        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
48        let count = if self.ongoing_actors.is_empty() {
49            1
50        } else {
51            self.ongoing_actors.len()
52        } as f64;
53        let avg = sum / count;
54        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
55        format!(
56            "actor: {}/{}, avg lag {:?}",
57            self.finished_actors.len(),
58            self.ongoing_actors.len() + self.finished_actors.len(),
59            avg_lag_time
60        )
61    }
62
63    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
64        for progress in progress {
65            match self.ongoing_actors.entry(progress.backfill_actor_id) {
66                Entry::Occupied(mut entry) => {
67                    if progress.done {
68                        entry.remove_entry();
69                        assert!(
70                            self.finished_actors.insert(progress.backfill_actor_id),
71                            "non-duplicate"
72                        );
73                    } else {
74                        *entry.get_mut() = progress.pending_epoch_lag as _;
75                    }
76                }
77                Entry::Vacant(_) => {
78                    if cfg!(debug_assertions) {
79                        panic!(
80                            "reporting progress on non-inflight actor: {:?} {:?}",
81                            progress, self
82                        );
83                    } else {
84                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
85                    }
86                }
87            }
88        }
89    }
90
91    pub(super) fn is_finished(&self) -> bool {
92        self.ongoing_actors.is_empty()
93    }
94}
95
96#[derive(Debug)]
97pub(super) enum CreatingStreamingJobStatus {
98    /// The creating job is consuming upstream snapshot.
99    /// Will transit to `ConsumingLogStore` on `update_progress` when
100    /// the snapshot has been fully consumed after `update_progress`.
101    ConsumingSnapshot {
102        prev_epoch_fake_physical_time: u64,
103        pending_upstream_barriers: Vec<BarrierInfo>,
104        version_stats: HummockVersionStats,
105        create_mview_tracker: CreateMviewProgressTracker,
106        snapshot_backfill_actors: HashSet<ActorId>,
107        backfill_epoch: u64,
108        /// The `prev_epoch` of pending non checkpoint barriers
109        pending_non_checkpoint_barriers: Vec<u64>,
110    },
111    /// The creating job is consuming log store.
112    ///
113    /// Will transit to `Finishing` on `on_new_upstream_epoch` when `start_consume_upstream` is `true`.
114    ConsumingLogStore {
115        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
116        barriers_to_inject: Option<Vec<BarrierInfo>>,
117    },
118    /// All backfill actors have started consuming upstream, and the job
119    /// will be finished when all previously injected barriers have been collected
120    /// Store the `prev_epoch` that will finish at.
121    Finishing(u64),
122}
123
124impl CreatingStreamingJobStatus {
125    pub(super) fn update_progress(
126        &mut self,
127        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
128    ) {
129        match self {
130            &mut Self::ConsumingSnapshot {
131                ref mut create_mview_tracker,
132                ref version_stats,
133                ref mut prev_epoch_fake_physical_time,
134                ref mut pending_upstream_barriers,
135                ref mut pending_non_checkpoint_barriers,
136                ref backfill_epoch,
137                ref snapshot_backfill_actors,
138                ..
139            } => {
140                create_mview_tracker.update_tracking_jobs(
141                    None,
142                    create_mview_progress,
143                    version_stats,
144                );
145                if create_mview_tracker.has_pending_finished_jobs() {
146                    pending_non_checkpoint_barriers.push(*backfill_epoch);
147
148                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
149                    let barriers_to_inject: Vec<_> = [BarrierInfo {
150                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
151                        prev_epoch: TracedEpoch::new(prev_epoch),
152                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
153                    }]
154                    .into_iter()
155                    .chain(pending_upstream_barriers.drain(..))
156                    .collect();
157
158                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
159                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
160                            snapshot_backfill_actors.iter().cloned(),
161                            barriers_to_inject
162                                .last()
163                                .map(|barrier_info| {
164                                    barrier_info.prev_epoch().saturating_sub(*backfill_epoch)
165                                })
166                                .unwrap_or(0),
167                        ),
168                        barriers_to_inject: Some(barriers_to_inject),
169                    };
170                }
171            }
172            CreatingStreamingJobStatus::ConsumingLogStore {
173                log_store_progress_tracker,
174                ..
175            } => {
176                log_store_progress_tracker.update(create_mview_progress);
177            }
178            CreatingStreamingJobStatus::Finishing(_) => {}
179        }
180    }
181
182    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
183        match self {
184            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
185                unreachable!(
186                    "should not start consuming upstream for a job that are consuming snapshot"
187                )
188            }
189            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
190                let prev_epoch = barrier_info.prev_epoch();
191                {
192                    assert!(barrier_info.kind.is_checkpoint());
193                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch);
194                }
195            }
196            CreatingStreamingJobStatus::Finishing { .. } => {
197                unreachable!("should not start consuming upstream for a job again")
198            }
199        }
200    }
201
202    pub(super) fn on_new_upstream_epoch(&mut self, barrier_info: &BarrierInfo) -> Vec<BarrierInfo> {
203        match self {
204            CreatingStreamingJobStatus::ConsumingSnapshot {
205                pending_upstream_barriers,
206                prev_epoch_fake_physical_time,
207                pending_non_checkpoint_barriers,
208                ..
209            } => {
210                pending_upstream_barriers.push(barrier_info.clone());
211                vec![CreatingStreamingJobStatus::new_fake_barrier(
212                    prev_epoch_fake_physical_time,
213                    pending_non_checkpoint_barriers,
214                    barrier_info.kind.is_checkpoint(),
215                )]
216            }
217            CreatingStreamingJobStatus::ConsumingLogStore {
218                barriers_to_inject, ..
219            } => barriers_to_inject
220                .take()
221                .into_iter()
222                .flatten()
223                .chain([barrier_info.clone()])
224                .collect(),
225            CreatingStreamingJobStatus::Finishing { .. } => {
226                vec![]
227            }
228        }
229    }
230
231    pub(super) fn new_fake_barrier(
232        prev_epoch_fake_physical_time: &mut u64,
233        pending_non_checkpoint_barriers: &mut Vec<u64>,
234        is_checkpoint: bool,
235    ) -> BarrierInfo {
236        {
237            {
238                let prev_epoch =
239                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
240                *prev_epoch_fake_physical_time += 1;
241                let curr_epoch =
242                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
243                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
244                let kind = if is_checkpoint {
245                    BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
246                } else {
247                    BarrierKind::Barrier
248                };
249                BarrierInfo {
250                    prev_epoch,
251                    curr_epoch,
252                    kind,
253                }
254            }
255        }
256    }
257
258    pub(super) fn is_finishing(&self) -> bool {
259        matches!(self, Self::Finishing(_))
260    }
261}