// risingwave_meta/barrier/checkpoint/creating_job/status.rs
use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::{replace, take};
18use std::time::Duration;
19
20use itertools::Itertools;
21use risingwave_common::hash::ActorId;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_pb::hummock::HummockVersionStats;
24use risingwave_pb::id::FragmentId;
25use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
26use risingwave_pb::stream_plan::barrier::PbBarrierKind;
27use risingwave_pb::stream_plan::barrier_mutation::Mutation;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 CreateMviewProgress, PbCreateMviewProgress,
30};
31use tracing::warn;
32
33use crate::barrier::checkpoint::creating_job::CreatingJobInfo;
34use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
35use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
36use crate::controller::fragment::InflightFragmentInfo;
37
/// Tracks, per actor, the progress reported while a creating job consumes its log store.
///
/// Each tracked actor is in exactly one of the two collections: it starts in
/// `ongoing_actors` and is moved to `finished_actors` when a progress report for it is
/// marked `done`.
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// Actors that have not reported `done` yet, mapped to their latest lag value.
    ///
    /// The value is seeded with `pending_barrier_lag` in `new` and overwritten with the
    /// reported `pending_epoch_lag` in `update`. `gen_ddl_progress` interprets it as an
    /// epoch value.
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors whose progress report was marked `done`.
    finished_actors: HashSet<ActorId>,
}
44
45impl CreateMviewLogStoreProgressTracker {
46 pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
47 Self {
48 ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
49 finished_actors: HashSet::new(),
50 }
51 }
52
53 pub(super) fn gen_ddl_progress(&self) -> String {
54 let sum = self.ongoing_actors.values().sum::<u64>() as f64;
55 let count = if self.ongoing_actors.is_empty() {
56 1
57 } else {
58 self.ongoing_actors.len()
59 } as f64;
60 let avg = sum / count;
61 let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
62 format!(
63 "actor: {}/{}, avg lag {:?}",
64 self.finished_actors.len(),
65 self.ongoing_actors.len() + self.finished_actors.len(),
66 avg_lag_time
67 )
68 }
69
70 fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
71 for progress in progress {
72 match self.ongoing_actors.entry(progress.backfill_actor_id) {
73 Entry::Occupied(mut entry) => {
74 if progress.done {
75 entry.remove_entry();
76 assert!(
77 self.finished_actors.insert(progress.backfill_actor_id),
78 "non-duplicate"
79 );
80 } else {
81 *entry.get_mut() = progress.pending_epoch_lag as _;
82 }
83 }
84 Entry::Vacant(_) => {
85 if cfg!(debug_assertions) {
86 panic!(
87 "reporting progress on non-inflight actor: {:?} {:?}",
88 progress, self
89 );
90 } else {
91 warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
92 }
93 }
94 }
95 }
96 }
97
/// Returns `true` when no actor remains in `ongoing_actors`, i.e. every tracked actor has
/// reported `done` (or the tracker was created without any actor).
pub(super) fn is_finished(&self) -> bool {
    self.ongoing_actors.is_empty()
}
101}
102
/// Phase of a creating streaming job.
///
/// The transitions visible in this file are `ConsumingSnapshot` -> `ConsumingLogStore`
/// (in `update_progress`) and `ConsumingLogStore` -> `Finishing`
/// (in `start_consume_upstream`).
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The job is still tracked by `create_mview_tracker`. Upstream barriers are buffered
    /// and fake barriers are generated for the job instead (see `on_new_upstream_epoch`).
    ConsumingSnapshot {
        /// Physical time used to build the `prev_epoch` of the next fake barrier;
        /// incremented by one for every fake barrier created by `new_fake_barrier`.
        prev_epoch_fake_physical_time: u64,
        /// Upstream barriers buffered by `on_new_upstream_epoch`; they are drained into
        /// `barriers_to_inject` when the job switches to `ConsumingLogStore`.
        pending_upstream_barriers: Vec<BarrierInfo>,
        /// Version stats handed to `create_mview_tracker` when applying progress.
        version_stats: HummockVersionStats,
        /// Tracker fed with the reported progress; once it reports finished, the job
        /// switches to `ConsumingLogStore`.
        create_mview_tracker: CreateMviewProgressTracker,
        /// Actors used to seed the log store progress tracker on transition.
        snapshot_backfill_actors: HashSet<ActorId>,
        /// Used as `curr_epoch` of the first barrier injected after the snapshot phase,
        /// and as the base for the initial lag of the log store progress tracker.
        snapshot_epoch: u64,
        /// Job info carried over to `ConsumingLogStore`.
        info: CreatingJobInfo,
        /// Epoch values accumulated since the last checkpoint barrier; they are moved
        /// into the next `BarrierKind::Checkpoint` that is built.
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The snapshot phase is over and the job reports log store progress per actor.
    ConsumingLogStore {
        /// Tracking job obtained from the snapshot-phase tracker; carried to `Finishing`.
        tracking_job: TrackingJob,
        /// Job info; returned to the caller by `start_consume_upstream`.
        info: CreatingJobInfo,
        /// Per-actor progress while consuming the log store.
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        /// Barriers prepared on transition from `ConsumingSnapshot`; taken (leaving
        /// `None`) by the next call to `on_new_upstream_epoch`.
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// Set by `start_consume_upstream`. The `u64` is the `prev_epoch` of the barrier
    /// passed to that call.
    Finishing(u64, TrackingJob),
    /// Temporary value left in place by `std::mem::replace` while moving between
    /// variants. Every method in this file treats observing it as unreachable.
    PlaceHolder,
}
134
135impl CreatingStreamingJobStatus {
136 pub(super) fn update_progress(
137 &mut self,
138 create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
139 ) {
140 match self {
141 &mut Self::ConsumingSnapshot {
142 ref mut create_mview_tracker,
143 ref version_stats,
144 ref mut prev_epoch_fake_physical_time,
145 ref mut pending_upstream_barriers,
146 ref mut pending_non_checkpoint_barriers,
147 ref snapshot_epoch,
148 ..
149 } => {
150 for progress in create_mview_progress {
151 create_mview_tracker.apply_progress(progress, version_stats);
152 }
153 if create_mview_tracker.is_finished() {
154 pending_non_checkpoint_barriers.push(*snapshot_epoch);
155
156 let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
157 let barriers_to_inject: Vec<_> = [BarrierInfo {
158 curr_epoch: TracedEpoch::new(Epoch(*snapshot_epoch)),
159 prev_epoch: TracedEpoch::new(prev_epoch),
160 kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
161 }]
162 .into_iter()
163 .chain(pending_upstream_barriers.drain(..))
164 .collect();
165
166 let CreatingStreamingJobStatus::ConsumingSnapshot {
167 create_mview_tracker,
168 info,
169 snapshot_epoch,
170 snapshot_backfill_actors,
171 ..
172 } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
173 else {
174 unreachable!()
175 };
176
177 let tracking_job = create_mview_tracker.into_tracking_job();
178
179 *self = CreatingStreamingJobStatus::ConsumingLogStore {
180 tracking_job,
181 info,
182 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
183 snapshot_backfill_actors.iter().cloned(),
184 barriers_to_inject
185 .last()
186 .map(|barrier_info| {
187 barrier_info.prev_epoch().saturating_sub(snapshot_epoch)
188 })
189 .unwrap_or(0),
190 ),
191 barriers_to_inject: Some(barriers_to_inject),
192 };
193 }
194 }
195 CreatingStreamingJobStatus::ConsumingLogStore {
196 log_store_progress_tracker,
197 ..
198 } => {
199 log_store_progress_tracker.update(create_mview_progress);
200 }
201 CreatingStreamingJobStatus::Finishing(..) => {}
202 CreatingStreamingJobStatus::PlaceHolder => {
203 unreachable!()
204 }
205 }
206 }
207
208 pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) -> CreatingJobInfo {
209 match self {
210 CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
211 unreachable!(
212 "should not start consuming upstream for a job that are consuming snapshot"
213 )
214 }
215 CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
216 let prev_epoch = barrier_info.prev_epoch();
217 {
218 assert!(barrier_info.kind.is_checkpoint());
219 let CreatingStreamingJobStatus::ConsumingLogStore {
220 info, tracking_job, ..
221 } = replace(self, CreatingStreamingJobStatus::PlaceHolder)
222 else {
223 unreachable!()
224 };
225 *self = CreatingStreamingJobStatus::Finishing(prev_epoch, tracking_job);
226 info
227 }
228 }
229 CreatingStreamingJobStatus::Finishing { .. } => {
230 unreachable!("should not start consuming upstream for a job again")
231 }
232 CreatingStreamingJobStatus::PlaceHolder => {
233 unreachable!()
234 }
235 }
236 }
237
238 pub(super) fn on_new_upstream_epoch(
239 &mut self,
240 barrier_info: &BarrierInfo,
241 ) -> Vec<(BarrierInfo, Option<Mutation>)> {
242 match self {
243 CreatingStreamingJobStatus::ConsumingSnapshot {
244 pending_upstream_barriers,
245 prev_epoch_fake_physical_time,
246 pending_non_checkpoint_barriers,
247 create_mview_tracker,
248 ..
249 } => {
250 let mutation = {
251 let pending_backfill_nodes = create_mview_tracker
252 .take_pending_backfill_nodes()
253 .collect_vec();
254 if pending_backfill_nodes.is_empty() {
255 None
256 } else {
257 Some(Mutation::StartFragmentBackfill(
258 StartFragmentBackfillMutation {
259 fragment_ids: pending_backfill_nodes,
260 },
261 ))
262 }
263 };
264 pending_upstream_barriers.push(barrier_info.clone());
265 vec![(
266 CreatingStreamingJobStatus::new_fake_barrier(
267 prev_epoch_fake_physical_time,
268 pending_non_checkpoint_barriers,
269 match barrier_info.kind {
270 BarrierKind::Barrier => PbBarrierKind::Barrier,
271 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
272 BarrierKind::Initial => {
273 unreachable!("upstream new epoch should not be initial")
274 }
275 },
276 ),
277 mutation,
278 )]
279 }
280 CreatingStreamingJobStatus::ConsumingLogStore {
281 barriers_to_inject, ..
282 } => barriers_to_inject
283 .take()
284 .into_iter()
285 .flatten()
286 .chain([barrier_info.clone()])
287 .map(|barrier_info| (barrier_info, None))
288 .collect(),
289 CreatingStreamingJobStatus::Finishing { .. } => {
290 vec![]
291 }
292 CreatingStreamingJobStatus::PlaceHolder => {
293 unreachable!()
294 }
295 }
296 }
297
298 pub(super) fn new_fake_barrier(
299 prev_epoch_fake_physical_time: &mut u64,
300 pending_non_checkpoint_barriers: &mut Vec<u64>,
301 kind: PbBarrierKind,
302 ) -> BarrierInfo {
303 {
304 {
305 let prev_epoch =
306 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
307 *prev_epoch_fake_physical_time += 1;
308 let curr_epoch =
309 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
310 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
311 let kind = match kind {
312 PbBarrierKind::Unspecified => {
313 unreachable!()
314 }
315 PbBarrierKind::Initial => {
316 pending_non_checkpoint_barriers.clear();
317 BarrierKind::Initial
318 }
319 PbBarrierKind::Barrier => BarrierKind::Barrier,
320 PbBarrierKind::Checkpoint => {
321 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
322 }
323 };
324 BarrierInfo {
325 prev_epoch,
326 curr_epoch,
327 kind,
328 }
329 }
330 }
331 }
332
333 pub(super) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
334 match self {
335 CreatingStreamingJobStatus::ConsumingSnapshot { info, .. }
336 | CreatingStreamingJobStatus::ConsumingLogStore { info, .. } => {
337 Some(&info.fragment_infos)
338 }
339 CreatingStreamingJobStatus::Finishing(..) => None,
340 CreatingStreamingJobStatus::PlaceHolder => {
341 unreachable!()
342 }
343 }
344 }
345}