risingwave_meta/barrier/checkpoint/creating_job/
status.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::id::FragmentId;
25use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
26use risingwave_pb::stream_plan::barrier::PbBarrierKind;
27use risingwave_pb::stream_plan::barrier_mutation::Mutation;
28use risingwave_pb::stream_service::barrier_complete_response::{
29    CreateMviewProgress, PbCreateMviewProgress,
30};
31use tracing::warn;
32
33use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
34use crate::barrier::checkpoint::recovery::ResetPartialGraphCollector;
35use crate::barrier::notifier::Notifier;
36use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
37use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
38use crate::controller::fragment::InflightFragmentInfo;
39
/// Tracks the progress reported by the backfill actors of a creating job while
/// the job is consuming the log store.
///
/// Every tracked actor starts in `ongoing_actors` and is moved to
/// `finished_actors` once it reports `done`.
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// `actor_id` -> `pending_epoch_lag`, for actors that have not reported `done` yet.
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have reported `done`.
    finished_actors: HashSet<ActorId>,
}
46
47impl CreateMviewLogStoreProgressTracker {
48    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
49        Self {
50            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
51            finished_actors: HashSet::new(),
52        }
53    }
54
55    pub(super) fn gen_backfill_progress(&self) -> String {
56        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
57        let count = if self.ongoing_actors.is_empty() {
58            1
59        } else {
60            self.ongoing_actors.len()
61        } as f64;
62        let avg = sum / count;
63        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
64        format!(
65            "actor: {}/{}, avg lag {:?}",
66            self.finished_actors.len(),
67            self.ongoing_actors.len() + self.finished_actors.len(),
68            avg_lag_time
69        )
70    }
71
72    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
73        for progress in progress {
74            match self.ongoing_actors.entry(progress.backfill_actor_id) {
75                Entry::Occupied(mut entry) => {
76                    if progress.done {
77                        entry.remove_entry();
78                        assert!(
79                            self.finished_actors.insert(progress.backfill_actor_id),
80                            "non-duplicate"
81                        );
82                    } else {
83                        *entry.get_mut() = progress.pending_epoch_lag as _;
84                    }
85                }
86                Entry::Vacant(_) => {
87                    if cfg!(debug_assertions) {
88                        panic!(
89                            "reporting progress on non-inflight actor: {:?} {:?}",
90                            progress, self
91                        );
92                    } else {
93                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
94                    }
95                }
96            }
97        }
98    }
99
    /// Returns `true` once no actor remains in `ongoing_actors`.
    pub(super) fn is_finished(&self) -> bool {
        self.ongoing_actors.is_empty()
    }
