risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs1use std::collections::VecDeque;
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::id::JobId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_pb::stream_service::BarrierCompleteResponse;
25use tracing::debug;
26
27use crate::barrier::BarrierKind;
28use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
29use crate::barrier::notifier::Notifier;
30use crate::barrier::partial_graph::CollectedBarrier;
31use crate::rpc::metrics::GLOBAL_META_METRICS;
32
33#[derive(Debug)]
34struct CreatingStreamingJobEpochState {
35 epoch: u64,
36 resps: Vec<BarrierCompleteResponse>,
37 notifiers: Vec<Notifier>,
38 kind: BarrierKind,
39 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
40 enqueue_time: Instant,
41}
42
43#[derive(Debug)]
44pub(super) struct CreatingStreamingJobBarrierControl {
45 job_id: JobId,
46 inflight_barrier_queue: VecDeque<CreatingStreamingJobEpochState>,
48 snapshot_epoch: u64,
49 max_collected_epoch: Option<u64>,
50 max_committed_epoch: Option<u64>,
51 pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
53 completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
54
55 consuming_snapshot_barrier_latency: LabelGuardedHistogram,
57 consuming_log_store_barrier_latency: LabelGuardedHistogram,
58
59 wait_commit_latency: LabelGuardedHistogram,
60 inflight_barrier_num: LabelGuardedIntGauge,
61}
62
63impl CreatingStreamingJobBarrierControl {
64 pub(super) fn new(job_id: JobId, snapshot_epoch: u64, committed_epoch: Option<u64>) -> Self {
65 let table_id_str = format!("{}", job_id);
66 Self {
67 job_id,
68 inflight_barrier_queue: Default::default(),
69 snapshot_epoch,
70 max_collected_epoch: committed_epoch,
71 max_committed_epoch: committed_epoch,
72 pending_barriers_to_complete: Default::default(),
73 completing_barrier: None,
74
75 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
76 .snapshot_backfill_barrier_latency
77 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
78 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
79 .snapshot_backfill_barrier_latency
80 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
81 wait_commit_latency: GLOBAL_META_METRICS
82 .snapshot_backfill_wait_commit_latency
83 .with_guarded_label_values(&[&table_id_str]),
84 inflight_barrier_num: GLOBAL_META_METRICS
85 .snapshot_backfill_inflight_barrier_num
86 .with_guarded_label_values(&[&table_id_str]),
87 }
88 }
89
90 pub(super) fn inflight_barrier_count(&self) -> usize {
91 self.inflight_barrier_queue.len()
92 }
93
94 fn latest_epoch(&self) -> Option<u64> {
95 self.inflight_barrier_queue
96 .front()
97 .map(|epoch| epoch.epoch)
98 .or(self.max_collected_epoch)
99 }
100
101 pub(super) fn max_committed_epoch(&self) -> Option<u64> {
102 self.max_committed_epoch
103 }
104
105 pub(super) fn is_empty(&self) -> bool {
106 self.inflight_barrier_queue.is_empty()
107 && self.pending_barriers_to_complete.is_empty()
108 && self.completing_barrier.is_none()
109 }
110
111 pub(super) fn enqueue_epoch(
112 &mut self,
113 epoch: u64,
114 kind: BarrierKind,
115 notifiers: Vec<Notifier>,
116 first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
117 ) {
118 debug!(
119 epoch,
120 job_id = %self.job_id,
121 "creating job enqueue epoch"
122 );
123 if first_create_info.is_some() {
124 assert!(
125 kind.is_checkpoint(),
126 "first barrier must be checkpoint barrier"
127 );
128 }
129 match &kind {
130 BarrierKind::Initial => {
131 unreachable!("should not inject initial barrier here");
132 }
133 BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
134 if let Some(latest_epoch) = self.latest_epoch() {
135 assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
136 }
137 }
138 }
139
140 let epoch_state = CreatingStreamingJobEpochState {
141 epoch,
142 resps: vec![],
143 notifiers,
144 kind,
145 first_create_info,
146 enqueue_time: Instant::now(),
147 };
148 self.inflight_barrier_queue.push_front(epoch_state);
149 self.inflight_barrier_num
150 .set(self.inflight_barrier_queue.len() as _);
151 }
152
153 pub(super) fn collect(&mut self, collected_barrier: CollectedBarrier) {
154 let mut state = self
155 .inflight_barrier_queue
156 .pop_back()
157 .expect("non-empty when collected");
158 assert_eq!(state.epoch, collected_barrier.epoch.prev);
159 assert!(state.resps.is_empty());
160 state.resps.extend(collected_barrier.resps.into_values());
161 self.add_collected(state);
162
163 self.inflight_barrier_num
164 .set(self.inflight_barrier_queue.len() as _);
165 }
166
167 pub(super) fn start_completing(
173 &mut self,
174 epoch_end_bound: Bound<u64>,
175 ) -> Option<(
176 u64,
177 Vec<BarrierCompleteResponse>,
178 Option<CreateSnapshotBackfillJobCommandInfo>,
179 )> {
180 assert!(self.completing_barrier.is_none());
181 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
182 while let Some(epoch_state) = self.pending_barriers_to_complete.back()
183 && epoch_range.contains(&epoch_state.epoch)
184 {
185 let mut epoch_state = self
186 .pending_barriers_to_complete
187 .pop_back()
188 .expect("non-empty");
189 let epoch = epoch_state.epoch;
190 let first_create_info = epoch_state.first_create_info.take();
191 if first_create_info.is_some() {
192 assert!(epoch_state.kind.is_checkpoint());
193 } else if !epoch_state.kind.is_checkpoint() {
194 continue;
195 }
196
197 let resps = take(&mut epoch_state.resps);
198 self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
199 return Some((epoch, resps, first_create_info));
200 }
201 None
202 }
203
204 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
208 let (epoch_state, wait_commit_timer) =
209 self.completing_barrier.take().expect("should exist");
210 wait_commit_timer.observe_duration();
211 assert_eq!(epoch_state.epoch, completed_epoch);
212 for notifier in epoch_state.notifiers {
213 notifier.notify_collected();
214 }
215 if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
216 assert!(completed_epoch > prev_max_committed_epoch);
217 }
218 }
219
220 fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
221 if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
222 assert!(prev_epoch_state.epoch < epoch_state.epoch);
223 }
224 if let Some(max_collected_epoch) = self.max_collected_epoch {
225 match &epoch_state.kind {
226 BarrierKind::Initial => {
227 unreachable!("should not collect initial barrier here")
228 }
229 BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
230 assert!(epoch_state.epoch > max_collected_epoch);
231 }
232 }
233 }
234 self.max_collected_epoch = Some(epoch_state.epoch);
235 let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
236 let barrier_latency_metrics = if epoch_state.epoch < self.snapshot_epoch {
237 &self.consuming_snapshot_barrier_latency
238 } else {
239 &self.consuming_log_store_barrier_latency
240 };
241 barrier_latency_metrics.observe(barrier_latency);
242 self.pending_barriers_to_complete.push_front(epoch_state);
243 }
244}