// risingwave_meta/barrier/checkpoint/creating_job/status.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::id::FragmentId;
25use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
26use risingwave_pb::stream_plan::barrier::PbBarrierKind;
27use risingwave_pb::stream_plan::barrier_mutation::Mutation;
28use risingwave_pb::stream_service::barrier_complete_response::{
29    CreateMviewProgress, PbCreateMviewProgress,
30};
31use tracing::warn;
32
33use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
34use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
35use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
36use crate::controller::fragment::InflightFragmentInfo;
37
38#[derive(Debug)]
39pub(super) struct CreateMviewLogStoreProgressTracker {
40    /// `actor_id` -> `pending_epoch_lag`
41    ongoing_actors: HashMap<ActorId, u64>,
42    finished_actors: HashSet<ActorId>,
43}
44
45impl CreateMviewLogStoreProgressTracker {
46    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
47        Self {
48            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
49            finished_actors: HashSet::new(),
50        }
51    }
52
53    pub(super) fn gen_ddl_progress(&self) -> String {
54        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
55        let count = if self.ongoing_actors.is_empty() {
56            1
57        } else {
58            self.ongoing_actors.len()
59        } as f64;
60        let avg = sum / count;
61        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
62        format!(
63            "actor: {}/{}, avg lag {:?}",
64            self.finished_actors.len(),
65            self.ongoing_actors.len() + self.finished_actors.len(),
66            avg_lag_time
67        )
68    }
69
70    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
71        for progress in progress {
72            match self.ongoing_actors.entry(progress.backfill_actor_id) {
73                Entry::Occupied(mut entry) => {
74                    if progress.done {
75                        entry.remove_entry();
76                        assert!(
77                            self.finished_actors.insert(progress.backfill_actor_id),
78                            "non-duplicate"
79                        );
80                    } else {
81                        *entry.get_mut() = progress.pending_epoch_lag as _;
82                    }
83                }
84                Entry::Vacant(_) => {
85                    if cfg!(debug_assertions) {
86                        panic!(
87                            "reporting progress on non-inflight actor: {:?} {:?}",
88                            progress, self
89                        );
90                    } else {
91                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
92                    }
93                }
94            }
95        }
96    }
97
98    pub(super) fn is_finished(&self) -> bool {
99        self.ongoing_actors.is_empty()
100    }
101}
102
103#[derive(Debug)]
104pub(super) enum CreatingStreamingJobStatus {
105    /// The creating job is consuming upstream snapshot.
106    /// Will transit to `ConsumingLogStore` on `update_progress` when
107    /// the snapshot has been fully consumed after `update_progress`.
108    ConsumingSnapshot {
109        prev_epoch_fake_physical_time: u64,
110        pending_upstream_barriers: Vec<BarrierInfo>,
111        version_stats: HummockVersionStats,
112        create_mview_tracker: CreateMviewProgressTracker,
113        snapshot_backfill_actors: HashSet<ActorId>,
114        snapshot_epoch: u64,
115        info: CreatingJobInfo,
116        /// The `prev_epoch` of pending non checkpoint barriers
117        pending_non_checkpoint_barriers: Vec<u64>,
118    },
119    /// The creating job is consuming log store.
120    ///
121    /// Will transit to `Finishing` on `on_new_upstream_epoch` when `start_consume_upstream` is `true`.
122    ConsumingLogStore {
123        tracking_job: TrackingJob,
124        info: CreatingJobInfo,
125        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
126        barriers_to_inject: Option<Vec<BarrierInfo>>,
127    },
128    /// All backfill actors have started consuming upstream, and the job
129    /// will be finished when all previously injected barriers have been collected
130    /// Store the `prev_epoch` that will finish at.
131    Finishing(u64, TrackingJob),
132    PlaceHolder,
133}
134
135impl CreatingStreamingJobStatus {
136    pub(super) fn update_progress(
137        &mut self,
138        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
139    ) {
140        match self {
141            &mut Self::ConsumingSnapshot {
142                ref mut create_mview_tracker,
143                ref version_stats,
144                ref mut prev_epoch_fake_physical_time,
145                ref mut pending_upstream_barriers,
146                ref mut pending_non_checkpoint_barriers,
147                ref snapshot_epoch,
148                ..
149            } => {
150                for progress in create_mview_progress {
151                    create_mview_tracker.apply_progress(progress, version_stats);
152                }
153                if create_mview_tracker.is_finished() {
154                    pending_non_checkpoint_barriers.push(*snapshot_epoch);
155
156                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
157                    let barriers_to_inject: Vec<_> = [BarrierInfo {
158                        curr_epoch: TracedEpoch::new(Epoch(*snapshot_epoch)),
159                        prev_epoch: TracedEpoch::new(prev_epoch),
160                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
161                    }]
162                    .into_iter()
163                    .chain(pending_upstream_barriers.drain(..))
164                    .collect();
165
166                    let CreatingStreamingJobStatus::ConsumingSnapshot {
167                        create_mview_tracker,
168                        info,
169                        snapshot_epoch,
170                        snapshot_backfill_actors,
171                        ..
172                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
173                    else {
174                        unreachable!()
175                    };
176
177                    let tracking_job = create_mview_tracker.into_tracking_job();
178
179                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
180                        tracking_job,
181                        info,
182                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
183                            snapshot_backfill_actors.iter().cloned(),
184                            barriers_to_inject
185                                .last()
186                                .map(|barrier_info| {
187                                    barrier_info.prev_epoch().saturating_sub(snapshot_epoch)
188                                })
189                                .unwrap_or(0),
190                        ),
191                        barriers_to_inject: Some(barriers_to_inject),
192                    };
193                }
194            }
195            CreatingStreamingJobStatus::ConsumingLogStore {
196                log_store_progress_tracker,
197                ..
198            } => {
199                log_store_progress_tracker.update(create_mview_progress);
200            }
201            CreatingStreamingJobStatus::Finishing(..) => {}
202            CreatingStreamingJobStatus::PlaceHolder => {
203                unreachable!()
204            }
205        }
206    }
207
208    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) -> CreatingJobInfo {
209        match self {
210            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
211                unreachable!(
212                    "should not start consuming upstream for a job that are consuming snapshot"
213                )
214            }
215            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
216                let prev_epoch = barrier_info.prev_epoch();
217                {
218                    assert!(barrier_info.kind.is_checkpoint());
219                    let CreatingStreamingJobStatus::ConsumingLogStore {
220                        info, tracking_job, ..
221                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
222                    else {
223                        unreachable!()
224                    };
225                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
226                    info
227                }
228            }
229            CreatingStreamingJobStatus::Finishing { .. } => {
230                unreachable!("should not start consuming upstream for a job again")
231            }
232            CreatingStreamingJobStatus::PlaceHolder => {
233                unreachable!()
234            }
235        }
236    }
237
238    pub(super) fn on_new_upstream_epoch(
239        &mut self,
240        barrier_info: &BarrierInfo,
241    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
242        match self {
243            CreatingStreamingJobStatus::ConsumingSnapshot {
244                pending_upstream_barriers,
245                prev_epoch_fake_physical_time,
246                pending_non_checkpoint_barriers,
247                create_mview_tracker,
248                ..
249            } => {
250                let mutation = {
251                    let pending_backfill_nodes = create_mview_tracker
252                        .take_pending_backfill_nodes()
253                        .collect_vec();
254                    if pending_backfill_nodes.is_empty() {
255                        None
256                    } else {
257                        Some(Mutation::StartFragmentBackfill(
258                            StartFragmentBackfillMutation {
259                                fragment_ids: pending_backfill_nodes,
260                            },
261                        ))
262                    }
263                };
264                pending_upstream_barriers.push(barrier_info.clone());
265                vec![(
266                    CreatingStreamingJobStatus::new_fake_barrier(
267                        prev_epoch_fake_physical_time,
268                        pending_non_checkpoint_barriers,
269                        match barrier_info.kind {
270                            BarrierKind::Barrier => PbBarrierKind::Barrier,
271                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
272                            BarrierKind::Initial => {
273                                unreachable!("upstream new epoch should not be initial")
274                            }
275                        },
276                    ),
277                    mutation,
278                )]
279            }
280            CreatingStreamingJobStatus::ConsumingLogStore {
281                barriers_to_inject, ..
282            } => barriers_to_inject
283                .take()
284                .into_iter()
285                .flatten()
286                .chain([barrier_info.clone()])
287                .map(|barrier_info| (barrier_info, None))
288                .collect(),
289            CreatingStreamingJobStatus::Finishing { .. } => {
290                vec![]
291            }
292            CreatingStreamingJobStatus::PlaceHolder => {
293                unreachable!()
294            }
295        }
296    }
297
298    pub(super) fn new_fake_barrier(
299        prev_epoch_fake_physical_time: &mut u64,
300        pending_non_checkpoint_barriers: &mut Vec<u64>,
301        kind: PbBarrierKind,
302    ) -> BarrierInfo {
303        {
304            {
305                let prev_epoch =
306                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
307                *prev_epoch_fake_physical_time += 1;
308                let curr_epoch =
309                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
310                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
311                let kind = match kind {
312                    PbBarrierKind::Unspecified => {
313                        unreachable!()
314                    }
315                    PbBarrierKind::Initial => {
316                        pending_non_checkpoint_barriers.clear();
317                        BarrierKind::Initial
318                    }
319                    PbBarrierKind::Barrier => BarrierKind::Barrier,
320                    PbBarrierKind::Checkpoint => {
321                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
322                    }
323                };
324                BarrierInfo {
325                    prev_epoch,
326                    curr_epoch,
327                    kind,
328                }
329            }
330        }
331    }
332
333    pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
334        match self {
335            CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
336            | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
337                Some(&info.fragment_infos)
338            }
339            CreatingStreamingJobStatus::Finishing(..) => None,
340            CreatingStreamingJobStatus::PlaceHolder => {
341                unreachable!()
342            }
343        }
344    }
345}