risingwave_meta/barrier/checkpoint/creating_job/
status.rs1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::id::FragmentId;
25use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
26use risingwave_pb::stream_plan::barrier::PbBarrierKind;
27use risingwave_pb::stream_plan::barrier_mutation::Mutation;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 CreateMviewProgress, PbCreateMviewProgress,
30};
31use tracing::warn;
32
33use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
34use crate::barrier::notifier::Notifier;
35use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
36use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
37use crate::controller::fragment::InflightFragmentInfo;
38
/// Tracks per-actor backfill progress of a creating job after it has finished consuming the
/// upstream snapshot (it is built from the job's snapshot-backfill actors when the job enters
/// the log-store phase).
///
/// Every tracked actor is in exactly one of the two collections: it starts in
/// `ongoing_actors` and moves to `finished_actors` when it reports `done`.
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// Actors that have not reported `done` yet, mapped to their most recently reported
    /// `pending_epoch_lag` (initially the lag passed to `new`).
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have reported `done`.
    finished_actors: HashSet<ActorId>,
}
45
46impl CreateMviewLogStoreProgressTracker {
47 pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
48 Self {
49 ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
50 finished_actors: HashSet::new(),
51 }
52 }
53
54 pub(super) fn gen_backfill_progress(&self) -> String {
55 let sum = self.ongoing_actors.values().sum::<u64>() as f64;
56 let count = if self.ongoing_actors.is_empty() {
57 1
58 } else {
59 self.ongoing_actors.len()
60 } as f64;
61 let avg = sum / count;
62 let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
63 format!(
64 "actor: {}/{}, avg lag {:?}",
65 self.finished_actors.len(),
66 self.ongoing_actors.len() + self.finished_actors.len(),
67 avg_lag_time
68 )
69 }
70
71 fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
72 for progress in progress {
73 match self.ongoing_actors.entry(progress.backfill_actor_id) {
74 Entry::Occupied(mut entry) => {
75 if progress.done {
76 entry.remove_entry();
77 assert!(
78 self.finished_actors.insert(progress.backfill_actor_id),
79 "non-duplicate"
80 );
81 } else {
82 *entry.get_mut() = progress.pending_epoch_lag as _;
83 }
84 }
85 Entry::Vacant(_) => {
86 if cfg!(debug_assertions) {
87 panic!(
88 "reporting progress on non-inflight actor: {:?} {:?}",
89 progress, self
90 );
91 } else {
92 warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
93 }
94 }
95 }
96 }
97 }
98
99 pub(super) fn is_finished(&self) -> bool {
100 self.ongoing_actors.is_empty()
101 }
102}
103
/// Lifecycle state of a creating streaming job that backfills from an upstream snapshot.
///
/// Transitions performed by the methods of this type: `ConsumingSnapshot` ->
/// `ConsumingLogStore` (when snapshot progress is finished, in `update_progress`) and
/// `ConsumingLogStore` -> `Finishing` (in `start_consume_upstream`).
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The job's backfill actors are consuming the upstream snapshot. In this phase each upstream
    /// barrier is buffered, and a fake barrier derived from `prev_epoch_fake_physical_time` is
    /// produced for the job instead.
    ConsumingSnapshot {
        /// Physical time used to build the `prev_epoch` of the next fake barrier; advanced by one
        /// for every fake barrier generated.
        prev_epoch_fake_physical_time: u64,
        /// Upstream barriers received during this phase, injected after the snapshot is done.
        pending_upstream_barriers: Vec<BarrierInfo>,
        /// Passed to `create_mview_tracker` whenever progress is applied.
        version_stats: HummockVersionStats,
        /// Tracks snapshot backfill progress; once finished, the job moves to
        /// `ConsumingLogStore`.
        create_mview_tracker: CreateMviewProgressTracker,
        /// Actors whose progress is tracked in the log-store phase.
        snapshot_backfill_actors: HashSet<ActorId>,
        /// Used as the `curr_epoch` of the checkpoint barrier that ends this phase (presumably
        /// the epoch of the upstream snapshot — confirm against the caller).
        snapshot_epoch: u64,
        info: CreatingJobInfo,
        /// `prev_epoch` values of fake barriers generated since the last fake checkpoint barrier.
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The snapshot phase has finished; backfill actors now report progress to
    /// `log_store_progress_tracker`.
    ConsumingLogStore {
        tracking_job: TrackingJob,
        info: CreatingJobInfo,
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        /// Barriers built when the snapshot phase ended; `Some` until taken by the next call to
        /// `on_new_upstream_epoch`.
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// The job started consuming upstream directly. Holds the `prev_epoch` of the checkpoint
    /// barrier passed to `start_consume_upstream`, and the job's tracking job.
    Finishing(u64, TrackingJob),
    /// The job is being reset; holds notifiers (presumably notified once the reset completes —
    /// TODO confirm).
    Resetting(Vec<Notifier>),
    /// Transient value left by `std::mem::replace` during a state transition; every method
    /// treats observing it as unreachable.
    PlaceHolder,
}
136
137impl CreatingStreamingJobStatus {
138 pub(super) fn update_progress(
139 &mut self,
140 create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
141 ) {
142 match self {
143 &mut Self::ConsumingSnapshot {
144 ref mut create_mview_tracker,
145 ref version_stats,
146 ref mut prev_epoch_fake_physical_time,
147 ref mut pending_upstream_barriers,
148 ref mut pending_non_checkpoint_barriers,
149 ref snapshot_epoch,
150 ..
151 } => {
152 for progress in create_mview_progress {
153 create_mview_tracker.apply_progress(progress, version_stats);
154 }
155 if create_mview_tracker.is_finished() {
156 pending_non_checkpoint_barriers.push(*snapshot_epoch);
157
158 let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
159 let barriers_to_inject: Vec<_> = [BarrierInfo {
160 curr_epoch: TracedEpoch::new(Epoch(*snapshot_epoch)),
161 prev_epoch: TracedEpoch::new(prev_epoch),
162 kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
163 }]
164 .into_iter()
165 .chain(pending_upstream_barriers.drain(..))
166 .collect();
167
168 let CreatingStreamingJobStatus::ConsumingSnapshot {
169 create_mview_tracker,
170 info,
171 snapshot_epoch,
172 snapshot_backfill_actors,
173 ..
174 } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
175 else {
176 unreachable!()
177 };
178
179 let tracking_job = create_mview_tracker.into_tracking_job();
180
181 *self = CreatingStreamingJobStatus::ConsumingLogStore {
182 tracking_job,
183 info,
184 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
185 snapshot_backfill_actors.iter().cloned(),
186 barriers_to_inject
187 .last()
188 .map(|barrier_info| {
189 barrier_info.prev_epoch().saturating_sub(snapshot_epoch)
190 })
191 .unwrap_or(0),
192 ),
193 barriers_to_inject: Some(barriers_to_inject),
194 };
195 }
196 }
197 CreatingStreamingJobStatus::ConsumingLogStore {
198 log_store_progress_tracker,
199 ..
200 } => {
201 log_store_progress_tracker.update(create_mview_progress);
202 }
203 CreatingStreamingJobStatus::Finishing(..)
204 | CreatingStreamingJobStatus::Resetting(..) => {}
205 CreatingStreamingJobStatus::PlaceHolder => {
206 unreachable!()
207 }
208 }
209 }
210
211 pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) -> CreatingJobInfo {
212 match self {
213 CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
214 unreachable!(
215 "should not start consuming upstream for a job that are consuming snapshot"
216 )
217 }
218 CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
219 let prev_epoch = barrier_info.prev_epoch();
220 {
221 assert!(barrier_info.kind.is_checkpoint());
222 let CreatingStreamingJobStatus::ConsumingLogStore {
223 info, tracking_job, ..
224 } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
225 else {
226 unreachable!()
227 };
228 *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
229 info
230 }
231 }
232 CreatingStreamingJobStatus::Finishing { .. } => {
233 unreachable!("should not start consuming upstream for a job again")
234 }
235 CreatingStreamingJobStatus::Resetting(..) => {
236 unreachable!("unlikely to start consume upstream when resetting")
237 }
238 CreatingStreamingJobStatus::PlaceHolder => {
239 unreachable!()
240 }
241 }
242 }
243
244 pub(super) fn on_new_upstream_epoch(
245 &mut self,
246 barrier_info: &BarrierInfo,
247 mutation: Option<Mutation>, ) -> Vec<(BarrierInfo, Option<Mutation>)> {
249 match self {
250 CreatingStreamingJobStatus::ConsumingSnapshot {
251 pending_upstream_barriers,
252 prev_epoch_fake_physical_time,
253 pending_non_checkpoint_barriers,
254 create_mview_tracker,
255 ..
256 } => {
257 let mutation = mutation.or_else(|| {
258 let pending_backfill_nodes = create_mview_tracker
259 .take_pending_backfill_nodes()
260 .collect_vec();
261 if pending_backfill_nodes.is_empty() {
262 None
263 } else {
264 Some(Mutation::StartFragmentBackfill(
265 StartFragmentBackfillMutation {
266 fragment_ids: pending_backfill_nodes,
267 },
268 ))
269 }
270 });
271 pending_upstream_barriers.push(barrier_info.clone());
272 vec![(
273 CreatingStreamingJobStatus::new_fake_barrier(
274 prev_epoch_fake_physical_time,
275 pending_non_checkpoint_barriers,
276 match barrier_info.kind {
277 BarrierKind::Barrier => PbBarrierKind::Barrier,
278 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
279 BarrierKind::Initial => {
280 unreachable!("upstream new epoch should not be initial")
281 }
282 },
283 ),
284 mutation,
285 )]
286 }
287 CreatingStreamingJobStatus::ConsumingLogStore {
288 barriers_to_inject, ..
289 } => barriers_to_inject
290 .take()
291 .into_iter()
292 .flatten()
293 .chain([barrier_info.clone()])
294 .map(|barrier_info| (barrier_info, None))
295 .collect(),
296 CreatingStreamingJobStatus::Finishing { .. }
297 | CreatingStreamingJobStatus::Resetting(..) => {
298 vec![]
299 }
300 CreatingStreamingJobStatus::PlaceHolder => {
301 unreachable!()
302 }
303 }
304 }
305
306 pub(super) fn new_fake_barrier(
307 prev_epoch_fake_physical_time: &mut u64,
308 pending_non_checkpoint_barriers: &mut Vec<u64>,
309 kind: PbBarrierKind,
310 ) -> BarrierInfo {
311 {
312 {
313 let prev_epoch =
314 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
315 *prev_epoch_fake_physical_time += 1;
316 let curr_epoch =
317 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
318 let kind = match kind {
319 PbBarrierKind::Unspecified => {
320 unreachable!()
321 }
322 PbBarrierKind::Initial => {
323 assert!(pending_non_checkpoint_barriers.is_empty());
324 BarrierKind::Initial
325 }
326 PbBarrierKind::Barrier => {
327 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
328 BarrierKind::Barrier
329 }
330 PbBarrierKind::Checkpoint => {
331 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
332 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
333 }
334 };
335 BarrierInfo {
336 prev_epoch,
337 curr_epoch,
338 kind,
339 }
340 }
341 }
342 }
343
344 pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
345 match self {
346 CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
347 | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
348 Some(&info.fragment_infos)
349 }
350 CreatingStreamingJobStatus::Finishing(..)
351 | CreatingStreamingJobStatus::Resetting(..) => None,
352 CreatingStreamingJobStatus::PlaceHolder => {
353 unreachable!()
354 }
355 }
356 }
357
358 pub(super) fn creating_job_info(&self) -> Option<&CreatingJobInfo> {
359 match self {
360 CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
361 | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => Some(info),
362 CreatingStreamingJobStatus::Finishing(..)
363 | CreatingStreamingJobStatus::Resetting(_) => None,
364 CreatingStreamingJobStatus::PlaceHolder => {
365 unreachable!()
366 }
367 }
368 }
369}