risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs1use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::catalog::TableId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
29use crate::rpc::metrics::GLOBAL_META_METRICS;
30
/// Bookkeeping for one barrier (identified by its epoch) of a creating streaming job,
/// from the moment it is enqueued until it is handed out for completion.
#[derive(Debug)]
struct CreatingStreamingJobEpochState {
    /// Epoch of the barrier; also the key it is stored under while in flight.
    epoch: u64,
    /// Workers still expected to report for this epoch. `collect` removes a worker's
    /// entry when its response arrives; the barrier counts as collected once this is empty.
    node_to_collect: NodeToCollect,
    /// Responses pushed by `collect`; moved out when `start_completing` hands the barrier out.
    resps: Vec<BarrierCompleteResponse>,
    /// Whether this is a checkpoint barrier. In `start_completing`, non-checkpoint barriers
    /// (other than a first commit) are popped and dropped instead of being handed out.
    is_checkpoint: bool,
    /// Set when the barrier was enqueued while the controller's `is_first_committed` flag
    /// was still `false`; `enqueue_epoch` asserts such a barrier is a checkpoint barrier.
    is_first_commit: bool,
    /// Captured in `enqueue_epoch`; `add_collected` reports the elapsed time to a
    /// latency histogram.
    enqueue_time: Instant,
}
40
/// Tracks the barriers of a single creating streaming job through three stages:
/// in flight (waiting for worker responses), collected (waiting to be completed), and
/// completing (handed out by `start_completing`, awaiting `ack_completed`).
#[derive(Debug)]
pub(super) struct CreatingStreamingJobBarrierControl {
    /// Table id of the job; used as a log field and as the label of the metrics below.
    table_id: TableId,
    /// Barriers still waiting for worker responses, keyed and ordered by epoch.
    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
    /// Threshold for the latency metrics: collected barriers whose epoch is below it are
    /// recorded as "consuming_snapshot", all others as "consuming_log_store".
    backfill_epoch: u64,
    /// Becomes `true` once the first barrier is enqueued (or is already `true` when
    /// passed in at construction), so only one barrier is ever marked as first commit.
    is_first_committed: bool,
    /// Epoch of the most recent barrier passed to `add_collected`; `None` until then.
    max_collected_epoch: Option<u64>,
    /// Fully collected barriers: newest at the front (`push_front`), oldest at the back
    /// (taken by `start_completing` via `pop_back`).
    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
    /// The barrier handed out by `start_completing` and not yet acknowledged via
    /// `ack_completed`, together with the timer started for `wait_commit_latency`.
    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,

    /// Enqueue-to-collected latency of barriers with an epoch below `backfill_epoch`.
    consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>,
    /// Enqueue-to-collected latency of barriers with an epoch at or above `backfill_epoch`.
    consuming_log_store_barrier_latency: LabelGuardedHistogram<2>,

