risingwave_meta/barrier/checkpoint/creating_job/
status.rs1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::id::FragmentId;
25use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
26use risingwave_pb::stream_plan::barrier::PbBarrierKind;
27use risingwave_pb::stream_plan::barrier_mutation::Mutation;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 CreateMviewProgress, PbCreateMviewProgress,
30};
31use tracing::warn;
32
33use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
34use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
35use crate::controller::fragment::InflightFragmentInfo;
36
/// Tracks, per backfill actor, how far the job is through draining the
/// backfill log store: actors move from `ongoing_actors` to
/// `finished_actors` as they report `done` (see `update`).
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// Actors still draining the log store, mapped to their most recently
    /// reported pending epoch lag (initialized uniformly in `new`).
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have reported completion exactly once.
    finished_actors: HashSet<ActorId>,
}
43
44impl CreateMviewLogStoreProgressTracker {
45 pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
46 Self {
47 ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
48 finished_actors: HashSet::new(),
49 }
50 }
51
52 pub(super) fn gen_ddl_progress(&self) -> String {
53 let sum = self.ongoing_actors.values().sum::<u64>() as f64;
54 let count = if self.ongoing_actors.is_empty() {
55 1
56 } else {
57 self.ongoing_actors.len()
58 } as f64;
59 let avg = sum / count;
60 let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
61 format!(
62 "actor: {}/{}, avg lag {:?}",
63 self.finished_actors.len(),
64 self.ongoing_actors.len() + self.finished_actors.len(),
65 avg_lag_time
66 )
67 }
68
69 fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
70 for progress in progress {
71 match self.ongoing_actors.entry(progress.backfill_actor_id) {
72 Entry::Occupied(mut entry) => {
73 if progress.done {
74 entry.remove_entry();
75 assert!(
76 self.finished_actors.insert(progress.backfill_actor_id),
77 "non-duplicate"
78 );
79 } else {
80 *entry.get_mut() = progress.pending_epoch_lag as _;
81 }
82 }
83 Entry::Vacant(_) => {
84 if cfg!(debug_assertions) {
85 panic!(
86 "reporting progress on non-inflight actor: {:?} {:?}",
87 progress, self
88 );
89 } else {
90 warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
91 }
92 }
93 }
94 }
95 }
96
97 pub(super) fn is_finished(&self) -> bool {
98 self.ongoing_actors.is_empty()
99 }
100}
101
/// State machine for a streaming job being created via snapshot backfill.
///
/// Progression (as driven by the methods in this file):
/// `ConsumingSnapshot` -> `ConsumingLogStore` -> `Finishing`, with
/// `PlaceHolder` only used transiently during in-place transitions.
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// Phase 1: backfilling from the upstream snapshot. Real upstream
    /// barriers are buffered while fake-epoch barriers drive the backfill.
    ConsumingSnapshot {
        // Fake physical clock used to synthesize epochs for injected
        // barriers (advanced by `new_fake_barrier`).
        prev_epoch_fake_physical_time: u64,
        // Upstream barriers received while still consuming the snapshot;
        // replayed when transitioning to `ConsumingLogStore`.
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        // Epoch at which the snapshot phase is sealed with a checkpoint
        // barrier (see `update_progress`).
        backfill_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        // Epochs of injected non-checkpoint barriers, collected into the
        // next `BarrierKind::Checkpoint`.
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// Phase 2: catching up with upstream by draining the backfill log store.
    ConsumingLogStore {
        tracking_job: TrackingJob,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        // Barriers prepared at the snapshot -> log-store transition; taken
        // (at most once) on the next upstream epoch.
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// Phase 3: the job consumes upstream directly. Holds the prev epoch at
    /// which upstream consumption started and the job to finish tracking.
    Finishing(u64, TrackingJob),
    /// Transient placeholder installed while moving fields out of `self`
    /// during a state transition; must never be observed otherwise.
    PlaceHolder,
}
133
impl CreatingStreamingJobStatus {
    /// Apply a batch of backfill progress reports from barrier-complete
    /// responses, advancing the state machine when appropriate:
    ///
    /// - `ConsumingSnapshot`: forward progress to the mview tracker; once it
    ///   reports finished, transition in place to `ConsumingLogStore`,
    ///   pre-building the barriers that still need to be injected.
    /// - `ConsumingLogStore`: feed the log-store progress tracker.
    /// - `Finishing`: further progress is ignored.
    /// - `PlaceHolder`: unreachable outside a transition.
    pub(super) fn update_progress(
        &mut self,
        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
    ) {
        match self {
            &mut Self::ConsumingSnapshot {
                ref mut create_mview_tracker,
                ref version_stats,
                ref mut prev_epoch_fake_physical_time,
                ref mut pending_upstream_barriers,
                ref mut pending_non_checkpoint_barriers,
                ref backfill_epoch,
                ..
            } => {
                for progress in create_mview_progress {
                    create_mview_tracker.apply_progress(progress, version_stats);
                }
                if create_mview_tracker.is_finished() {
                    // Include `backfill_epoch` itself in the epochs collected
                    // by the sealing checkpoint barrier below.
                    pending_non_checkpoint_barriers.push(*backfill_epoch);

                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
                    // One checkpoint barrier sealing the snapshot phase at
                    // `backfill_epoch`, followed by every upstream barrier
                    // buffered during the snapshot phase.
                    let barriers_to_inject: Vec<_> = [BarrierInfo {
                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
                        prev_epoch: TracedEpoch::new(prev_epoch),
                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                    }]
                    .into_iter()
                    .chain(pending_upstream_barriers.drain(..))
                    .collect();

                    // Move owned fields out of the current variant;
                    // `PlaceHolder` temporarily fills `self` until the
                    // reassignment below.
                    let CreatingStreamingJobStatus::ConsumingSnapshot {
                        create_mview_tracker,
                        fragment_infos,
                        backfill_epoch,
                        snapshot_backfill_actors,
                        ..
                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                    else {
                        unreachable!()
                    };

                    let tracking_job = create_mview_tracker.into_tracking_job();

                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
                        tracking_job,
                        fragment_infos,
                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                            snapshot_backfill_actors.iter().cloned(),
                            // Initial lag: distance from `backfill_epoch` to
                            // the last buffered upstream barrier, or 0 when
                            // none were buffered.
                            barriers_to_inject
                                .last()
                                .map(|barrier_info| {
                                    barrier_info.prev_epoch().saturating_sub(backfill_epoch)
                                })
                                .unwrap_or(0),
                        ),
                        barriers_to_inject: Some(barriers_to_inject),
                    };
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                log_store_progress_tracker.update(create_mview_progress);
            }
            CreatingStreamingJobStatus::Finishing(..) => {}
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Transition from `ConsumingLogStore` to `Finishing` at the given
    /// barrier (which must be a checkpoint), returning the job's fragment
    /// infos to the caller.
    ///
    /// # Panics
    /// Panics when called in any state other than `ConsumingLogStore`.
    pub(super) fn start_consume_upstream(
        &mut self,
        barrier_info: &BarrierInfo,
    ) -> HashMap<FragmentId, InflightFragmentInfo> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
                unreachable!(
                    "should not start consuming upstream for a job that are consuming snapshot"
                )
            }
            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                let prev_epoch = barrier_info.prev_epoch();
                {
                    assert!(barrier_info.kind.is_checkpoint());
                    // Move fields out via a temporary `PlaceHolder`, then
                    // install the `Finishing` state.
                    let CreatingStreamingJobStatus::ConsumingLogStore {
                        fragment_infos,
                        tracking_job,
                        ..
                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                    else {
                        unreachable!()
                    };
                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
                    fragment_infos
                }
            }
            CreatingStreamingJobStatus::Finishing { .. } => {
                unreachable!("should not start consuming upstream for a job again")
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// React to a new upstream epoch, returning the barriers (each with an
    /// optional mutation) to inject into the creating job's graph:
    ///
    /// - `ConsumingSnapshot`: buffer the upstream barrier and emit one fake
    ///   barrier, attaching a `StartFragmentBackfill` mutation when the
    ///   tracker has pending backfill fragments.
    /// - `ConsumingLogStore`: emit the (at most once) pre-built transition
    ///   barriers followed by the current upstream barrier.
    /// - `Finishing`: nothing to inject.
    pub(super) fn on_new_upstream_epoch(
        &mut self,
        barrier_info: &BarrierInfo,
    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                pending_upstream_barriers,
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                create_mview_tracker,
                ..
            } => {
                let mutation = {
                    let pending_backfill_nodes = create_mview_tracker
                        .take_pending_backfill_nodes()
                        .collect_vec();
                    if pending_backfill_nodes.is_empty() {
                        None
                    } else {
                        Some(Mutation::StartFragmentBackfill(
                            StartFragmentBackfillMutation {
                                fragment_ids: pending_backfill_nodes,
                            },
                        ))
                    }
                };
                pending_upstream_barriers.push(barrier_info.clone());
                vec![(
                    // The fake barrier mirrors the upstream barrier's kind so
                    // checkpoints stay aligned with upstream.
                    CreatingStreamingJobStatus::new_fake_barrier(
                        prev_epoch_fake_physical_time,
                        pending_non_checkpoint_barriers,
                        match barrier_info.kind {
                            BarrierKind::Barrier => PbBarrierKind::Barrier,
                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                            BarrierKind::Initial => {
                                unreachable!("upstream new epoch should not be initial")
                            }
                        },
                    ),
                    mutation,
                )]
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                barriers_to_inject, ..
            } => barriers_to_inject
                .take()
                .into_iter()
                .flatten()
                .chain([barrier_info.clone()])
                .map(|barrier_info| (barrier_info, None))
                .collect(),
            CreatingStreamingJobStatus::Finishing { .. } => {
                vec![]
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Synthesize the next fake barrier used during the snapshot phase:
    /// `prev`/`curr` epochs come from a fake physical clock that advances by
    /// one per barrier. The new prev epoch is recorded in
    /// `pending_non_checkpoint_barriers`; a `Checkpoint` kind takes the
    /// accumulated epochs, while `Initial` clears them.
    ///
    /// # Panics
    /// Panics on `PbBarrierKind::Unspecified`.
    pub(super) fn new_fake_barrier(
        prev_epoch_fake_physical_time: &mut u64,
        pending_non_checkpoint_barriers: &mut Vec<u64>,
        kind: PbBarrierKind,
    ) -> BarrierInfo {
        {
            {
                let prev_epoch =
                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
                *prev_epoch_fake_physical_time += 1;
                let curr_epoch =
                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
                let kind = match kind {
                    PbBarrierKind::Unspecified => {
                        unreachable!()
                    }
                    PbBarrierKind::Initial => {
                        pending_non_checkpoint_barriers.clear();
                        BarrierKind::Initial
                    }
                    PbBarrierKind::Barrier => BarrierKind::Barrier,
                    PbBarrierKind::Checkpoint => {
                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
                    }
                };
                BarrierInfo {
                    prev_epoch,
                    curr_epoch,
                    kind,
                }
            }
        }
    }

    /// True once the job has entered the `Finishing` state.
    pub(super) fn is_finishing(&self) -> bool {
        matches!(self, Self::Finishing(..))
    }

    /// Fragment infos of the job while it is still being created; `None`
    /// once it is `Finishing`.
    pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { fragment_infos, .. } => {
                Some(fragment_infos)
            }
            CreatingStreamingJobStatus::ConsumingLogStore { fragment_infos, .. } => {
                Some(fragment_infos)
            }
            CreatingStreamingJobStatus::Finishing(..) => None,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }
}