// risingwave_meta/barrier/checkpoint/creating_job/status.rs

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::{
    CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
use crate::barrier::checkpoint::recovery::ResetPartialGraphCollector;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;

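/// Tracks how far each backfill actor of a creating job still lags behind the upstream while the
/// job catches up by consuming the log store: `ongoing_actors` maps each in-flight actor to its
/// pending epoch lag, and actors that report `done` are moved into `finished_actors`.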
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    ongoing_actors: HashMap<ActorId, u64>,
    finished_actors: HashSet<ActorId>,
}

impl CreateMviewLogStoreProgressTracker {
    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
        Self {
            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
            finished_actors: HashSet::new(),
        }
    }

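    /// Render a human-readable progress string: the number of finished actors out of the total,
    /// together with the average pending epoch lag of the ongoing actors as a duration.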
    pub(super) fn gen_backfill_progress(&self) -> String {
        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
        // Use 1 to avoid division by zero when no actor is still ongoing.
        let count = if self.ongoing_actors.is_empty() {
            1
        } else {
            self.ongoing_actors.len()
        } as f64;
        let avg = sum / count;
        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
        format!(
            "actor: {}/{}, avg lag {:?}",
            self.finished_actors.len(),
            self.ongoing_actors.len() + self.finished_actors.len(),
            avg_lag_time
        )
    }

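    /// Apply per-actor progress reported in barrier-complete responses: refresh the pending epoch
    /// lag of ongoing actors and move actors that report `done` into `finished_actors`.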
    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
        for progress in progress {
            match self.ongoing_actors.entry(progress.backfill_actor_id) {
                Entry::Occupied(mut entry) => {
                    if progress.done {
                        entry.remove_entry();
                        assert!(
                            self.finished_actors.insert(progress.backfill_actor_id),
                            "non-duplicate"
                        );
                    } else {
                        *entry.get_mut() = progress.pending_epoch_lag as _;
                    }
                }
                Entry::Vacant(_) => {
                    if cfg!(debug_assertions) {
                        panic!(
                            "reporting progress on non-inflight actor: {:?} {:?}",
                            progress, self
                        );
                    } else {
                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
                    }
                }
            }
        }
    }

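    /// Whether every tracked actor has finished consuming the log store.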
    pub(super) fn is_finished(&self) -> bool {
        self.ongoing_actors.is_empty()
    }
}

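/// State machine of a creating snapshot-backfill streaming job:
/// `ConsumingSnapshot` (backfilling from the upstream snapshot while buffering upstream barriers)
/// -> `ConsumingLogStore` (replaying the buffered barriers from the log store to catch up)
/// -> `Finishing` (consuming the upstream directly from the recorded epoch).
/// `Resetting` is entered while the job's partial graph is being reset, and `PlaceHolder` is only
/// a transient value used with `mem::replace` during state transitions.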
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    ConsumingSnapshot {
        prev_epoch_fake_physical_time: u64,
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        snapshot_epoch: u64,
        info: CreatingJobInfo,
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    ConsumingLogStore {
        tracking_job: TrackingJob,
        info: CreatingJobInfo,
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    Finishing(u64, TrackingJob),
    Resetting(ResetPartialGraphCollector, Vec<Notifier>),
    PlaceHolder,
}

impl CreatingStreamingJobStatus {
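    /// Apply `CreateMviewProgress` reports from barrier-complete responses. While consuming the
    /// snapshot this feeds the `CreateMviewProgressTracker`; once the snapshot is fully consumed,
    /// the status transitions to `ConsumingLogStore` with the buffered upstream barriers queued
    /// for injection. While consuming the log store it updates the log store progress tracker.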
    pub(super) fn update_progress(
        &mut self,
        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
    ) {
        match self {
            &mut Self::ConsumingSnapshot {
                ref mut create_mview_tracker,
                ref version_stats,
                ref mut prev_epoch_fake_physical_time,
                ref mut pending_upstream_barriers,
                ref mut pending_non_checkpoint_barriers,
                ref snapshot_epoch,
                ..
            } => {
                for progress in create_mview_progress {
                    create_mview_tracker.apply_progress(progress, version_stats);
                }
                if create_mview_tracker.is_finished() {
                    pending_non_checkpoint_barriers.push(*snapshot_epoch);

                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
                    let barriers_to_inject: Vec<_> = [BarrierInfo {
                        curr_epoch: TracedEpoch::new(Epoch(*snapshot_epoch)),
                        prev_epoch: TracedEpoch::new(prev_epoch),
                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                    }]
                    .into_iter()
                    .chain(pending_upstream_barriers.drain(..))
                    .collect();

                    let CreatingStreamingJobStatus::ConsumingSnapshot {
                        create_mview_tracker,
                        info,
                        snapshot_epoch,
                        snapshot_backfill_actors,
                        ..
                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                    else {
                        unreachable!()
                    };

                    let tracking_job = create_mview_tracker.into_tracking_job();

                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
                        tracking_job,
                        info,
                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                            snapshot_backfill_actors.iter().cloned(),
                            barriers_to_inject
                                .last()
                                .map(|barrier_info| {
                                    barrier_info.prev_epoch().saturating_sub(snapshot_epoch)
                                })
                                .unwrap_or(0),
                        ),
                        barriers_to_inject: Some(barriers_to_inject),
                    };
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                log_store_progress_tracker.update(create_mview_progress);
            }
            CreatingStreamingJobStatus::Finishing(..)
            | CreatingStreamingJobStatus::Resetting(_, _) => {}
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

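    /// Transition from `ConsumingLogStore` to `Finishing` on the checkpoint barrier at which the
    /// job starts consuming the upstream directly, returning the job's `CreatingJobInfo`.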
    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) -> CreatingJobInfo {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
                unreachable!(
                    "should not start consuming upstream for a job that is still consuming the snapshot"
                )
            }
            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                let prev_epoch = barrier_info.prev_epoch();
                assert!(barrier_info.kind.is_checkpoint());
                let CreatingStreamingJobStatus::ConsumingLogStore {
                    info, tracking_job, ..
                } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                else {
                    unreachable!()
                };
                *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
                info
            }
            CreatingStreamingJobStatus::Finishing { .. } => {
                unreachable!("should not start consuming upstream for a job again")
            }
            CreatingStreamingJobStatus::Resetting(_, _) => {
                unreachable!("should not start consuming upstream when resetting")
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

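    /// Handle a new upstream barrier and return the barriers (with optional mutations) to inject
    /// into the creating job. While consuming the snapshot, the upstream barrier is buffered and a
    /// fake-epoch barrier is generated instead; while consuming the log store, any pending
    /// `barriers_to_inject` are flushed together with the new barrier.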
    pub(super) fn on_new_upstream_epoch(
        &mut self,
        barrier_info: &BarrierInfo,
        mutation: Option<Mutation>,
    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                pending_upstream_barriers,
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                create_mview_tracker,
                ..
            } => {
                let mutation = mutation.or_else(|| {
                    let pending_backfill_nodes = create_mview_tracker
                        .take_pending_backfill_nodes()
                        .collect_vec();
                    if pending_backfill_nodes.is_empty() {
                        None
                    } else {
                        Some(Mutation::StartFragmentBackfill(
                            StartFragmentBackfillMutation {
                                fragment_ids: pending_backfill_nodes,
                            },
                        ))
                    }
                });
                pending_upstream_barriers.push(barrier_info.clone());
                vec![(
                    CreatingStreamingJobStatus::new_fake_barrier(
                        prev_epoch_fake_physical_time,
                        pending_non_checkpoint_barriers,
                        match barrier_info.kind {
                            BarrierKind::Barrier => PbBarrierKind::Barrier,
                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                            BarrierKind::Initial => {
                                unreachable!("upstream new epoch should not be initial")
                            }
                        },
                    ),
                    mutation,
                )]
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                barriers_to_inject, ..
            } => barriers_to_inject
                .take()
                .into_iter()
                .flatten()
                .chain([barrier_info.clone()])
                .map(|barrier_info| (barrier_info, None))
                .collect(),
            CreatingStreamingJobStatus::Finishing { .. }
            | CreatingStreamingJobStatus::Resetting(_, _) => {
                vec![]
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

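    /// Generate the next barrier for the creating job from the fake physical clock used during
    /// snapshot consumption: advance the clock by one tick, record the previous epoch as pending,
    /// and attach all pending non-checkpoint epochs when a checkpoint barrier is produced.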
    pub(super) fn new_fake_barrier(
        prev_epoch_fake_physical_time: &mut u64,
        pending_non_checkpoint_barriers: &mut Vec<u64>,
        kind: PbBarrierKind,
    ) -> BarrierInfo {
        let prev_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        *prev_epoch_fake_physical_time += 1;
        let curr_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        pending_non_checkpoint_barriers.push(prev_epoch.value().0);
        let kind = match kind {
            PbBarrierKind::Unspecified => {
                unreachable!()
            }
            PbBarrierKind::Initial => {
                pending_non_checkpoint_barriers.clear();
                BarrierKind::Initial
            }
            PbBarrierKind::Barrier => BarrierKind::Barrier,
            PbBarrierKind::Checkpoint => {
                BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
            }
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }

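    /// Fragment infos of the creating job while it is still consuming the snapshot or the log
    /// store; `None` once the job is finishing or resetting.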
    pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
                Some(&info.fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(..)
            | CreatingStreamingJobStatus::Resetting(_, _) => None,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}