risingwave_meta/barrier/checkpoint/creating_job/status.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::time::Duration;

use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::{
    CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};

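/// Tracks, per snapshot backfill actor, the remaining log store to be consumed for a single
/// creating job: ongoing actors keep their latest `pending_epoch_lag`, while finished actors are
/// collected separately.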
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// `actor_id` -> `pending_epoch_lag`
    ongoing_actors: HashMap<ActorId, u64>,
    finished_actors: HashSet<ActorId>,
}

impl CreateMviewLogStoreProgressTracker {
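    /// Creates a tracker with every actor in `actors` marked as ongoing, each starting from the
    /// same initial `pending_barrier_lag`.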
    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
        Self {
            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
            finished_actors: HashSet::new(),
        }
    }

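    /// Renders progress for DDL reporting: finished actors over total actors, plus the average
    /// pending epoch lag rendered as an approximate wall-clock duration.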
    pub(super) fn gen_ddl_progress(&self) -> String {
        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
        let count = if self.ongoing_actors.is_empty() {
            1
        } else {
            self.ongoing_actors.len()
        } as f64;
        let avg = sum / count;
        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
        format!(
            "actor: {}/{}, avg lag {:?}",
            self.finished_actors.len(),
            self.ongoing_actors.len() + self.finished_actors.len(),
            avg_lag_time
        )
    }

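    /// Applies progress reports from backfill actors: a `done` report moves the actor to
    /// `finished_actors`, otherwise its pending epoch lag is refreshed. A report for an actor
    /// that is not in flight panics in debug builds and is only logged in release builds.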
    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
        for progress in progress {
            match self.ongoing_actors.entry(progress.backfill_actor_id) {
                Entry::Occupied(mut entry) => {
                    if progress.done {
                        entry.remove_entry();
                        assert!(
                            self.finished_actors.insert(progress.backfill_actor_id),
                            "non-duplicate"
                        );
                    } else {
                        *entry.get_mut() = progress.pending_epoch_lag as _;
                    }
                }
                Entry::Vacant(_) => {
                    if cfg!(debug_assertions) {
                        panic!(
                            "reporting progress on non-inflight actor: {:?} {:?}",
                            progress, self
                        );
                    } else {
                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
                    }
                }
            }
        }
    }

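    /// Returns `true` once every tracked actor has finished consuming the log store.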
    pub(super) fn is_finished(&self) -> bool {
        self.ongoing_actors.is_empty()
    }
}

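/// Phase of a streaming job that is being created with snapshot backfill.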
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The creating job is consuming the upstream snapshot.
    /// Will transition to `ConsumingLogStore` in `update_progress` once
    /// the snapshot has been fully consumed.
    ConsumingSnapshot {
        prev_epoch_fake_physical_time: u64,
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        backfill_epoch: u64,
        /// The `prev_epoch` of pending non-checkpoint barriers
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The creating job is consuming the log store.
    ///
    /// Will transition to `Finishing` when `start_consume_upstream` is called.
    ConsumingLogStore {
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// All backfill actors have started consuming the upstream, and the job will be
    /// finished once all previously injected barriers have been collected.
    /// Stores the `prev_epoch` at which the job will finish.
    Finishing(u64),
}

impl CreatingStreamingJobStatus {
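    /// Feeds backfill progress reports into the current phase.
    ///
    /// In `ConsumingSnapshot`, once the tracker reports pending finished jobs, the status
    /// transitions to `ConsumingLogStore`: a checkpoint barrier sealing the snapshot at
    /// `backfill_epoch` is prepared, followed by the upstream barriers buffered so far.
    /// In `ConsumingLogStore`, the reports only update the log store progress tracker;
    /// `Finishing` ignores further progress.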
    pub(super) fn update_progress(
        &mut self,
        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
    ) {
        match self {
            &mut Self::ConsumingSnapshot {
                ref mut create_mview_tracker,
                ref version_stats,
                ref mut prev_epoch_fake_physical_time,
                ref mut pending_upstream_barriers,
                ref mut pending_non_checkpoint_barriers,
                ref backfill_epoch,
                ref snapshot_backfill_actors,
                ..
            } => {
                create_mview_tracker.update_tracking_jobs(
                    None,
                    create_mview_progress,
                    version_stats,
                );
                if create_mview_tracker.has_pending_finished_jobs() {
                    pending_non_checkpoint_barriers.push(*backfill_epoch);

                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
                    let barriers_to_inject: Vec<_> = [BarrierInfo {
                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
                        prev_epoch: TracedEpoch::new(prev_epoch),
                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                    }]
                    .into_iter()
                    .chain(pending_upstream_barriers.drain(..))
                    .collect();

                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                            snapshot_backfill_actors.iter().cloned(),
                            barriers_to_inject
                                .last()
                                .map(|barrier_info| {
                                    barrier_info.prev_epoch().saturating_sub(*backfill_epoch)
                                })
                                .unwrap_or(0),
                        ),
                        barriers_to_inject: Some(barriers_to_inject),
                    };
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                log_store_progress_tracker.update(create_mview_progress);
            }
            CreatingStreamingJobStatus::Finishing(_) => {}
        }
    }

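    /// Marks the job as starting to consume the upstream directly.
    ///
    /// Only valid while consuming the log store and at a checkpoint barrier; the status then
    /// transitions to `Finishing` with the barrier's `prev_epoch`.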
    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
                unreachable!(
                    "should not start consuming upstream for a job that is consuming snapshot"
                )
            }
            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                let prev_epoch = barrier_info.prev_epoch();
                assert!(barrier_info.kind.is_checkpoint());
                *self = CreatingStreamingJobStatus::Finishing(prev_epoch);
            }
            CreatingStreamingJobStatus::Finishing { .. } => {
                unreachable!("should not start consuming upstream again for a finishing job")
            }
        }
    }

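    /// Handles a new upstream epoch and returns the barriers (with an optional mutation) to be
    /// injected into the creating job.
    ///
    /// In `ConsumingSnapshot`, the upstream barrier is buffered and a fake-epoch barrier is
    /// injected instead, possibly carrying a `StartFragmentBackfill` mutation. In
    /// `ConsumingLogStore`, any barriers left over from the snapshot phase are injected together
    /// with the new upstream barrier. A finishing job injects nothing.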
    pub(super) fn on_new_upstream_epoch(
        &mut self,
        barrier_info: &BarrierInfo,
    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                pending_upstream_barriers,
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                create_mview_tracker,
                ..
            } => {
                let mutation = {
                    let pending_backfill_nodes = create_mview_tracker.take_pending_backfill_nodes();
                    if pending_backfill_nodes.is_empty() {
                        None
                    } else {
                        Some(Mutation::StartFragmentBackfill(
                            StartFragmentBackfillMutation {
                                fragment_ids: pending_backfill_nodes
                                    .into_iter()
                                    .map(|fragment_id| fragment_id as _)
                                    .collect(),
                            },
                        ))
                    }
                };
                pending_upstream_barriers.push(barrier_info.clone());
                vec![(
                    CreatingStreamingJobStatus::new_fake_barrier(
                        prev_epoch_fake_physical_time,
                        pending_non_checkpoint_barriers,
                        match barrier_info.kind {
                            BarrierKind::Barrier => PbBarrierKind::Barrier,
                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                            BarrierKind::Initial => {
                                unreachable!("upstream new epoch should not be initial")
                            }
                        },
                    ),
                    mutation,
                )]
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                barriers_to_inject, ..
            } => barriers_to_inject
                .take()
                .into_iter()
                .flatten()
                .chain([barrier_info.clone()])
                .map(|barrier_info| (barrier_info, None))
                .collect(),
            CreatingStreamingJobStatus::Finishing { .. } => {
                vec![]
            }
        }
    }

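    /// Builds a barrier on the fake epoch timeline used while consuming the snapshot: the fake
    /// physical time advances by one, the new `prev_epoch` is recorded as pending, and a
    /// checkpoint barrier takes ownership of all pending non-checkpoint epochs.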
    pub(super) fn new_fake_barrier(
        prev_epoch_fake_physical_time: &mut u64,
        pending_non_checkpoint_barriers: &mut Vec<u64>,
        kind: PbBarrierKind,
    ) -> BarrierInfo {
        let prev_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        *prev_epoch_fake_physical_time += 1;
        let curr_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        pending_non_checkpoint_barriers.push(prev_epoch.value().0);
        let kind = match kind {
            PbBarrierKind::Unspecified => {
                unreachable!()
            }
            PbBarrierKind::Initial => {
                pending_non_checkpoint_barriers.clear();
                BarrierKind::Initial
            }
            PbBarrierKind::Barrier => BarrierKind::Barrier,
            PbBarrierKind::Checkpoint => {
                BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
            }
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }

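    /// Returns `true` if the job has entered the `Finishing` phase.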
    pub(super) fn is_finishing(&self) -> bool {
        matches!(self, Self::Finishing(_))
    }
}