    /// Time from `start_completing` handing a barrier out until `ack_completed`.
    wait_commit_latency: LabelGuardedHistogram<1>,
    /// Gauge mirroring the length of `inflight_barrier_queue`, refreshed after each
    /// enqueue and collect.
    inflight_barrier_num: LabelGuardedIntGauge<1>,
}
60
61impl CreatingStreamingJobBarrierControl {
62 pub(super) fn new(table_id: TableId, backfill_epoch: u64, is_first_committed: bool) -> Self {
63 let table_id_str = format!("{}", table_id.table_id);
64 Self {
65 table_id,
66 inflight_barrier_queue: Default::default(),
67 backfill_epoch,
68 is_first_committed,
69 max_collected_epoch: None,
70 pending_barriers_to_complete: Default::default(),
71 completing_barrier: None,
72
73 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
74 .snapshot_backfill_barrier_latency
75 .with_guarded_label_values(&[&table_id_str, "consuming_snapshot"]),
76 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
77 .snapshot_backfill_barrier_latency
78 .with_guarded_label_values(&[&table_id_str, "consuming_log_store"]),
79 wait_commit_latency: GLOBAL_META_METRICS
80 .snapshot_backfill_wait_commit_latency
81 .with_guarded_label_values(&[&table_id_str]),
82 inflight_barrier_num: GLOBAL_META_METRICS
83 .snapshot_backfill_inflight_barrier_num
84 .with_guarded_label_values(&[&table_id_str]),
85 }
86 }
87
88 pub(super) fn inflight_barrier_count(&self) -> usize {
89 self.inflight_barrier_queue.len()
90 }
91
92 pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
93 self.inflight_barrier_queue
94 .values_mut()
95 .all(|state| is_valid_after_worker_err(&mut state.node_to_collect, worker_id))
96 }
97
98 fn latest_epoch(&self) -> Option<u64> {
99 self.inflight_barrier_queue
100 .last_key_value()
101 .map(|(epoch, _)| *epoch)
102 .or(self.max_collected_epoch)
103 }
104
105 pub(super) fn max_collected_epoch(&self) -> Option<u64> {
106 self.max_collected_epoch
107 }
108
109 pub(super) fn is_empty(&self) -> bool {
110 self.inflight_barrier_queue.is_empty()
111 && self.pending_barriers_to_complete.is_empty()
112 && self.completing_barrier.is_none()
113 }
114
115 pub(super) fn enqueue_epoch(
116 &mut self,
117 epoch: u64,
118 node_to_collect: NodeToCollect,
119 is_checkpoint: bool,
120 ) {
121 debug!(
122 epoch,
123 ?node_to_collect,
124 table_id = self.table_id.table_id,
125 "creating job enqueue epoch"
126 );
127 let is_first_commit = !self.is_first_committed;
128 if !self.is_first_committed {
129 self.is_first_committed = true;
130 assert!(is_checkpoint, "first barrier must be checkpoint barrier");
131 }
132 if let Some(latest_epoch) = self.latest_epoch() {
133 assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
134 }
135 let epoch_state = CreatingStreamingJobEpochState {
136 epoch,
137 node_to_collect,
138 resps: vec![],
139 is_checkpoint,
140 is_first_commit,
141 enqueue_time: Instant::now(),
142 };
143 if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
144 self.add_collected(epoch_state);
145 } else {
146 self.inflight_barrier_queue.insert(epoch, epoch_state);
147 }
148 self.inflight_barrier_num
149 .set(self.inflight_barrier_queue.len() as _);
150 }
151
152 pub(super) fn collect(
153 &mut self,
154 epoch: u64,
155 worker_id: WorkerId,
156 resp: BarrierCompleteResponse,
157 ) {
158 debug!(
159 epoch,
160 worker_id,
161 table_id = self.table_id.table_id,
162 "collect barrier from worker"
163 );
164
165 let state = self
166 .inflight_barrier_queue
167 .get_mut(&epoch)
168 .expect("should exist");
169 assert!(state.node_to_collect.remove(&worker_id).is_some());
170 state.resps.push(resp);
171 while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
172 && state.node_to_collect.is_empty()
173 {
174 let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
175 self.add_collected(state);
176 }
177
178 self.inflight_barrier_num
179 .set(self.inflight_barrier_queue.len() as _);
180 }
181
182 pub(super) fn start_completing(
188 &mut self,
189 epoch_end_bound: Bound<u64>,
190 ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
191 assert!(self.completing_barrier.is_none());
192 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
193 while let Some(epoch_state) = self.pending_barriers_to_complete.back()
194 && epoch_range.contains(&epoch_state.epoch)
195 {
196 let mut epoch_state = self
197 .pending_barriers_to_complete
198 .pop_back()
199 .expect("non-empty");
200 let epoch = epoch_state.epoch;
201 let is_first = epoch_state.is_first_commit;
202 if is_first {
203 assert!(epoch_state.is_checkpoint);
204 } else if !epoch_state.is_checkpoint {
205 continue;
206 }
207
208 let resps = take(&mut epoch_state.resps);
209 self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
210 return Some((epoch, resps, is_first));
211 }
212 None
213 }
214
215 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
219 let (epoch_state, wait_commit_timer) =
220 self.completing_barrier.take().expect("should exist");
221 wait_commit_timer.observe_duration();
222 assert_eq!(epoch_state.epoch, completed_epoch);
223 }
224
225 fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
226 assert!(epoch_state.node_to_collect.is_empty());
227 if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
228 assert!(prev_epoch_state.epoch < epoch_state.epoch);
229 }
230 if let Some(max_collected_epoch) = self.max_collected_epoch {
231 assert!(epoch_state.epoch > max_collected_epoch);
232 }
233 self.max_collected_epoch = Some(epoch_state.epoch);
234 let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
235 let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch {
236 &self.consuming_snapshot_barrier_latency
237 } else {
238 &self.consuming_log_store_barrier_latency
239 };
240 barrier_latency_metrics.observe(barrier_latency);
241 self.pending_barriers_to_complete.push_front(epoch_state);
242 }
243}