risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs1use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::catalog::TableId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::BarrierKind;
29use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
30use crate::rpc::metrics::GLOBAL_META_METRICS;
31
/// Bookkeeping for one epoch of a creating streaming job, from the moment it is
/// enqueued until it has been collected from all workers (and, for epochs that are
/// not dropped, handed out for completion).
#[derive(Debug)]
struct CreatingStreamingJobEpochState {
    /// The epoch this state tracks; also the key in the in-flight queue.
    epoch: u64,
    /// Workers whose `BarrierCompleteResponse` for this epoch has not arrived yet.
    /// Each response removes its worker; the epoch is collected once this is empty.
    node_to_collect: NodeToCollect,
    /// Responses received so far for this epoch.
    resps: Vec<BarrierCompleteResponse>,
    /// Barrier kind; only checkpoint barriers are handed out for completion, and
    /// initial barriers are never queued for completion at all.
    kind: BarrierKind,
    /// Set on the first epoch enqueued while the owning control had not yet marked
    /// its first commit; such an epoch must be a checkpoint barrier.
    is_first_commit: bool,
    /// When the epoch was enqueued; the elapsed time is observed as barrier latency
    /// once the epoch is collected.
    enqueue_time: Instant,
}
41
/// Tracks the barriers of a single creating streaming job through three stages:
/// in flight (waiting for worker responses), collected and pending completion,
/// and the single barrier currently being completed.
#[derive(Debug)]
pub(super) struct CreatingStreamingJobBarrierControl {
    table_id: TableId,
    /// Epochs not yet collected, ordered by epoch. An epoch leaves this queue only
    /// when it and every earlier epoch have no workers left to collect from.
    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
    /// Collected epochs below this value are reported as "consuming_snapshot"
    /// latency; the rest as "consuming_log_store" latency.
    backfill_epoch: u64,
    /// Whether the first-commit epoch has already been assigned. Set by
    /// `enqueue_epoch` on the first enqueue, or passed as `true` to the constructor.
    is_first_committed: bool,
    /// Largest epoch that has been collected so far, if any.
    max_collected_epoch: Option<u64>,
    /// Collected (non-initial) epochs awaiting completion: newest at the front,
    /// oldest at the back.
    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
    /// The epoch currently being completed, plus a timer measuring how long it
    /// waits until `ack_completed`.
    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,

    /// Enqueue-to-collect latency for epochs before `backfill_epoch`.
    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
    /// Enqueue-to-collect latency for epochs at or after `backfill_epoch`.
    consuming_log_store_barrier_latency: LabelGuardedHistogram,

