risingwave_meta/barrier/checkpoint/creating_job/
status.rs1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18use std::time::Duration;
19
20use risingwave_common::hash::ActorId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::stream_service::barrier_complete_response::{
24 CreateMviewProgress, PbCreateMviewProgress,
25};
26use tracing::warn;
27
28use crate::barrier::progress::CreateMviewProgressTracker;
29use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
30
31#[derive(Debug)]
32pub(super) struct CreateMviewLogStoreProgressTracker {
33 ongoing_actors: HashMap<ActorId, u64>,
35 finished_actors: HashSet<ActorId>,
36}
37
38impl CreateMviewLogStoreProgressTracker {
39 fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
40 Self {
41 ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
42 finished_actors: HashSet::new(),
43 }
44 }
45
46 pub(super) fn gen_ddl_progress(&self) -> String {
47 let sum = self.ongoing_actors.values().sum::<u64>() as f64;
48 let count = if self.ongoing_actors.is_empty() {
49 1
50 } else {
51 self.ongoing_actors.len()
52 } as f64;
53 let avg = sum / count;
54 let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
55 format!(
56 "actor: {}/{}, avg lag {:?}",
57 self.finished_actors.len(),
58 self.ongoing_actors.len() + self.finished_actors.len(),
59 avg_lag_time
60 )
61 }
62
63 fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
64 for progress in progress {
65 match self.ongoing_actors.entry(progress.backfill_actor_id) {
66 Entry::Occupied(mut entry) => {
67 if progress.done {
68 entry.remove_entry();
69 assert!(
70 self.finished_actors.insert(progress.backfill_actor_id),
71 "non-duplicate"
72 );
73 } else {
74 *entry.get_mut() = progress.pending_epoch_lag as _;
75 }
76 }
77 Entry::Vacant(_) => {
78 if cfg!(debug_assertions) {
79 panic!(
80 "reporting progress on non-inflight actor: {:?} {:?}",
81 progress, self
82 );
83 } else {
84 warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
85 }
86 }
87 }
88 }
89 }
90
91 pub(super) fn is_finished(&self) -> bool {
92 self.ongoing_actors.is_empty()
93 }
94}
95
96#[derive(Debug)]
97pub(super) enum CreatingStreamingJobStatus {
98 ConsumingSnapshot {
102 prev_epoch_fake_physical_time: u64,
103 pending_upstream_barriers: Vec<BarrierInfo>,
104 version_stats: HummockVersionStats,
105 create_mview_tracker: CreateMviewProgressTracker,
106 snapshot_backfill_actors: HashSet<ActorId>,
107 backfill_epoch: u64,
108 pending_non_checkpoint_barriers: Vec<u64>,
110 },
111 ConsumingLogStore {
115 log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
116 barriers_to_inject: Option<Vec<BarrierInfo>>,
117 },
118 Finishing(u64),
122}
123
124impl CreatingStreamingJobStatus {
125 pub(super) fn update_progress(
126 &mut self,
127 create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
128 ) {
129 match self {
130 &mut Self::ConsumingSnapshot {
131 ref mut create_mview_tracker,
132 ref version_stats,
133 ref mut prev_epoch_fake_physical_time,
134 ref mut pending_upstream_barriers,
135 ref mut pending_non_checkpoint_barriers,
136 ref backfill_epoch,
137 ref snapshot_backfill_actors,
138 ..
139 } => {
140 create_mview_tracker.update_tracking_jobs(
141 None,
142 create_mview_progress,
143 version_stats,
144 );
145 if create_mview_tracker.has_pending_finished_jobs() {
146 pending_non_checkpoint_barriers.push(*backfill_epoch);
147
148 let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
149 let barriers_to_inject: Vec<_> = [BarrierInfo {
150 curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
151 prev_epoch: TracedEpoch::new(prev_epoch),
152 kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
153 }]
154 .into_iter()
155 .chain(pending_upstream_barriers.drain(..))
156 .collect();
157
158 *self = CreatingStreamingJobStatus::ConsumingLogStore {
159 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
160 snapshot_backfill_actors.iter().cloned(),
161 barriers_to_inject
162 .last()
163 .map(|barrier_info| {
164 barrier_info.prev_epoch().saturating_sub(*backfill_epoch)
165 })
166 .unwrap_or(0),
167 ),
168 barriers_to_inject: Some(barriers_to_inject),
169 };
170 }
171 }
172 CreatingStreamingJobStatus::ConsumingLogStore {
173 log_store_progress_tracker,
174 ..
175 } => {
176 log_store_progress_tracker.update(create_mview_progress);
177 }
178 CreatingStreamingJobStatus::Finishing(_) => {}
179 }
180 }
181
182 pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
183 match self {
184 CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
185 unreachable!(
186 "should not start consuming upstream for a job that are consuming snapshot"
187 )
188 }
189 CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
190 let prev_epoch = barrier_info.prev_epoch();
191 {
192 assert!(barrier_info.kind.is_checkpoint());
193 *self = CreatingStreamingJobStatus::Finishing(prev_epoch);
194 }
195 }
196 CreatingStreamingJobStatus::Finishing { .. } => {
197 unreachable!("should not start consuming upstream for a job again")
198 }
199 }
200 }
201
202 pub(super) fn on_new_upstream_epoch(&mut self, barrier_info: &BarrierInfo) -> Vec<BarrierInfo> {
203 match self {
204 CreatingStreamingJobStatus::ConsumingSnapshot {
205 pending_upstream_barriers,
206 prev_epoch_fake_physical_time,
207 pending_non_checkpoint_barriers,
208 ..
209 } => {
210 pending_upstream_barriers.push(barrier_info.clone());
211 vec![CreatingStreamingJobStatus::new_fake_barrier(
212 prev_epoch_fake_physical_time,
213 pending_non_checkpoint_barriers,
214 barrier_info.kind.is_checkpoint(),
215 )]
216 }
217 CreatingStreamingJobStatus::ConsumingLogStore {
218 barriers_to_inject, ..
219 } => barriers_to_inject
220 .take()
221 .into_iter()
222 .flatten()
223 .chain([barrier_info.clone()])
224 .collect(),
225 CreatingStreamingJobStatus::Finishing { .. } => {
226 vec![]
227 }
228 }
229 }
230
231 pub(super) fn new_fake_barrier(
232 prev_epoch_fake_physical_time: &mut u64,
233 pending_non_checkpoint_barriers: &mut Vec<u64>,
234 is_checkpoint: bool,
235 ) -> BarrierInfo {
236 {
237 {
238 let prev_epoch =
239 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
240 *prev_epoch_fake_physical_time += 1;
241 let curr_epoch =
242 TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
243 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
244 let kind = if is_checkpoint {
245 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
246 } else {
247 BarrierKind::Barrier
248 };
249 BarrierInfo {
250 prev_epoch,
251 curr_epoch,
252 kind,
253 }
254 }
255 }
256 }
257
258 pub(super) fn is_finishing(&self) -> bool {
259 matches!(self, Self::Finishing(_))
260 }
261}