risingwave_meta/barrier/checkpoint/creating_job/
status.rs1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
25use risingwave_pb::stream_plan::barrier::PbBarrierKind;
26use risingwave_pb::stream_plan::barrier_mutation::Mutation;
27use risingwave_pb::stream_service::barrier_complete_response::{
28 CreateMviewProgress, PbCreateMviewProgress,
29};
30use tracing::warn;
31
32use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
33use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
34
/// Tracks per-actor progress of the snapshot backfill actors of a creating job while it is in
/// the `ConsumingLogStore` phase.
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// Actors that have not yet reported `done`, mapped to their most recently reported pending
    /// epoch lag (initially the lag passed to `new`).
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have reported `done`; an actor is moved here out of `ongoing_actors`.
    finished_actors: HashSet<ActorId>,
}
41
42impl CreateMviewLogStoreProgressTracker {
43 pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
44 Self {
45 ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
46 finished_actors: HashSet::new(),
47 }
48 }
49
50 pub(super) fn gen_ddl_progress(&self) -> String {
51 let sum = self.ongoing_actors.values().sum::<u64>() as f64;
52 let count = if self.ongoing_actors.is_empty() {
53 1
54 } else {
55 self.ongoing_actors.len()
56 } as f64;
57 let avg = sum / count;
58 let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
59 format!(
60 "actor: {}/{}, avg lag {:?}",
61 self.finished_actors.len(),
62 self.ongoing_actors.len() + self.finished_actors.len(),
63 avg_lag_time
64 )
65 }
66
67 fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
68 for progress in progress {
69 match self.ongoing_actors.entry(progress.backfill_actor_id) {
70 Entry::Occupied(mut entry) => {
71 if progress.done {
72 entry.remove_entry();
73 assert!(
74 self.finished_actors.insert(progress.backfill_actor_id),
75 "non-duplicate"
76 );
77 } else {
78 *entry.get_mut() = progress.pending_epoch_lag as _;
79 }
80 }
81 Entry::Vacant(_) => {
82 if cfg!(debug_assertions) {
83 panic!(
84 "reporting progress on non-inflight actor: {:?} {:?}",
85 progress, self
86 );
87 } else {
88 warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
89 }
90 }
91 }
92 }
93 }
94
95 pub(super) fn is_finished(&self) -> bool {
96 self.ongoing_actors.is_empty()
97 }
98}
99
/// Lifecycle state of a streaming job that is being created against an existing upstream.
///
/// The state moves through three phases:
/// - `ConsumingSnapshot` becomes `ConsumingLogStore` in `update_progress`, once the snapshot
///   progress tracker reports finished.
/// - `ConsumingLogStore` becomes `Finishing` in `start_consume_upstream`.
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// Snapshot backfill is in progress. Barriers injected into the job in this phase carry
    /// synthetic ("fake") epochs built by `new_fake_barrier`.
    ConsumingSnapshot {
        /// Physical time from which the next fake barrier's `prev_epoch` is built. It is
        /// advanced by one for every fake barrier.
        prev_epoch_fake_physical_time: u64,
        /// Upstream barriers received during this phase. They are buffered here and replayed,
        /// in order, after the snapshot has been consumed.
        pending_upstream_barriers: Vec<BarrierInfo>,
        /// Hummock version statistics passed to the progress tracker with every report.
        version_stats: HummockVersionStats,
        /// Progress tracker for the snapshot backfill of this job.
        create_mview_tracker: CreateMviewProgressTracker,
        /// Snapshot backfill actors. On leaving this phase they seed the log store progress
        /// tracker.
        snapshot_backfill_actors: HashSet<ActorId>,
        /// Used as the `curr_epoch` of the first barrier injected after the snapshot is
        /// consumed, and as the baseline of the initial log store lag.
        backfill_epoch: u64,
        /// Epochs accumulated since the last fake checkpoint barrier. They are drained into the
        /// next `BarrierKind::Checkpoint`.
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// Snapshot backfill has finished. Upstream barriers are now forwarded to the job as-is.
    ConsumingLogStore {
        /// Tracking job derived from the snapshot-phase progress tracker.
        tracking_job: TrackingJob,
        /// Per-actor progress of the snapshot backfill actors in this phase.
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        /// Barriers prepared when leaving `ConsumingSnapshot`: the synthetic checkpoint barrier
        /// followed by the buffered upstream barriers. This is `Some` until the next upstream
        /// epoch takes them for injection.
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// Entered via `start_consume_upstream`. Holds the `prev_epoch` of the upstream checkpoint
    /// barrier passed there, and the tracking job.
    Finishing(u64, TrackingJob),
    /// Transient value left in `self` while owned fields are moved out during a state
    /// transition. Methods that match on the state treat it as unreachable.
    PlaceHolder,
}
129
impl CreatingStreamingJobStatus {
    /// Applies backfill progress reports to the tracker of the current phase.
    ///
    /// Behavior by state:
    /// - `ConsumingSnapshot`: each report is applied to `create_mview_tracker`. Once that
    ///   tracker reports finished, this builds the barriers to inject next and transitions to
    ///   `ConsumingLogStore`. Those barriers are a checkpoint barrier from the current fake epoch
    ///   to `backfill_epoch`, followed by every buffered upstream barrier.
    /// - `ConsumingLogStore`: reports update the log store progress tracker.
    /// - `Finishing`: reports are ignored.
    ///
    /// # Panics
    ///
    /// Panics if called on `PlaceHolder`.
    pub(super) fn update_progress(
        &mut self,
        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
    ) {
        match self {
            &mut Self::ConsumingSnapshot {
                ref mut create_mview_tracker,
                ref version_stats,
                ref mut prev_epoch_fake_physical_time,
                ref mut pending_upstream_barriers,
                ref mut pending_non_checkpoint_barriers,
                ref backfill_epoch,
                ..
            } => {
                for progress in create_mview_progress {
                    create_mview_tracker.apply_progress(progress, version_stats);
                }
                if create_mview_tracker.is_finished() {
                    // NOTE(review): `new_fake_barrier` records each barrier's `prev_epoch` in
                    // this list, whereas here `backfill_epoch` (the `curr_epoch` of the barrier
                    // built below) is recorded instead — confirm this is intended.
                    pending_non_checkpoint_barriers.push(*backfill_epoch);

                    // The last snapshot-phase barrier goes from the current fake epoch to
                    // `backfill_epoch`. It is a checkpoint that drains all pending epochs, and
                    // the upstream barriers buffered during the snapshot phase follow it in order.
                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
                    let barriers_to_inject: Vec<_> = [BarrierInfo {
                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
                        prev_epoch: TracedEpoch::new(prev_epoch),
                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                    }]
                    .into_iter()
                    .chain(pending_upstream_barriers.drain(..))
                    .collect();

                    // Move the owned fields out of `self`. `PlaceHolder` occupies it until the
                    // new state is written below.
                    let CreatingStreamingJobStatus::ConsumingSnapshot {
                        create_mview_tracker,
                        backfill_epoch,
                        snapshot_backfill_actors,
                        ..
                    } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
                    else {
                        unreachable!()
                    };

                    let tracking_job = create_mview_tracker.into_tracking_job();

                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
                        tracking_job,
                        // Every snapshot backfill actor starts with the same lag: the `prev_epoch`
                        // of the last barrier in `barriers_to_inject` minus `backfill_epoch`,
                        // saturating at 0. The list always holds at least the checkpoint barrier
                        // built above, so the `unwrap_or(0)` fallback is not expected to be used.
                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                            snapshot_backfill_actors.iter().cloned(),
                            barriers_to_inject
                                .last()
                                .map(|barrier_info| {
                                    barrier_info.prev_epoch().saturating_sub(backfill_epoch)
                                })
                                .unwrap_or(0),
                        ),
                        barriers_to_inject: Some(barriers_to_inject),
                    };
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                log_store_progress_tracker.update(create_mview_progress);
            }
            CreatingStreamingJobStatus::Finishing(..) => {}
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Transitions from `ConsumingLogStore` to `Finishing`, recording the `prev_epoch` of the
    /// upstream barrier `barrier_info`.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// - the job is not in `ConsumingLogStore`;
    /// - `barrier_info` is not a checkpoint barrier.
    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
                unreachable!(
                    "should not start consuming upstream for a job that are consuming snapshot"
                )
            }
            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                let prev_epoch = barrier_info.prev_epoch();
                {
                    assert!(barrier_info.kind.is_checkpoint());
                    // Only `tracking_job` is kept. The log store tracker and any
                    // `barriers_to_inject` not yet taken are dropped here.
                    // NOTE(review): presumably callers only reach this after those barriers were
                    // injected — confirm.
                    let CreatingStreamingJobStatus::ConsumingLogStore { tracking_job, .. } =
                        replace(self, CreatingStreamingJobStatus::PlaceHolder)
                    else {
                        unreachable!()
                    };
                    *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
                }
            }
            CreatingStreamingJobStatus::Finishing { .. } => {
                unreachable!("should not start consuming upstream for a job again")
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Returns the barriers, each with an optional mutation, to inject into the creating job
    /// in response to the new upstream barrier `barrier_info`.
    ///
    /// Behavior by state:
    /// - `ConsumingSnapshot`: buffers `barrier_info` and returns one fake barrier of the same
    ///   kind (`Barrier` or `Checkpoint`). If the snapshot tracker has pending backfill nodes,
    ///   a `StartFragmentBackfill` mutation carrying their fragment ids is attached.
    /// - `ConsumingLogStore`: returns `barrier_info` itself, without a mutation. On the first
    ///   call after the transition, the barriers prepared when leaving the snapshot phase are
    ///   returned before it.
    /// - `Finishing`: returns nothing.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// - the state is `PlaceHolder`;
    /// - the job is consuming the snapshot and `barrier_info` is an `Initial` barrier.
    pub(super) fn on_new_upstream_epoch(
        &mut self,
        barrier_info: &BarrierInfo,
    ) -> Vec<(BarrierInfo, Option<Mutation>)> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                pending_upstream_barriers,
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                create_mview_tracker,
                ..
            } => {
                let mutation = {
                    let pending_backfill_nodes = create_mview_tracker
                        .take_pending_backfill_nodes()
                        .collect_vec();
                    if pending_backfill_nodes.is_empty() {
                        None
                    } else {
                        Some(Mutation::StartFragmentBackfill(
                            StartFragmentBackfillMutation {
                                fragment_ids: pending_backfill_nodes,
                            },
                        ))
                    }
                };
                // Keep the upstream barrier so it can be replayed once the snapshot is consumed.
                pending_upstream_barriers.push(barrier_info.clone());
                vec![(
                    CreatingStreamingJobStatus::new_fake_barrier(
                        prev_epoch_fake_physical_time,
                        pending_non_checkpoint_barriers,
                        match barrier_info.kind {
                            BarrierKind::Barrier => PbBarrierKind::Barrier,
                            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                            BarrierKind::Initial => {
                                unreachable!("upstream new epoch should not be initial")
                            }
                        },
                    ),
                    mutation,
                )]
            }
            // `take()` yields the prepared barriers only once; afterwards it is `None`, so only
            // the new upstream barrier is returned.
            CreatingStreamingJobStatus::ConsumingLogStore {
                barriers_to_inject, ..
            } => barriers_to_inject
                .take()
                .into_iter()
                .flatten()
                .chain([barrier_info.clone()])
                .map(|barrier_info| (barrier_info, None))
                .collect(),
            CreatingStreamingJobStatus::Finishing { .. } => {
                vec![]
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    /// Builds the next synthetic barrier used while the snapshot is being consumed.
    ///
    /// The barrier goes from the epoch at physical time `*prev_epoch_fake_physical_time` to the
    /// epoch at that time plus one, and `*prev_epoch_fake_physical_time` is advanced to match.
    /// The barrier's `prev_epoch` is appended to `pending_non_checkpoint_barriers`. A
    /// `Checkpoint` kind then drains that list into the barrier, and an `Initial` kind clears it.
    ///
    /// # Panics
    ///
    /// Panics if `kind` is `PbBarrierKind::Unspecified`.
    pub(super) fn new_fake_barrier(
        prev_epoch_fake_physical_time: &mut u64,
        pending_non_checkpoint_barriers: &mut Vec<u64>,
        kind: PbBarrierKind,
    ) -> BarrierInfo {
        {
            {
                let prev_epoch =
                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
                *prev_epoch_fake_physical_time += 1;
                let curr_epoch =
                    TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
                pending_non_checkpoint_barriers.push(prev_epoch.value().0);
                let kind = match kind {
                    PbBarrierKind::Unspecified => {
                        unreachable!()
                    }
                    PbBarrierKind::Initial => {
                        pending_non_checkpoint_barriers.clear();
                        BarrierKind::Initial
                    }
                    PbBarrierKind::Barrier => BarrierKind::Barrier,
                    PbBarrierKind::Checkpoint => {
                        BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
                    }
                };
                BarrierInfo {
                    prev_epoch,
                    curr_epoch,
                    kind,
                }
            }
        }
    }

    /// Returns `true` if the job is in the `Finishing` state.
    pub(super) fn is_finishing(&self) -> bool {
        matches!(self, Self::Finishing(..))
    }
}