103}
104
/// State of a streaming job that is being created.
///
/// The transitions implemented in this file advance the state as
/// `ConsumingSnapshot` -> `ConsumingLogStore` -> `Finishing`.
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The creating job is consuming upstream snapshot.
    ///
    /// Will transit to `ConsumingLogStore` in `update_progress` once the reported
    /// progress shows that the snapshot has been fully consumed.
    ConsumingSnapshot {
        /// Physical time used to build the `prev_epoch` of the next fake barrier;
        /// incremented by one for every fake barrier that is generated.
        prev_epoch_fake_physical_time: u64,
        /// Upstream barriers received while consuming the snapshot, kept to be
        /// injected once the snapshot has been fully consumed.
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        snapshot_epoch: u64,
        info: CreatingJobInfo,
        /// The `prev_epoch` of pending non checkpoint barriers
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The creating job is consuming log store.
    ///
    /// Will transit to `Finishing` in `start_consume_upstream`.
    ConsumingLogStore {
        tracking_job: TrackingJob,
        info: CreatingJobInfo,
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        /// Barriers to be injected on the next `on_new_upstream_epoch`;
        /// `None` after they have been taken.
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// All backfill actors have started consuming upstream, and the job
    /// will be finished when all previously injected barriers have been collected.
    /// Stores the `prev_epoch` at which the job will finish.
    Finishing(u64, TrackingJob),
    /// In this state progress updates are ignored and no barriers are generated.
    // NOTE(review): presumably the job's partial graph is being reset and the
    // notifiers are informed afterwards -- confirm against the code driving this variant.
    Resetting(ResetPartialGraphCollector, Vec<Notifier>),
    /// Transient value left behind by `replace` while the fields of the previous
    /// state are moved out; methods treat observing it as unreachable.
    PlaceHolder,
}
137
138impl CreatingStreamingJobStatus {
139    pub(super) fn update_progress(
140        &mut self,
141        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
142    ) {
143        match self {
144            &mut Self::ConsumingSnapshot {
145                ref mut create_mview_tracker,
146                ref version_stats,
147                ref mut prev_epoch_fake_physical_time,
148                ref mut pending_upstream_barriers,
149                ref mut pending_non_checkpoint_barriers,
150                ref snapshot_epoch,
151                ..
152            } => {
153                for progress in create_mview_progress {
154                    create_mview_tracker.apply_progress(progress, version_stats);
155                }
156                if create_mview_tracker.is_finished() {
157                    pending_non_checkpoint_barriers.push(*snapshot_epoch);
158
159                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
160                    let barriers_to_inject: Vec<_> = [BarrierInfo {
161                        curr_epoch: TracedEpoch::new(Epoch(*snapshot_epoch)),
162                        prev_epoch: TracedEpoch::new(prev_epoch),
163                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
164                    }]
165                    .into_iter()
166                    .chain(pending_upstream_barriers.drain(..))
167                    .collect();
168
169                    let CreatingStreamingJobStatus::ConsumingSnapshot {
170                        create_mview_tracker,
171                        info,
172                        snapshot_epoch,
173                        snapshot_backfill_actors,
174                        ..
175                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
176                    else {
177                        unreachable!()
178                    };
179
180                    let tracking_job = create_mview_tracker.into_tracking_job();
181
182                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
183                        tracking_job,
184                        info,
185                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
186                            snapshot_backfill_actors.iter().cloned(),
187                            barriers_to_inject
188                                .last()
189                                .map(|barrier_info| {
190                                    barrier_info.prev_epoch().saturating_sub(snapshot_epoch)
191                                })
192                                .unwrap_or(0),
193                        ),
194                        barriers_to_inject: Some(barriers_to_inject),
195                    };
196                }
197            }
198            CreatingStreamingJobStatus::ConsumingLogStore {
199                log_store_progress_tracker,
200                ..
201            } => {
202                log_store_progress_tracker.update(create_mview_progress);
203            }
204            CreatingStreamingJobStatus::Finishing(..)
205            | CreatingStreamingJobStatus::Resetting(_, _) => {}
206            CreatingStreamingJobStatus::PlaceHolder => {
207                unreachable!()
208            }
209        }
210    }
211
212    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) -> CreatingJobInfo {
213        match self {
214            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
215                unreachable!(
216                    "should not start consuming upstream for a job that are consuming snapshot"
217                )
218            }
219            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
220                let prev_epoch = barrier_info.prev_epoch();
221                {
222                    assert!(barrier_info.kind.is_checkpoint());
223                    let CreatingStreamingJobStatus::ConsumingLogStore {
224                        info, tracking_job, ..
225                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
226                    else {
227                        unreachable!()
228                    };
229                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
230                    info
231                }
232            }
233            CreatingStreamingJobStatus::Finishing { .. } => {
234                unreachable!("should not start consuming upstream for a job again")
235            }
236            CreatingStreamingJobStatus::Resetting(_, _) => {
237                unreachable!("unlikely to start consume upstream when resetting")
238            }
239            CreatingStreamingJobStatus::PlaceHolder => {
240                unreachable!()
241            }
242        }
243    }
244
245    pub(super) fn on_new_upstream_epoch(
246        &mut self,
247        barrier_info: &BarrierInfo,
248        mutation: Option<Mutation>, // mutation to be set for the first barrier to inject
249    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
250        match self {
251            CreatingStreamingJobStatus::ConsumingSnapshot {
252                pending_upstream_barriers,
253                prev_epoch_fake_physical_time,
254                pending_non_checkpoint_barriers,
255                create_mview_tracker,
256                ..
257            } => {
258                let mutation = mutation.or_else(|| {
259                    let pending_backfill_nodes = create_mview_tracker
260                        .take_pending_backfill_nodes()
261                        .collect_vec();
262                    if pending_backfill_nodes.is_empty() {
263                        None
264                    } else {
265                        Some(Mutation::StartFragmentBackfill(
266                            StartFragmentBackfillMutation {
267                                fragment_ids: pending_backfill_nodes,
268                            },
269                        ))
270                    }
271                });
272                pending_upstream_barriers.push(barrier_info.clone());
273                vec![(
274                    CreatingStreamingJobStatus::new_fake_barrier(
275                        prev_epoch_fake_physical_time,
276                        pending_non_checkpoint_barriers,
277                        match barrier_info.kind {
278                            BarrierKind::Barrier => PbBarrierKind::Barrier,
279                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
280                            BarrierKind::Initial => {
281                                unreachable!("upstream new epoch should not be initial")
282                            }
283                        },
284                    ),
285                    mutation,
286                )]
287            }
288            CreatingStreamingJobStatus::ConsumingLogStore {
289                barriers_to_inject, ..
290            } => barriers_to_inject
291                .take()
292                .into_iter()
293                .flatten()
294                .chain([barrier_info.clone()])
295                .map(|barrier_info| (barrier_info, None))
296                .collect(),
297            CreatingStreamingJobStatus::Finishing { .. }
298            | CreatingStreamingJobStatus::Resetting(_, _) => {
299                vec![]
300            }
301            CreatingStreamingJobStatus::PlaceHolder => {
302                unreachable!()
303            }
304        }
305    }
306
307    pub(super) fn new_fake_barrier(
308        prev_epoch_fake_physical_time: &mut u64,
309        pending_non_checkpoint_barriers: &mut Vec<u64>,
310        kind: PbBarrierKind,
311    ) -> BarrierInfo {
312        {
313            {
314                let prev_epoch =
315                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
316                *prev_epoch_fake_physical_time += 1;
317                let curr_epoch =
318                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
319                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
320                let kind = match kind {
321                    PbBarrierKind::Unspecified => {
322                        unreachable!()
323                    }
324                    PbBarrierKind::Initial => {
325                        pending_non_checkpoint_barriers.clear();
326                        BarrierKind::Initial
327                    }
328                    PbBarrierKind::Barrier => BarrierKind::Barrier,
329                    PbBarrierKind::Checkpoint => {
330                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
331                    }
332                };
333                BarrierInfo {
334                    prev_epoch,
335                    curr_epoch,
336                    kind,
337                }
338            }
339        }
340    }
341
342    pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
343        match self {
344            CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
345            | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
346                Some(&info.fragment_infos)
347            }
348            CreatingStreamingJobStatus::Finishing(..)
349            | CreatingStreamingJobStatus::Resetting(_, _) => None,
350            CreatingStreamingJobStatus::PlaceHolder => {
351                unreachable!()
352            }
353        }
354    }
355}