risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs1use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::id::JobId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::BarrierKind;
29use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
30use crate::rpc::metrics::GLOBAL_META_METRICS;
31
32#[derive(Debug)]
33struct CreatingStreamingJobEpochState {
34 epoch: u64,
35 node_to_collect: NodeToCollect,
36 resps: Vec<BarrierCompleteResponse>,
37 kind: BarrierKind,
38 is_first_commit: bool,
39 enqueue_time: Instant,
40}
41
42#[derive(Debug)]
43pub(super) struct CreatingStreamingJobBarrierControl {
44 job_id: JobId,
45 inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
47 snapshot_epoch: u64,
48 is_first_committed: bool,
49 max_collected_epoch: Option<u64>,
50 max_committed_epoch: Option<u64>,
51 pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
53 completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
54
55 consuming_snapshot_barrier_latency: LabelGuardedHistogram,
57 consuming_log_store_barrier_latency: LabelGuardedHistogram,
58
59 wait_commit_latency: LabelGuardedHistogram,
60 inflight_barrier_num: LabelGuardedIntGauge,
61}
62
63impl CreatingStreamingJobBarrierControl {
64 pub(super) fn new(
65 job_id: JobId,
66 snapshot_epoch: u64,
67 is_first_committed: bool,
68 committed_epoch: Option<u64>,
69 ) -> Self {
70 let table_id_str = format!("{}", job_id);
71 Self {
72 job_id,
73 inflight_barrier_queue: Default::default(),
74 snapshot_epoch,
75 is_first_committed,
76 max_collected_epoch: committed_epoch,
77 max_committed_epoch: committed_epoch,
78 pending_barriers_to_complete: Default::default(),
79 completing_barrier: None,
80
81 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
82 .snapshot_backfill_barrier_latency
83 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
84 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
85 .snapshot_backfill_barrier_latency
86 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
87 wait_commit_latency: GLOBAL_META_METRICS
88 .snapshot_backfill_wait_commit_latency
89 .with_guarded_label_values(&[&table_id_str]),
90 inflight_barrier_num: GLOBAL_META_METRICS
91 .snapshot_backfill_inflight_barrier_num
92 .with_guarded_label_values(&[&table_id_str]),
93 }
94 }
95
96 pub(super) fn inflight_barrier_count(&self) -> usize {
97 self.inflight_barrier_queue.len()
98 }
99
100 pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
101 self.inflight_barrier_queue
102 .values_mut()
103 .all(|state| is_valid_after_worker_err(&mut state.node_to_collect, worker_id))
104 }
105
106 fn latest_epoch(&self) -> Option<u64> {
107 self.inflight_barrier_queue
108 .last_key_value()
109 .map(|(epoch, _)| *epoch)
110 .or(self.max_collected_epoch)
111 }
112
113 pub(super) fn max_committed_epoch(&self) -> Option<u64> {
114 self.max_committed_epoch
115 }
116
117 pub(super) fn is_empty(&self) -> bool {
118 self.inflight_barrier_queue.is_empty()
119 && self.pending_barriers_to_complete.is_empty()
120 && self.completing_barrier.is_none()
121 }
122
123 pub(super) fn enqueue_epoch(
124 &mut self,
125 epoch: u64,
126 node_to_collect: NodeToCollect,
127 kind: BarrierKind,
128 ) {
129 debug!(
130 epoch,
131 ?node_to_collect,
132 job_id = %self.job_id,
133 "creating job enqueue epoch"
134 );
135 let is_first_commit = !self.is_first_committed;
136 if !self.is_first_committed {
137 self.is_first_committed = true;
138 assert!(
139 kind.is_checkpoint(),
140 "first barrier must be checkpoint barrier"
141 );
142 }
143 match &kind {
144 BarrierKind::Initial => {
145 assert_eq!(
146 self.latest_epoch().expect(
147 "should have committed when recovered and injecting Initial barrier"
148 ),
149 epoch
150 );
151 }
152 BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
153 if let Some(latest_epoch) = self.latest_epoch() {
154 assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
155 }
156 }
157 }
158
159 let epoch_state = CreatingStreamingJobEpochState {
160 epoch,
161 node_to_collect,
162 resps: vec![],
163 kind,
164 is_first_commit,
165 enqueue_time: Instant::now(),
166 };
167 if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
168 self.add_collected(epoch_state);
169 } else {
170 self.inflight_barrier_queue.insert(epoch, epoch_state);
171 }
172 self.inflight_barrier_num
173 .set(self.inflight_barrier_queue.len() as _);
174 }
175
176 pub(super) fn collect(&mut self, resp: BarrierCompleteResponse) {
177 let epoch = resp.epoch;
178 let worker_id = resp.worker_id;
179 debug!(
180 epoch,
181 %worker_id,
182 job_id = %self.job_id,
183 "collect barrier from worker"
184 );
185
186 let state = self
187 .inflight_barrier_queue
188 .get_mut(&epoch)
189 .expect("should exist");
190 assert!(state.node_to_collect.remove(&worker_id).is_some());
191 state.resps.push(resp);
192 while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
193 && state.node_to_collect.is_empty()
194 {
195 let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
196 self.add_collected(state);
197 }
198
199 self.inflight_barrier_num
200 .set(self.inflight_barrier_queue.len() as _);
201 }
202
203 pub(super) fn start_completing(
209 &mut self,
210 epoch_end_bound: Bound<u64>,
211 ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
212 assert!(self.completing_barrier.is_none());
213 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
214 while let Some(epoch_state) = self.pending_barriers_to_complete.back()
215 && epoch_range.contains(&epoch_state.epoch)
216 {
217 let mut epoch_state = self
218 .pending_barriers_to_complete
219 .pop_back()
220 .expect("non-empty");
221 let epoch = epoch_state.epoch;
222 let is_first = epoch_state.is_first_commit;
223 if is_first {
224 assert!(epoch_state.kind.is_checkpoint());
225 } else if !epoch_state.kind.is_checkpoint() {
226 continue;
227 }
228
229 let resps = take(&mut epoch_state.resps);
230 self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
231 return Some((epoch, resps, is_first));
232 }
233 None
234 }
235
236 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
240 let (epoch_state, wait_commit_timer) =
241 self.completing_barrier.take().expect("should exist");
242 wait_commit_timer.observe_duration();
243 assert_eq!(epoch_state.epoch, completed_epoch);
244 if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
245 assert!(completed_epoch > prev_max_committed_epoch);
246 }
247 }
248
249 fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
250 assert!(epoch_state.node_to_collect.is_empty());
251 if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
252 assert!(prev_epoch_state.epoch < epoch_state.epoch);
253 }
254 if let Some(max_collected_epoch) = self.max_collected_epoch {
255 match &epoch_state.kind {
256 BarrierKind::Initial => {
257 assert_eq!(epoch_state.epoch, max_collected_epoch);
258 }
259 BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
260 assert!(epoch_state.epoch > max_collected_epoch);
261 }
262 }
263 }
264 self.max_collected_epoch = Some(epoch_state.epoch);
265 let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
266 let barrier_latency_metrics = if epoch_state.epoch < self.snapshot_epoch {
267 &self.consuming_snapshot_barrier_latency
268 } else {
269 &self.consuming_log_store_barrier_latency
270 };
271 barrier_latency_metrics.observe(barrier_latency);
272 if !epoch_state.kind.is_initial() {
273 self.pending_barriers_to_complete.push_front(epoch_state);
274 }
275 }
276}