risingwave_meta/barrier/checkpoint/creating_job/status.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
25use risingwave_pb::stream_plan::barrier::PbBarrierKind;
26use risingwave_pb::stream_plan::barrier_mutation::Mutation;
27use risingwave_pb::stream_service::barrier_complete_response::{
28    CreateMviewProgress, PbCreateMviewProgress,
29};
30use tracing::warn;
31
32use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
33use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
34
35#[derive(Debug)]
36pub(super) struct CreateMviewLogStoreProgressTracker {
37    /// `actor_id` -> `pending_epoch_lag`
38    ongoing_actors: HashMap<ActorId, u64>,
39    finished_actors: HashSet<ActorId>,
40}
41
42impl CreateMviewLogStoreProgressTracker {
43    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
44        Self {
45            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
46            finished_actors: HashSet::new(),
47        }
48    }
49
50    pub(super) fn gen_ddl_progress(&self) -> String {
51        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
52        let count = if self.ongoing_actors.is_empty() {
53            1
54        } else {
55            self.ongoing_actors.len()
56        } as f64;
57        let avg = sum / count;
58        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
59        format!(
60            "actor: {}/{}, avg lag {:?}",
61            self.finished_actors.len(),
62            self.ongoing_actors.len() + self.finished_actors.len(),
63            avg_lag_time
64        )
65    }
66
67    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
68        for progress in progress {
69            match self.ongoing_actors.entry(progress.backfill_actor_id) {
70                Entry::Occupied(mut entry) => {
71                    if progress.done {
72                        entry.remove_entry();
73                        assert!(
74                            self.finished_actors.insert(progress.backfill_actor_id),
75                            "non-duplicate"
76                        );
77                    } else {
78                        *entry.get_mut() = progress.pending_epoch_lag as _;
79                    }
80                }
81                Entry::Vacant(_) => {
82                    if cfg!(debug_assertions) {
83                        panic!(
84                            "reporting progress on non-inflight actor: {:?} {:?}",
85                            progress, self
86                        );
87                    } else {
88                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
89                    }
90                }
91            }
92        }
93    }
94
95    pub(super) fn is_finished(&self) -> bool {
96        self.ongoing_actors.is_empty()
97    }
98}
99
100#[derive(Debug)]
101pub(super) enum CreatingStreamingJobStatus {
102    /// The creating job is consuming upstream snapshot.
103    /// Will transit to `ConsumingLogStore` on `update_progress` when
104    /// the snapshot has been fully consumed after `update_progress`.
105    ConsumingSnapshot {
106        prev_epoch_fake_physical_time: u64,
107        pending_upstream_barriers: Vec<BarrierInfo>,
108        version_stats: HummockVersionStats,
109        create_mview_tracker: CreateMviewProgressTracker,
110        snapshot_backfill_actors: HashSet<ActorId>,
111        backfill_epoch: u64,
112        /// The `prev_epoch` of pending non checkpoint barriers
113        pending_non_checkpoint_barriers: Vec<u64>,
114    },
115    /// The creating job is consuming log store.
116    ///
117    /// Will transit to `Finishing` on `on_new_upstream_epoch` when `start_consume_upstream` is `true`.
118    ConsumingLogStore {
119        tracking_job: TrackingJob,
120        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
121        barriers_to_inject: Option<Vec<BarrierInfo>>,
122    },
123    /// All backfill actors have started consuming upstream, and the job
124    /// will be finished when all previously injected barriers have been collected
125    /// Store the `prev_epoch` that will finish at.
126    Finishing(u64, TrackingJob),
127    PlaceHolder,
128}
129
130impl CreatingStreamingJobStatus {
131    pub(super) fn update_progress(
132        &mut self,
133        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
134    ) {
135        match self {
136            &mut Self::ConsumingSnapshot {
137                ref mut create_mview_tracker,
138                ref version_stats,
139                ref mut prev_epoch_fake_physical_time,
140                ref mut pending_upstream_barriers,
141                ref mut pending_non_checkpoint_barriers,
142                ref backfill_epoch,
143                ..
144            } => {
145                for progress in create_mview_progress {
146                    create_mview_tracker.apply_progress(progress, version_stats);
147                }
148                if create_mview_tracker.is_finished() {
149                    pending_non_checkpoint_barriers.push(*backfill_epoch);
150
151                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
152                    let barriers_to_inject: Vec<_> = [BarrierInfo {
153                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
154                        prev_epoch: TracedEpoch::new(prev_epoch),
155                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
156                    }]
157                    .into_iter()
158                    .chain(pending_upstream_barriers.drain(..))
159                    .collect();
160
161                    let CreatingStreamingJobStatus::ConsumingSnapshot {
162                        create_mview_tracker,
163                        backfill_epoch,
164                        snapshot_backfill_actors,
165                        ..
166                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
167                    else {
168                        unreachable!()
169                    };
170
171                    let tracking_job = create_mview_tracker.into_tracking_job();
172
173                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
174                        tracking_job,
175                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
176                            snapshot_backfill_actors.iter().cloned(),
177                            barriers_to_inject
178                                .last()
179                                .map(|barrier_info| {
180                                    barrier_info.prev_epoch().saturating_sub(backfill_epoch)
181                                })
182                                .unwrap_or(0),
183                        ),
184                        barriers_to_inject: Some(barriers_to_inject),
185                    };
186                }
187            }
188            CreatingStreamingJobStatus::ConsumingLogStore {
189                log_store_progress_tracker,
190                ..
191            } => {
192                log_store_progress_tracker.update(create_mview_progress);
193            }
194            CreatingStreamingJobStatus::Finishing(..) => {}
195            CreatingStreamingJobStatus::PlaceHolder => {
196                unreachable!()
197            }
198        }
199    }
200
201    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
202        match self {
203            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
204                unreachable!(
205                    "should not start consuming upstream for a job that are consuming snapshot"
206                )
207            }
208            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
209                let prev_epoch = barrier_info.prev_epoch();
210                {
211                    assert!(barrier_info.kind.is_checkpoint());
212                    let CreatingStreamingJobStatus::ConsumingLogStore { tracking_job, .. } =
213                        replace(self, CreatingStreamingJobStatus::PlaceHolder)
214                    else {
215                        unreachable!()
216                    };
217                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
218                }
219            }
220            CreatingStreamingJobStatus::Finishing { .. } => {
221                unreachable!("should not start consuming upstream for a job again")
222            }
223            CreatingStreamingJobStatus::PlaceHolder => {
224                unreachable!()
225            }
226        }
227    }
228
229    pub(super) fn on_new_upstream_epoch(
230        &mut self,
231        barrier_info: &BarrierInfo,
232    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
233        match self {
234            CreatingStreamingJobStatus::ConsumingSnapshot {
235                pending_upstream_barriers,
236                prev_epoch_fake_physical_time,
237                pending_non_checkpoint_barriers,
238                create_mview_tracker,
239                ..
240            } => {
241                let mutation = {
242                    let pending_backfill_nodes = create_mview_tracker
243                        .take_pending_backfill_nodes()
244                        .collect_vec();
245                    if pending_backfill_nodes.is_empty() {
246                        None
247                    } else {
248                        Some(Mutation::StartFragmentBackfill(
249                            StartFragmentBackfillMutation {
250                                fragment_ids: pending_backfill_nodes,
251                            },
252                        ))
253                    }
254                };
255                pending_upstream_barriers.push(barrier_info.clone());
256                vec![(
257                    CreatingStreamingJobStatus::new_fake_barrier(
258                        prev_epoch_fake_physical_time,
259                        pending_non_checkpoint_barriers,
260                        match barrier_info.kind {
261                            BarrierKind::Barrier => PbBarrierKind::Barrier,
262                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
263                            BarrierKind::Initial => {
264                                unreachable!("upstream new epoch should not be initial")
265                            }
266                        },
267                    ),
268                    mutation,
269                )]
270            }
271            CreatingStreamingJobStatus::ConsumingLogStore {
272                barriers_to_inject, ..
273            } => barriers_to_inject
274                .take()
275                .into_iter()
276                .flatten()
277                .chain([barrier_info.clone()])
278                .map(|barrier_info| (barrier_info, None))
279                .collect(),
280            CreatingStreamingJobStatus::Finishing { .. } => {
281                vec![]
282            }
283            CreatingStreamingJobStatus::PlaceHolder => {
284                unreachable!()
285            }
286        }
287    }
288
289    pub(super) fn new_fake_barrier(
290        prev_epoch_fake_physical_time: &mut u64,
291        pending_non_checkpoint_barriers: &mut Vec<u64>,
292        kind: PbBarrierKind,
293    ) -> BarrierInfo {
294        {
295            {
296                let prev_epoch =
297                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
298                *prev_epoch_fake_physical_time += 1;
299                let curr_epoch =
300                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
301                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
302                let kind = match kind {
303                    PbBarrierKind::Unspecified => {
304                        unreachable!()
305                    }
306                    PbBarrierKind::Initial => {
307                        pending_non_checkpoint_barriers.clear();
308                        BarrierKind::Initial
309                    }
310                    PbBarrierKind::Barrier => BarrierKind::Barrier,
311                    PbBarrierKind::Checkpoint => {
312                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
313                    }
314                };
315                BarrierInfo {
316                    prev_epoch,
317                    curr_epoch,
318                    kind,
319                }
320            }
321        }
322    }
323
324    pub(super) fn is_finishing(&self) -> bool {
325        matches!(self, Self::Finishing(..))
326    }
327}