// risingwave_meta/barrier/checkpoint/creating_job/barrier_control.rs
use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::id::JobId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::BarrierKind;
29use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
30use crate::barrier::notifier::Notifier;
31use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
32use crate::rpc::metrics::GLOBAL_META_METRICS;
33
34#[derive(Debug)]
35struct CreatingStreamingJobEpochState {
36 epoch: u64,
37 node_to_collect: NodeToCollect,
38 resps: Vec<BarrierCompleteResponse>,
39 notifiers: Vec<Notifier>,
40 kind: BarrierKind,
41 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
42 enqueue_time: Instant,
43}
44
45#[derive(Debug)]
46pub(super) struct CreatingStreamingJobBarrierControl {
47 job_id: JobId,
48 inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
50 snapshot_epoch: u64,
51 max_collected_epoch: Option<u64>,
52 max_committed_epoch: Option<u64>,
53 pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
55 completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
56
57 consuming_snapshot_barrier_latency: LabelGuardedHistogram,
59 consuming_log_store_barrier_latency: LabelGuardedHistogram,
60
61 wait_commit_latency: LabelGuardedHistogram,
62 inflight_barrier_num: LabelGuardedIntGauge,
63}
64
65impl CreatingStreamingJobBarrierControl {
66 pub(super) fn new(job_id: JobId, snapshot_epoch: u64, committed_epoch: Option<u64>) -> Self {
67 let table_id_str = format!("{}", job_id);
68 Self {
69 job_id,
70 inflight_barrier_queue: Default::default(),
71 snapshot_epoch,
72 max_collected_epoch: committed_epoch,
73 max_committed_epoch: committed_epoch,
74 pending_barriers_to_complete: Default::default(),
75 completing_barrier: None,
76
77 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
78 .snapshot_backfill_barrier_latency
79 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
80 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
81 .snapshot_backfill_barrier_latency
82 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
83 wait_commit_latency: GLOBAL_META_METRICS
84 .snapshot_backfill_wait_commit_latency
85 .with_guarded_label_values(&[&table_id_str]),
86 inflight_barrier_num: GLOBAL_META_METRICS
87 .snapshot_backfill_inflight_barrier_num
88 .with_guarded_label_values(&[&table_id_str]),
89 }
90 }
91
92 pub(super) fn inflight_barrier_count(&self) -> usize {
93 self.inflight_barrier_queue.len()
94 }
95
96 pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
97 self.inflight_barrier_queue
98 .values()
99 .all(|state| is_valid_after_worker_err(&state.node_to_collect, worker_id))
100 }
101
102 fn latest_epoch(&self) -> Option<u64> {
103 self.inflight_barrier_queue
104 .last_key_value()
105 .map(|(epoch, _)| *epoch)
106 .or(self.max_collected_epoch)
107 }
108
109 pub(super) fn max_committed_epoch(&self) -> Option<u64> {
110 self.max_committed_epoch
111 }
112
113 pub(super) fn is_empty(&self) -> bool {
114 self.inflight_barrier_queue.is_empty()
115 && self.pending_barriers_to_complete.is_empty()
116 && self.completing_barrier.is_none()
117 }
118
119 pub(super) fn enqueue_epoch(
120 &mut self,
121 epoch: u64,
122 node_to_collect: NodeToCollect,
123 kind: BarrierKind,
124 notifiers: Vec<Notifier>,
125 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
126 ) {
127 debug!(
128 epoch,
129 ?node_to_collect,
130 job_id = %self.job_id,
131 "creating job enqueue epoch"
132 );
133 if first_create_info.is_some() {
134 assert!(
135 kind.is_checkpoint(),
136 "first barrier must be checkpoint barrier"
137 );
138 }
139 match &kind {
140 BarrierKind::Initial => {
141 assert_eq!(
142 self.latest_epoch().expect(
143 "should have committed when recovered and injecting Initial barrier"
144 ),
145 epoch
146 );
147 }
148 BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
149 if let Some(latest_epoch) = self.latest_epoch() {
150 assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
151 }
152 }
153 }
154
155 let epoch_state = CreatingStreamingJobEpochState {
156 epoch,
157 node_to_collect,
158 resps: vec![],
159 notifiers,
160 kind,
161 first_create_info,
162 enqueue_time: Instant::now(),
163 };
164 if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
165 self.add_collected(epoch_state);
166 } else {
167 self.inflight_barrier_queue.insert(epoch, epoch_state);
168 }
169 self.inflight_barrier_num
170 .set(self.inflight_barrier_queue.len() as _);
171 }
172
173 pub(super) fn collect(&mut self, resp: BarrierCompleteResponse) {
174 let epoch = resp.epoch;
175 let worker_id = resp.worker_id;
176 debug!(
177 epoch,
178 %worker_id,
179 job_id = %self.job_id,
180 "collect barrier from worker"
181 );
182
183 let state = self
184 .inflight_barrier_queue
185 .get_mut(&epoch)
186 .expect("should exist");
187 assert!(state.node_to_collect.remove(&worker_id));
188 state.resps.push(resp);
189 while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
190 && state.node_to_collect.is_empty()
191 {
192 let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
193 self.add_collected(state);
194 }
195
196 self.inflight_barrier_num
197 .set(self.inflight_barrier_queue.len() as _);
198 }
199
200 pub(super) fn start_completing(
206 &mut self,
207 epoch_end_bound: Bound<u64>,
208 ) -> Option<(
209 u64,
210 Vec<BarrierCompleteResponse>,
211 Option<CreateSnapshotBackfillJobCommandInfo>,
212 )> {
213 assert!(self.completing_barrier.is_none());
214 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
215 while let Some(epoch_state) = self.pending_barriers_to_complete.back()
216 && epoch_range.contains(&epoch_state.epoch)
217 {
218 let mut epoch_state = self
219 .pending_barriers_to_complete
220 .pop_back()
221 .expect("non-empty");
222 let epoch = epoch_state.epoch;
223 let first_create_info = epoch_state.first_create_info.take();
224 if first_create_info.is_some() {
225 assert!(epoch_state.kind.is_checkpoint());
226 } else if !epoch_state.kind.is_checkpoint() {
227 continue;
228 }
229
230 let resps = take(&mut epoch_state.resps);
231 self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
232 return Some((epoch, resps, first_create_info));
233 }
234 None
235 }
236
237 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
241 let (epoch_state, wait_commit_timer) =
242 self.completing_barrier.take().expect("should exist");
243 wait_commit_timer.observe_duration();
244 assert_eq!(epoch_state.epoch, completed_epoch);
245 for notifier in epoch_state.notifiers {
246 notifier.notify_collected();
247 }
248 if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
249 assert!(completed_epoch > prev_max_committed_epoch);
250 }
251 }
252
253 fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
254 assert!(epoch_state.node_to_collect.is_empty());
255 if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
256 assert!(prev_epoch_state.epoch < epoch_state.epoch);
257 }
258 if let Some(max_collected_epoch) = self.max_collected_epoch {
259 match &epoch_state.kind {
260 BarrierKind::Initial => {
261 assert_eq!(epoch_state.epoch, max_collected_epoch);
262 }
263 BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
264 assert!(epoch_state.epoch > max_collected_epoch);
265 }
266 }
267 }
268 self.max_collected_epoch = Some(epoch_state.epoch);
269 let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
270 let barrier_latency_metrics = if epoch_state.epoch < self.snapshot_epoch {
271 &self.consuming_snapshot_barrier_latency
272 } else {
273 &self.consuming_log_store_barrier_latency
274 };
275 barrier_latency_metrics.observe(barrier_latency);
276 if !epoch_state.kind.is_initial() {
277 self.pending_barriers_to_complete.push_front(epoch_state);
278 }
279 }
280}