risingwave_meta/barrier/checkpoint/creating_job/status.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::{
    CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;

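/// Tracks how far each snapshot-backfill actor still lags behind the upstream
/// while the creating job is draining its log store.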
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// `actor_id` -> `pending_epoch_lag`
    ongoing_actors: HashMap<ActorId, u64>,
    finished_actors: HashSet<ActorId>,
}

impl CreateMviewLogStoreProgressTracker {
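    /// Creates a tracker with every backfill actor initialized to the same
    /// `pending_barrier_lag`.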
    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
        Self {
            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
            finished_actors: HashSet::new(),
        }
    }

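    /// Renders a human-readable DDL progress string,
    /// e.g. `actor: 3/8, avg lag 1.5s` (numbers here are illustrative).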
    pub(super) fn gen_ddl_progress(&self) -> String {
        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
        let count = if self.ongoing_actors.is_empty() {
            1
        } else {
            self.ongoing_actors.len()
        } as f64;
        let avg = sum / count;
        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
        format!(
            "actor: {}/{}, avg lag {:?}",
            self.finished_actors.len(),
            self.ongoing_actors.len() + self.finished_actors.len(),
            avg_lag_time
        )
    }

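    /// Applies reported per-actor progress: finished actors are moved to
    /// `finished_actors`, the others get their `pending_epoch_lag` refreshed.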
    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
        for progress in progress {
            match self.ongoing_actors.entry(progress.backfill_actor_id) {
                Entry::Occupied(mut entry) => {
                    if progress.done {
                        entry.remove_entry();
                        assert!(
                            self.finished_actors.insert(progress.backfill_actor_id),
                            "non-duplicate"
                        );
                    } else {
                        *entry.get_mut() = progress.pending_epoch_lag as _;
                    }
                }
                Entry::Vacant(_) => {
                    if cfg!(debug_assertions) {
                        panic!(
                            "reporting progress on non-inflight actor: {:?} {:?}",
                            progress, self
                        );
                    } else {
                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
                    }
                }
            }
        }
    }

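    /// Returns `true` once every backfill actor has reported `done`.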
    pub(super) fn is_finished(&self) -> bool {
        self.ongoing_actors.is_empty()
    }
}

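/// State of a snapshot-backfill streaming job while it is being created:
/// `ConsumingSnapshot` -> `ConsumingLogStore` -> `Finishing`.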
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The creating job is consuming the upstream snapshot.
    /// Transitions to `ConsumingLogStore` in `update_progress` once the
    /// snapshot has been fully consumed.
    ConsumingSnapshot {
        prev_epoch_fake_physical_time: u64,
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        backfill_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        /// The `prev_epoch` of pending non-checkpoint barriers
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The creating job is consuming the log store.
    ///
    /// Transitions to `Finishing` in `start_consume_upstream`.
    ConsumingLogStore {
        tracking_job: TrackingJob,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// All backfill actors have started consuming upstream, and the job
    /// will be finished when all previously injected barriers have been collected.
    /// Stores the `prev_epoch` at which the job will finish.
    Finishing(u64, TrackingJob),
    /// Temporary placeholder used while transitioning between states via `mem::replace`.
    PlaceHolder,
}

impl CreatingStreamingJobStatus {
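    /// Applies reported backfill progress and, once the snapshot has been fully
    /// consumed, switches from `ConsumingSnapshot` to `ConsumingLogStore`,
    /// queueing the backfill checkpoint barrier plus any pending upstream
    /// barriers for injection.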
    pub(super) fn update_progress(
        &mut self,
        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
    ) {
        match self {
            &mut Self::ConsumingSnapshot {
                ref mut create_mview_tracker,
                ref version_stats,
                ref mut prev_epoch_fake_physical_time,
                ref mut pending_upstream_barriers,
                ref mut pending_non_checkpoint_barriers,
                ref backfill_epoch,
                ..
            } => {
                for progress in create_mview_progress {
                    create_mview_tracker.apply_progress(progress, version_stats);
                }
                if create_mview_tracker.is_finished() {
                    pending_non_checkpoint_barriers.push(*backfill_epoch);

                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
                    let barriers_to_inject: Vec<_> = [BarrierInfo {
                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
                        prev_epoch: TracedEpoch::new(prev_epoch),
                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                    }]
                    .into_iter()
                    .chain(pending_upstream_barriers.drain(..))
                    .collect();

                    let CreatingStreamingJobStatus::ConsumingSnapshot {
                        create_mview_tracker,
                        fragment_infos,
                        backfill_epoch,
                        snapshot_backfill_actors,
                        ..
                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                    else {
                        unreachable!()
                    };

                    let tracking_job = create_mview_tracker.into_tracking_job();

                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
                        tracking_job,
                        fragment_infos,
                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                            snapshot_backfill_actors.iter().cloned(),
                            barriers_to_inject
                                .last()
                                .map(|barrier_info| {
                                    barrier_info.prev_epoch().saturating_sub(backfill_epoch)
                                })
                                .unwrap_or(0),
                        ),
                        barriers_to_inject: Some(barriers_to_inject),
                    };
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                log_store_progress_tracker.update(create_mview_progress);
            }
            CreatingStreamingJobStatus::Finishing(..) => {}
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

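    /// Marks the job as having started to consume upstream directly: moves from
    /// `ConsumingLogStore` to `Finishing` at the given checkpoint barrier and
    /// hands the job's fragment infos back to the caller.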
    pub(super) fn start_consume_upstream(
        &mut self,
        barrier_info: &BarrierInfo,
    ) -> HashMap<FragmentId, InflightFragmentInfo> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
                unreachable!(
                    "should not start consuming upstream for a job that is still consuming the snapshot"
                )
            }
            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                let prev_epoch = barrier_info.prev_epoch();
                assert!(barrier_info.kind.is_checkpoint());
                let CreatingStreamingJobStatus::ConsumingLogStore {
                    fragment_infos,
                    tracking_job,
                    ..
                } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                else {
                    unreachable!()
                };
                *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
                fragment_infos
            }
            CreatingStreamingJobStatus::Finishing { .. } => {
                unreachable!(
                    "should not start consuming upstream again for a job that is already finishing"
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

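    /// Handles a new upstream epoch and returns the barriers (with optional
    /// mutations) to inject into the creating job's graph for this epoch.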
    pub(super) fn on_new_upstream_epoch(
        &mut self,
        barrier_info: &BarrierInfo,
    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                pending_upstream_barriers,
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                create_mview_tracker,
                ..
            } => {
                let mutation = {
                    let pending_backfill_nodes = create_mview_tracker
                        .take_pending_backfill_nodes()
                        .collect_vec();
                    if pending_backfill_nodes.is_empty() {
                        None
                    } else {
                        Some(Mutation::StartFragmentBackfill(
                            StartFragmentBackfillMutation {
                                fragment_ids: pending_backfill_nodes,
                            },
                        ))
                    }
                };
                pending_upstream_barriers.push(barrier_info.clone());
                vec![(
                    CreatingStreamingJobStatus::new_fake_barrier(
                        prev_epoch_fake_physical_time,
                        pending_non_checkpoint_barriers,
                        match barrier_info.kind {
                            BarrierKind::Barrier => PbBarrierKind::Barrier,
                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                            BarrierKind::Initial => {
                                unreachable!("upstream new epoch should not be initial")
                            }
                        },
                    ),
                    mutation,
                )]
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                barriers_to_inject, ..
            } => barriers_to_inject
                .take()
                .into_iter()
                .flatten()
                .chain([barrier_info.clone()])
                .map(|barrier_info| (barrier_info, None))
                .collect(),
            CreatingStreamingJobStatus::Finishing { .. } => {
                vec![]
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

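    /// Builds a barrier on the job's own "fake" epoch timeline while the snapshot
    /// is being consumed, advancing the fake physical time by one per barrier and
    /// folding pending non-checkpoint epochs into the next checkpoint barrier.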
    pub(super) fn new_fake_barrier(
        prev_epoch_fake_physical_time: &mut u64,
        pending_non_checkpoint_barriers: &mut Vec<u64>,
        kind: PbBarrierKind,
    ) -> BarrierInfo {
        let prev_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        *prev_epoch_fake_physical_time += 1;
        let curr_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        pending_non_checkpoint_barriers.push(prev_epoch.value().0);
        let kind = match kind {
            PbBarrierKind::Unspecified => {
                unreachable!()
            }
            PbBarrierKind::Initial => {
                pending_non_checkpoint_barriers.clear();
                BarrierKind::Initial
            }
            PbBarrierKind::Barrier => BarrierKind::Barrier,
            PbBarrierKind::Checkpoint => {
                BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
            }
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }

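    /// Returns `true` if the job has entered the `Finishing` state.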
    pub(super) fn is_finishing(&self) -> bool {
        matches!(self, Self::Finishing(..))
    }

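    /// Returns the creating job's in-flight fragment infos, or `None` once the
    /// job is finishing.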
    pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { fragment_infos, .. } => {
                Some(fragment_infos)
            }
            CreatingStreamingJobStatus::ConsumingLogStore { fragment_infos, .. } => {
                Some(fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(..) => None,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}