    /// Time between starting completion of an epoch and its acknowledgement.
    wait_commit_latency: LabelGuardedHistogram,
    /// Current length of `inflight_barrier_queue`.
    inflight_barrier_num: LabelGuardedIntGauge,
}
61
62impl CreatingStreamingJobBarrierControl {
63 pub(super) fn new(table_id: TableId, backfill_epoch: u64, is_first_committed: bool) -> Self {
64 let table_id_str = format!("{}", table_id.table_id);
65 Self {
66 table_id,
67 inflight_barrier_queue: Default::default(),
68 backfill_epoch,
69 is_first_committed,
70 max_collected_epoch: None,
71 pending_barriers_to_complete: Default::default(),
72 completing_barrier: None,
73
74 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
75 .snapshot_backfill_barrier_latency
76 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
77 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
78 .snapshot_backfill_barrier_latency
79 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
80 wait_commit_latency: GLOBAL_META_METRICS
81 .snapshot_backfill_wait_commit_latency
82 .with_guarded_label_values(&[&table_id_str]),
83 inflight_barrier_num: GLOBAL_META_METRICS
84 .snapshot_backfill_inflight_barrier_num
85 .with_guarded_label_values(&[&table_id_str]),
86 }
87 }
88
89 pub(super) fn inflight_barrier_count(&self) -> usize {
90 self.inflight_barrier_queue.len()
91 }
92
93 pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
94 self.inflight_barrier_queue
95 .values_mut()
96 .all(|state| is_valid_after_worker_err(&mut state.node_to_collect, worker_id))
97 }
98
99 fn latest_epoch(&self) -> Option<u64> {
100 self.inflight_barrier_queue
101 .last_key_value()
102 .map(|(epoch, _)| *epoch)
103 .or(self.max_collected_epoch)
104 }
105
106 pub(super) fn max_collected_epoch(&self) -> Option<u64> {
107 self.max_collected_epoch
108 }
109
110 pub(super) fn is_empty(&self) -> bool {
111 self.inflight_barrier_queue.is_empty()
112 && self.pending_barriers_to_complete.is_empty()
113 && self.completing_barrier.is_none()
114 }
115
116 pub(super) fn enqueue_epoch(
117 &mut self,
118 epoch: u64,
119 node_to_collect: NodeToCollect,
120 kind: BarrierKind,
121 ) {
122 debug!(
123 epoch,
124 ?node_to_collect,
125 table_id = self.table_id.table_id,
126 "creating job enqueue epoch"
127 );
128 let is_first_commit = !self.is_first_committed;
129 if !self.is_first_committed {
130 self.is_first_committed = true;
131 assert!(
132 kind.is_checkpoint(),
133 "first barrier must be checkpoint barrier"
134 );
135 }
136 if let Some(latest_epoch) = self.latest_epoch() {
137 assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
138 }
139 let epoch_state = CreatingStreamingJobEpochState {
140 epoch,
141 node_to_collect,
142 resps: vec![],
143 kind,
144 is_first_commit,
145 enqueue_time: Instant::now(),
146 };
147 if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
148 self.add_collected(epoch_state);
149 } else {
150 self.inflight_barrier_queue.insert(epoch, epoch_state);
151 }
152 self.inflight_barrier_num
153 .set(self.inflight_barrier_queue.len() as _);
154 }
155
156 pub(super) fn collect(&mut self, resp: BarrierCompleteResponse) {
157 let epoch = resp.epoch;
158 let worker_id = resp.worker_id as WorkerId;
159 debug!(
160 epoch,
161 worker_id,
162 table_id = self.table_id.table_id,
163 "collect barrier from worker"
164 );
165
166 let state = self
167 .inflight_barrier_queue
168 .get_mut(&epoch)
169 .expect("should exist");
170 assert!(state.node_to_collect.remove(&worker_id).is_some());
171 state.resps.push(resp);
172 while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
173 && state.node_to_collect.is_empty()
174 {
175 let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
176 self.add_collected(state);
177 }
178
179 self.inflight_barrier_num
180 .set(self.inflight_barrier_queue.len() as _);
181 }
182
183 pub(super) fn start_completing(
189 &mut self,
190 epoch_end_bound: Bound<u64>,
191 ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
192 assert!(self.completing_barrier.is_none());
193 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
194 while let Some(epoch_state) = self.pending_barriers_to_complete.back()
195 && epoch_range.contains(&epoch_state.epoch)
196 {
197 let mut epoch_state = self
198 .pending_barriers_to_complete
199 .pop_back()
200 .expect("non-empty");
201 let epoch = epoch_state.epoch;
202 let is_first = epoch_state.is_first_commit;
203 if is_first {
204 assert!(epoch_state.kind.is_checkpoint());
205 } else if !epoch_state.kind.is_checkpoint() {
206 continue;
207 }
208
209 let resps = take(&mut epoch_state.resps);
210 self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
211 return Some((epoch, resps, is_first));
212 }
213 None
214 }
215
216 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
220 let (epoch_state, wait_commit_timer) =
221 self.completing_barrier.take().expect("should exist");
222 wait_commit_timer.observe_duration();
223 assert_eq!(epoch_state.epoch, completed_epoch);
224 }
225
226 fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
227 assert!(epoch_state.node_to_collect.is_empty());
228 if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
229 assert!(prev_epoch_state.epoch < epoch_state.epoch);
230 }
231 if let Some(max_collected_epoch) = self.max_collected_epoch {
232 assert!(epoch_state.epoch > max_collected_epoch);
233 }
234 self.max_collected_epoch = Some(epoch_state.epoch);
235 let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
236 let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch {
237 &self.consuming_snapshot_barrier_latency
238 } else {
239 &self.consuming_log_store_barrier_latency
240 };
241 barrier_latency_metrics.observe(barrier_latency);
242 if !epoch_state.kind.is_initial() {
243 self.pending_barriers_to_complete.push_front(epoch_state);
244 }
245 }
246}