risingwave_meta/barrier/checkpoint/creating_job/status.rs

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::time::Duration;

use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};

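/// Tracks, per backfill actor, how far log-store consumption lags behind the upstream
/// while a snapshot-backfill streaming job is catching up. Actors move from
/// `ongoing_actors` to `finished_actors` once they report `done`.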
#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
    /// `actor_id` -> pending epoch lag of an actor that is still consuming the log store.
    ongoing_actors: HashMap<ActorId, u64>,
    /// Actors that have finished consuming the log store.
    finished_actors: HashSet<ActorId>,
}

impl CreateMviewLogStoreProgressTracker {
    /// Creates a tracker for the given backfill actors, each starting at `pending_barrier_lag`.
    pub(super) fn new(actors: impl Iterator<Item = ActorId>, pending_barrier_lag: u64) -> Self {
        Self {
            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, pending_barrier_lag))),
            finished_actors: HashSet::new(),
        }
    }

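    /// Renders a human-readable progress string for DDL monitoring: finished vs. total
    /// actor count and the average epoch lag expressed as a duration.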
    pub(super) fn gen_ddl_progress(&self) -> String {
        let sum = self.ongoing_actors.values().sum::<u64>() as f64;
        // Avoid dividing by zero when all actors have already finished.
        let count = if self.ongoing_actors.is_empty() {
            1
        } else {
            self.ongoing_actors.len()
        } as f64;
        let avg = sum / count;
        let avg_lag_time = Duration::from_millis(Epoch(avg as _).physical_time());
        format!(
            "actor: {}/{}, avg lag {:?}",
            self.finished_actors.len(),
            self.ongoing_actors.len() + self.finished_actors.len(),
            avg_lag_time
        )
    }

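    /// Applies a batch of `CreateMviewProgress` reports: updates the pending epoch lag of
    /// ongoing actors and moves actors that report `done` into `finished_actors`. Reports
    /// for unknown actors panic in debug builds and only log a warning in release builds.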
    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
        for progress in progress {
            match self.ongoing_actors.entry(progress.backfill_actor_id) {
                Entry::Occupied(mut entry) => {
                    if progress.done {
                        // The actor has caught up with the upstream: move it to the finished set.
                        entry.remove_entry();
                        assert!(
                            self.finished_actors.insert(progress.backfill_actor_id),
                            "non-duplicate"
                        );
                    } else {
                        *entry.get_mut() = progress.pending_epoch_lag as _;
                    }
                }
                Entry::Vacant(_) => {
                    if cfg!(debug_assertions) {
                        panic!(
                            "reporting progress on non-inflight actor: {:?} {:?}",
                            progress, self
                        );
                    } else {
                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
                    }
                }
            }
        }
    }

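    /// Returns `true` once every tracked actor has finished consuming the log store.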
    pub(super) fn is_finished(&self) -> bool {
        self.ongoing_actors.is_empty()
    }
}

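/// Lifecycle of a creating snapshot-backfill streaming job, advancing
/// `ConsumingSnapshot` -> `ConsumingLogStore` -> `Finishing`.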
#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
    /// The job is backfilling the upstream snapshot. Upstream barriers are buffered in
    /// `pending_upstream_barriers` while fake epochs drive the backfill; the status
    /// transitions to `ConsumingLogStore` once the snapshot has been fully consumed.
    ConsumingSnapshot {
        prev_epoch_fake_physical_time: u64,
        pending_upstream_barriers: Vec<BarrierInfo>,
        version_stats: HummockVersionStats,
        create_mview_tracker: CreateMviewProgressTracker,
        snapshot_backfill_actors: HashSet<ActorId>,
        backfill_epoch: u64,
        /// `prev_epoch`s of barriers injected since the last checkpoint barrier.
        pending_non_checkpoint_barriers: Vec<u64>,
    },
    /// The job is replaying the log store to catch up with the upstream, tracked per actor
    /// by `log_store_progress_tracker`.
    ConsumingLogStore {
        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
        barriers_to_inject: Option<Vec<BarrierInfo>>,
    },
    /// The job has started consuming the upstream directly; holds the `prev_epoch` of the
    /// checkpoint barrier at which it will finish.
    Finishing(u64),
}

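    /// Feeds backfill progress reports into the current status. While consuming the snapshot
    /// this may complete the backfill and switch the status to `ConsumingLogStore`; while
    /// consuming the log store it only updates the per-actor lag tracker.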
impl CreatingStreamingJobStatus {
    pub(super) fn update_progress(
        &mut self,
        create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
    ) {
        match self {
            &mut Self::ConsumingSnapshot {
                ref mut create_mview_tracker,
                ref version_stats,
                ref mut prev_epoch_fake_physical_time,
                ref mut pending_upstream_barriers,
                ref mut pending_non_checkpoint_barriers,
                ref backfill_epoch,
                ref snapshot_backfill_actors,
                ..
            } => {
                create_mview_tracker.update_tracking_jobs(
                    None,
                    create_mview_progress,
                    version_stats,
                );
                if create_mview_tracker.has_pending_finished_jobs() {
                    // Snapshot backfill has finished: seal the pending barriers with a final
                    // checkpoint barrier at `backfill_epoch`, then replay the buffered upstream
                    // barriers from the log store.
                    pending_non_checkpoint_barriers.push(*backfill_epoch);

                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
                    let barriers_to_inject: Vec<_> = [BarrierInfo {
                        curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
                        prev_epoch: TracedEpoch::new(prev_epoch),
                        kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                    }]
                    .into_iter()
                    .chain(pending_upstream_barriers.drain(..))
                    .collect();

                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                            snapshot_backfill_actors.iter().cloned(),
                            barriers_to_inject
                                .last()
                                .map(|barrier_info| {
                                    barrier_info.prev_epoch().saturating_sub(*backfill_epoch)
                                })
                                .unwrap_or(0),
                        ),
                        barriers_to_inject: Some(barriers_to_inject),
                    };
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                log_store_progress_tracker.update(create_mview_progress);
            }
            CreatingStreamingJobStatus::Finishing(_) => {}
        }
    }

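    /// Marks that the backfill actors have started consuming the upstream directly.
    /// Only valid while in `ConsumingLogStore` and on a checkpoint barrier; the status
    /// then becomes `Finishing` at the barrier's `prev_epoch`.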
    pub(super) fn start_consume_upstream(&mut self, barrier_info: &BarrierInfo) {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. } => {
                unreachable!(
                    "should not start consuming upstream for a job that is consuming snapshot"
                )
            }
            CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
                let prev_epoch = barrier_info.prev_epoch();
                assert!(barrier_info.kind.is_checkpoint());
                *self = CreatingStreamingJobStatus::Finishing(prev_epoch);
            }
            CreatingStreamingJobStatus::Finishing { .. } => {
                unreachable!("should not start consuming upstream for a job again")
            }
        }
    }

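    /// Handles a new upstream barrier and returns the barriers to inject into the creating job.
    /// While consuming the snapshot, the upstream barrier is buffered and a fake barrier is
    /// injected instead; while consuming the log store, any pending barriers are flushed ahead
    /// of the new one; a finishing job injects nothing.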
    pub(super) fn on_new_upstream_epoch(&mut self, barrier_info: &BarrierInfo) -> Vec<BarrierInfo> {
        match self {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                pending_upstream_barriers,
                prev_epoch_fake_physical_time,
                pending_non_checkpoint_barriers,
                ..
            } => {
                // Buffer the real upstream barrier and drive the backfill with a fake barrier
                // of the same kind.
                pending_upstream_barriers.push(barrier_info.clone());
                vec![CreatingStreamingJobStatus::new_fake_barrier(
                    prev_epoch_fake_physical_time,
                    pending_non_checkpoint_barriers,
                    match barrier_info.kind {
                        BarrierKind::Barrier => PbBarrierKind::Barrier,
                        BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
                        BarrierKind::Initial => {
                            unreachable!("upstream new epoch should not be initial")
                        }
                    },
                )]
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                barriers_to_inject, ..
            } => barriers_to_inject
                .take()
                .into_iter()
                .flatten()
                .chain([barrier_info.clone()])
                .collect(),
            CreatingStreamingJobStatus::Finishing { .. } => {
                vec![]
            }
        }
    }

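    /// Builds the next fake barrier used while consuming the snapshot: advances the fake
    /// physical time by one, records the `prev_epoch` as pending, and wraps the pending
    /// non-checkpoint epochs when the barrier is a checkpoint.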
    pub(super) fn new_fake_barrier(
        prev_epoch_fake_physical_time: &mut u64,
        pending_non_checkpoint_barriers: &mut Vec<u64>,
        kind: PbBarrierKind,
    ) -> BarrierInfo {
        let prev_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        *prev_epoch_fake_physical_time += 1;
        let curr_epoch =
            TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
        pending_non_checkpoint_barriers.push(prev_epoch.value().0);
        let kind = match kind {
            PbBarrierKind::Unspecified => {
                unreachable!()
            }
            PbBarrierKind::Initial => {
                pending_non_checkpoint_barriers.clear();
                BarrierKind::Initial
            }
            PbBarrierKind::Barrier => BarrierKind::Barrier,
            PbBarrierKind::Checkpoint => {
                BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
            }
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }

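    /// Returns `true` when the job has entered the `Finishing` state.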
    pub(super) fn is_finishing(&self) -> bool {
        matches!(self, Self::Finishing(_))
    }
}