risingwave_meta/barrier/checkpoint/creating_job/
status.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::id::FragmentId;
25use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
26use risingwave_pb::stream_plan::barrier::PbBarrierKind;
27use risingwave_pb::stream_plan::barrier_mutation::Mutation;
28use risingwave_pb::stream_service::barrier_complete_response::{
29    CreateMviewProgress, PbCreateMviewProgress,
30};
31use tracing::warn;
32
33use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
34use crate::barrier::notifier::Notifier;
35use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
36use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
37use crate::controller::fragment::InflightFragmentInfo;
38
39#[derive(Debug)]
40pub(super) struct CreateMviewLogStoreProgressTracker {
41    /// `actor_id` -> `pending_epoch_lag`
42    ongoing_actors: HashMap<ActorId, u64>,
43    finished_actors: HashSet<ActorId>,
44}
45
46impl CreateMviewLogStoreProgressTracker {
47    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
48        Self {
49            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
50            finished_actors: HashSet::new(),
51        }
52    }
53
54    pub(super) fn gen_backfill_progress(&self) -> String {
55        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
56        let count = if self.ongoing_actors.is_empty() {
57            1
58        } else {
59            self.ongoing_actors.len()
60        } as f64;
61        let avg = sum / count;
62        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
63        format!(
64            "actor: {}/{}, avg lag {:?}",
65            self.finished_actors.len(),
66            self.ongoing_actors.len() + self.finished_actors.len(),
67            avg_lag_time
68        )
69    }
70
71    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
72        for progress in progress {
73            match self.ongoing_actors.entry(progress.backfill_actor_id) {
74                Entry::Occupied(mut entry) => {
75                    if progress.done {
76                        entry.remove_entry();
77                        assert!(
78                            self.finished_actors.insert(progress.backfill_actor_id),
79                            "non-duplicate"
80                        );
81                    } else {
82                        *entry.get_mut() = progress.pending_epoch_lag as _;
83                    }
84                }
85                Entry::Vacant(_) => {
86                    if cfg!(debug_assertions) {
87                        panic!(
88                            "reporting progress on non-inflight actor: {:?} {:?}",
89                            progress, self
90                        );
91                    } else {
92                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
93                    }
94                }
95            }
96        }
97    }
98
99    pub(super) fn is_finished(&self) -> bool {
100        self.ongoing_actors.is_empty()
101    }
102}
103
104#[derive(Debug)]
105pub(super) enum CreatingStreamingJobStatus {
106    /// The creating job is consuming upstream snapshot.
107    /// Will transit to `ConsumingLogStore` on `update_progress` when
108    /// the snapshot has been fully consumed after `update_progress`.
109    ConsumingSnapshot {
110        prev_epoch_fake_physical_time: u64,
111        pending_upstream_barriers: Vec<BarrierInfo>,
112        version_stats: HummockVersionStats,
113        create_mview_tracker: CreateMviewProgressTracker,
114        snapshot_backfill_actors: HashSet<ActorId>,
115        snapshot_epoch: u64,
116        info: CreatingJobInfo,
117        /// The `prev_epoch` of pending non checkpoint barriers
118        pending_non_checkpoint_barriers: Vec<u64>,
119    },
120    /// The creating job is consuming log store.
121    ///
122    /// Will transit to `Finishing` on `on_new_upstream_epoch` when `start_consume_upstream` is `true`.
123    ConsumingLogStore {
124        tracking_job: TrackingJob,
125        info: CreatingJobInfo,
126        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
127        barriers_to_inject: Option<Vec<BarrierInfo>>,
128    },
129    /// All backfill actors have started consuming upstream, and the job
130    /// will be finished when all previously injected barriers have been collected
131    /// Store the `prev_epoch` that will finish at.
132    Finishing(u64, TrackingJob),
133    Resetting(Vec<Notifier>),
134    PlaceHolder,
135}
136
137impl CreatingStreamingJobStatus {
138    pub(super) fn update_progress(
139        &mut self,
140        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
141    ) {
142        match self {
143            &mut Self::ConsumingSnapshot {
144                ref mut create_mview_tracker,
145                ref version_stats,
146                ref mut prev_epoch_fake_physical_time,
147                ref mut pending_upstream_barriers,
148                ref mut pending_non_checkpoint_barriers,
149                ref snapshot_epoch,
150                ..
151            } => {
152                for progress in create_mview_progress {
153                    create_mview_tracker.apply_progress(progress, version_stats);
154                }
155                if create_mview_tracker.is_finished() {
156                    pending_non_checkpoint_barriers.push(*snapshot_epoch);
157
158                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
159                    let barriers_to_inject: Vec<_> = [BarrierInfo {
160                        curr_epoch: TracedEpoch::new(Epoch(*snapshot_epoch)),
161                        prev_epoch: TracedEpoch::new(prev_epoch),
162                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
163                    }]
164                    .into_iter()
165                    .chain(pending_upstream_barriers.drain(..))
166                    .collect();
167
168                    let CreatingStreamingJobStatus::ConsumingSnapshot {
169                        create_mview_tracker,
170                        info,
171                        snapshot_epoch,
172                        snapshot_backfill_actors,
173                        ..
174                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
175                    else {
176                        unreachable!()
177                    };
178
179                    let tracking_job = create_mview_tracker.into_tracking_job();
180
181                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
182                        tracking_job,
183                        info,
184                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
185                            snapshot_backfill_actors.iter().cloned(),
186                            barriers_to_inject
187                                .last()
188                                .map(|barrier_info| {
189                                    barrier_info.prev_epoch().saturating_sub(snapshot_epoch)
190                                })
191                                .unwrap_or(0),
192                        ),
193                        barriers_to_inject: Some(barriers_to_inject),
194                    };
195                }
196            }
197            CreatingStreamingJobStatus::ConsumingLogStore {
198                log_store_progress_tracker,
199                ..
200            } => {
201                log_store_progress_tracker.update(create_mview_progress);
202            }
203            CreatingStreamingJobStatus::Finishing(..)
204            | CreatingStreamingJobStatus::Resetting(..) => {}
205            CreatingStreamingJobStatus::PlaceHolder => {
206                unreachable!()
207            }
208        }
209    }
210
211    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) -> CreatingJobInfo {
212        match self {
213            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
214                unreachable!(
215                    "should not start consuming upstream for a job that are consuming snapshot"
216                )
217            }
218            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
219                let prev_epoch = barrier_info.prev_epoch();
220                {
221                    assert!(barrier_info.kind.is_checkpoint());
222                    let CreatingStreamingJobStatus::ConsumingLogStore {
223                        info, tracking_job, ..
224                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
225                    else {
226                        unreachable!()
227                    };
228                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
229                    info
230                }
231            }
232            CreatingStreamingJobStatus::Finishing { .. } => {
233                unreachable!("should not start consuming upstream for a job again")
234            }
235            CreatingStreamingJobStatus::Resetting(..) => {
236                unreachable!("unlikely to start consume upstream when resetting")
237            }
238            CreatingStreamingJobStatus::PlaceHolder => {
239                unreachable!()
240            }
241        }
242    }
243
244    pub(super) fn on_new_upstream_epoch(
245        &mut self,
246        barrier_info: &BarrierInfo,
247        mutation: Option<Mutation>, // mutation to be set for the first barrier to inject
248    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
249        match self {
250            CreatingStreamingJobStatus::ConsumingSnapshot {
251                pending_upstream_barriers,
252                prev_epoch_fake_physical_time,
253                pending_non_checkpoint_barriers,
254                create_mview_tracker,
255                ..
256            } => {
257                let mutation = mutation.or_else(|| {
258                    let pending_backfill_nodes = create_mview_tracker
259                        .take_pending_backfill_nodes()
260                        .collect_vec();
261                    if pending_backfill_nodes.is_empty() {
262                        None
263                    } else {
264                        Some(Mutation::StartFragmentBackfill(
265                            StartFragmentBackfillMutation {
266                                fragment_ids: pending_backfill_nodes,
267                            },
268                        ))
269                    }
270                });
271                pending_upstream_barriers.push(barrier_info.clone());
272                vec![(
273                    CreatingStreamingJobStatus::new_fake_barrier(
274                        prev_epoch_fake_physical_time,
275                        pending_non_checkpoint_barriers,
276                        match barrier_info.kind {
277                            BarrierKind::Barrier => PbBarrierKind::Barrier,
278                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
279                            BarrierKind::Initial => {
280                                unreachable!("upstream new epoch should not be initial")
281                            }
282                        },
283                    ),
284                    mutation,
285                )]
286            }
287            CreatingStreamingJobStatus::ConsumingLogStore {
288                barriers_to_inject, ..
289            } => barriers_to_inject
290                .take()
291                .into_iter()
292                .flatten()
293                .chain([barrier_info.clone()])
294                .map(|barrier_info| (barrier_info, None))
295                .collect(),
296            CreatingStreamingJobStatus::Finishing { .. }
297            | CreatingStreamingJobStatus::Resetting(..) => {
298                vec![]
299            }
300            CreatingStreamingJobStatus::PlaceHolder => {
301                unreachable!()
302            }
303        }
304    }
305
306    pub(super) fn new_fake_barrier(
307        prev_epoch_fake_physical_time: &mut u64,
308        pending_non_checkpoint_barriers: &mut Vec<u64>,
309        kind: PbBarrierKind,
310    ) -> BarrierInfo {
311        {
312            {
313                let prev_epoch =
314                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
315                *prev_epoch_fake_physical_time += 1;
316                let curr_epoch =
317                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
318                let kind = match kind {
319                    PbBarrierKind::Unspecified => {
320                        unreachable!()
321                    }
322                    PbBarrierKind::Initial => {
323                        assert!(pending_non_checkpoint_barriers.is_empty());
324                        BarrierKind::Initial
325                    }
326                    PbBarrierKind::Barrier => {
327                        pending_non_checkpoint_barriers.push(prev_epoch.value().0);
328                        BarrierKind::Barrier
329                    }
330                    PbBarrierKind::Checkpoint => {
331                        pending_non_checkpoint_barriers.push(prev_epoch.value().0);
332                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
333                    }
334                };
335                BarrierInfo {
336                    prev_epoch,
337                    curr_epoch,
338                    kind,
339                }
340            }
341        }
342    }
343
344    pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
345        match self {
346            CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
347            | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
348                Some(&info.fragment_infos)
349            }
350            CreatingStreamingJobStatus::Finishing(..)
351            | CreatingStreamingJobStatus::Resetting(..) => None,
352            CreatingStreamingJobStatus::PlaceHolder => {
353                unreachable!()
354            }
355        }
356    }
357
358    pub(super) fn creating_job_info(&self) -> Option<&CreatingJobInfo> {
359        match self {
360            CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
361            | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => Some(info),
362            CreatingStreamingJobStatus::Finishing(..)
363            | CreatingStreamingJobStatus::Resetting(_) => None,
364            CreatingStreamingJobStatus::PlaceHolder => {
365                unreachable!()
366            }
367        }
368    }
369}