// risingwave_meta/barrier/checkpoint/creating_job/status.rs

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::time::Duration;

use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::StartFragmentBackfillMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::{
    CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
33
/// Tracks per-actor progress of a snapshot-backfill job while it drains the
/// upstream log store.
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// `actor_id` -> latest reported `pending_epoch_lag`; an actor is removed
    /// from this map once it reports `done`.
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have reported `done`; disjoint with `ongoing_actors`.
    finished_actors: HashSet<ActorId>,
}
40
41impl CreateMviewLogStoreProgressTracker {
42 pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
43 Self {
44 ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
45 finished_actors: HashSet::new(),
46 }
47 }
48
49 pub(super) fn gen_ddl_progress(&self) -> String {
50 let sum = self.ongoing_actors.values().sum::<u64>() as f64;
51 let count = if self.ongoing_actors.is_empty() {
52 1
53 } else {
54 self.ongoing_actors.len()
55 } as f64;
56 let avg = sum / count;
57 let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
58 format!(
59 "actor: {}/{}, avg lag {:?}",
60 self.finished_actors.len(),
61 self.ongoing_actors.len() + self.finished_actors.len(),
62 avg_lag_time
63 )
64 }
65
66 fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
67 for progress in progress {
68 match self.ongoing_actors.entry(progress.backfill_actor_id) {
69 Entry::Occupied(mut entry) => {
70 if progress.done {
71 entry.remove_entry();
72 assert!(
73 self.finished_actors.insert(progress.backfill_actor_id),
74 "non-duplicate"
75 );
76 } else {
77 *entry.get_mut() = progress.pending_epoch_lag as _;
78 }
79 }
80 Entry::Vacant(_) => {
81 if cfg!(debug_assertions) {
82 panic!(
83 "reporting progress on non-inflight actor: {:?} {:?}",
84 progress, self
85 );
86 } else {
87 warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
88 }
89 }
90 }
91 }
92 }
93
94 pub(super) fn is_finished(&self) -> bool {
95 self.ongoing_actors.is_empty()
96 }
97}
98
/// Lifecycle of a snapshot-backfill streaming job while it is being created.
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The job is consuming the upstream snapshot, driven by locally fabricated
    /// ("fake") epochs; real upstream barriers are buffered in the meantime.
    ConsumingSnapshot {
        // Physical time used to fabricate the next fake epoch.
        prev_epoch_fake_physical_time: u64,
        // Upstream barriers received while snapshotting; injected later when
        // the job switches to consuming the log store.
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        // Epoch of the checkpoint barrier that seals the snapshot phase.
        backfill_epoch: u64,
        // Fake epochs not yet covered by a checkpoint barrier.
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// Snapshot done; the job catches up by draining the log store.
    ConsumingLogStore {
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        // Barriers to inject on the next upstream epoch; taken exactly once
        // (becomes `None` afterwards).
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// The job has started consuming upstream directly; payload is the
    /// `prev_epoch` of the checkpoint barrier on which the switch happened.
    Finishing(u64),
}
126
127impl CreatingStreamingJobStatus {
128 pub(super) fn update_progress(
129 &mut self,
130 create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
131 ) {
132 match self {
133 &mut Self::ConsumingSnapshot {
134 ref mut create_mview_tracker,
135 ref version_stats,
136 ref mut prev_epoch_fake_physical_time,
137 ref mut pending_upstream_barriers,
138 ref mut pending_non_checkpoint_barriers,
139 ref backfill_epoch,
140 ref snapshot_backfill_actors,
141 ..
142 } => {
143 create_mview_tracker.update_tracking_jobs(
144 None,
145 create_mview_progress,
146 version_stats,
147 );
148 if create_mview_tracker.has_pending_finished_jobs() {
149 pending_non_checkpoint_barriers.push(*backfill_epoch);
150
151 let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
152 let barriers_to_inject: Vec<_> = [BarrierInfo {
153 curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
154 prev_epoch: TracedEpoch::new(prev_epoch),
155 kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
156 }]
157 .into_iter()
158 .chain(pending_upstream_barriers.drain(..))
159 .collect();
160
161 *self = CreatingStreamingJobStatus::ConsumingLogStore {
162 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
163 snapshot_backfill_actors.iter().cloned(),
164 barriers_to_inject
165 .last()
166 .map(|barrier_info| {
167 barrier_info.prev_epoch().saturating_sub(*backfill_epoch)
168 })
169 .unwrap_or(0),
170 ),
171 barriers_to_inject: Some(barriers_to_inject),
172 };
173 }
174 }
175 CreatingStreamingJobStatus::ConsumingLogStore {
176 log_store_progress_tracker,
177 ..
178 } => {
179 log_store_progress_tracker.update(create_mview_progress);
180 }
181 CreatingStreamingJobStatus::Finishing(_) => {}
182 }
183 }
184
185 pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
186 match self {
187 CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
188 unreachable!(
189 "should not start consuming upstream for a job that are consuming snapshot"
190 )
191 }
192 CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
193 let prev_epoch = barrier_info.prev_epoch();
194 {
195 assert!(barrier_info.kind.is_checkpoint());
196 *self = CreatingStreamingJobStatus::Finishing(prev_epoch);
197 }
198 }
199 CreatingStreamingJobStatus::Finishing { .. } => {
200 unreachable!("should not start consuming upstream for a job again")
201 }
202 }
203 }
204
205 pub(super) fn on_new_upstream_epoch(
206 &mut self,
207 barrier_info: &BarrierInfo,
208 ) -> Vec<(BarrierInfo, Option<Mutation>)> {
209 match self {
210 CreatingStreamingJobStatus::ConsumingSnapshot {
211 pending_upstream_barriers,
212 prev_epoch_fake_physical_time,
213 pending_non_checkpoint_barriers,
214 create_mview_tracker,
215 ..
216 } => {
217 let mutation = {
218 let pending_backfill_nodes = create_mview_tracker.take_pending_backfill_nodes();
219 if pending_backfill_nodes.is_empty() {
220 None
221 } else {
222 Some(Mutation::StartFragmentBackfill(
223 StartFragmentBackfillMutation {
224 fragment_ids: pending_backfill_nodes
225 .into_iter()
226 .map(|fragment_id| fragment_id as _)
227 .collect(),
228 },
229 ))
230 }
231 };
232 pending_upstream_barriers.push(barrier_info.clone());
233 vec![(
234 CreatingStreamingJobStatus::new_fake_barrier(
235 prev_epoch_fake_physical_time,
236 pending_non_checkpoint_barriers,
237 match barrier_info.kind {
238 BarrierKind::Barrier => PbBarrierKind::Barrier,
239 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
240 BarrierKind::Initial => {
241 unreachable!("upstream new epoch should not be initial")
242 }
243 },
244 ),
245 mutation,
246 )]
247 }
248 CreatingStreamingJobStatus::ConsumingLogStore {
249 barriers_to_inject, ..
250 } => barriers_to_inject
251 .take()
252 .into_iter()
253 .flatten()
254 .chain([barrier_info.clone()])
255 .map(|barrier_info| (barrier_info, None))
256 .collect(),
257 CreatingStreamingJobStatus::Finishing { .. } => {
258 vec![]
259 }
260 }
261 }
262
263 pub(super) fn new_fake_barrier(
264 prev_epoch_fake_physical_time: &mut u64,
265 pending_non_checkpoint_barriers: &mut Vec<u64>,
266 kind: PbBarrierKind,
267 ) -> BarrierInfo {
268 {
269 {
270 let prev_epoch =
271 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
272 *prev_epoch_fake_physical_time += 1;
273 let curr_epoch =
274 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
275 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
276 let kind = match kind {
277 PbBarrierKind::Unspecified => {
278 unreachable!()
279 }
280 PbBarrierKind::Initial => {
281 pending_non_checkpoint_barriers.clear();
282 BarrierKind::Initial
283 }
284 PbBarrierKind::Barrier => BarrierKind::Barrier,
285 PbBarrierKind::Checkpoint => {
286 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
287 }
288 };
289 BarrierInfo {
290 prev_epoch,
291 curr_epoch,
292 kind,
293 }
294 }
295 }
296 }
297
298 pub(super) fn is_finishing(&self) -> bool {
299 matches!(self, Self::Finishing(_))
300 }
301}