risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs1use std::collections::VecDeque;
16use std::ops::Bound::Unbounded;
17use std::ops::{Bound, RangeBounds};
18
19use risingwave_common::id::JobId;
20use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_pb::stream_service::BarrierCompleteResponse;
23use tracing::debug;
24
25use crate::barrier::notifier::Notifier;
26use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphBarrierInfo, PartialGraphStat};
27use crate::rpc::metrics::GLOBAL_META_METRICS;
28
29pub(super) struct CreatingStreamingJobBarrierStats {
30 consuming_snapshot_barrier_latency: LabelGuardedHistogram,
31 consuming_log_store_barrier_latency: LabelGuardedHistogram,
32 inflight_barrier_num: LabelGuardedIntGauge,
33
34 snapshot_epoch: u64,
35}
36
37impl CreatingStreamingJobBarrierStats {
38 pub(super) fn new(job_id: JobId, snapshot_epoch: u64) -> Self {
39 let table_id_str = format!("{}", job_id);
40 Self {
41 snapshot_epoch,
42 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
43 .snapshot_backfill_barrier_latency
44 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
45 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
46 .snapshot_backfill_barrier_latency
47 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
48 inflight_barrier_num: GLOBAL_META_METRICS
49 .snapshot_backfill_inflight_barrier_num
50 .with_guarded_label_values(&[&table_id_str]),
51 }
52 }
53}
54
55impl PartialGraphStat for CreatingStreamingJobBarrierStats {
56 fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64) {
57 let barrier_latency_metrics = if epoch.prev < self.snapshot_epoch {
58 &self.consuming_snapshot_barrier_latency
59 } else {
60 &self.consuming_log_store_barrier_latency
61 };
62 barrier_latency_metrics.observe(barrier_latency_secs);
63 }
64
65 fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
66 self.inflight_barrier_num.set(inflight_barrier_num as _);
67 }
68}
69
70#[derive(Debug)]
71pub(super) struct CreatingStreamingJobBarrierControl {
72 job_id: JobId,
73 inflight_barrier_queue: VecDeque<u64>,
75 max_collected_epoch: Option<u64>,
76 max_committed_epoch: Option<u64>,
77 pending_barriers_to_complete: VecDeque<u64>,
79 completing_barrier: Option<u64>,
80}
81
82impl CreatingStreamingJobBarrierControl {
83 pub(super) fn new(job_id: JobId, committed_epoch: Option<u64>) -> Self {
84 Self {
85 job_id,
86 inflight_barrier_queue: Default::default(),
87 max_collected_epoch: committed_epoch,
88 max_committed_epoch: committed_epoch,
89 pending_barriers_to_complete: Default::default(),
90 completing_barrier: None,
91 }
92 }
93
94 pub(super) fn inflight_barrier_count(&self) -> usize {
95 self.inflight_barrier_queue.len()
96 }
97
98 fn latest_epoch(&self) -> Option<u64> {
99 self.inflight_barrier_queue
100 .front()
101 .copied()
102 .or(self.max_collected_epoch)
103 }
104
105 pub(super) fn max_committed_epoch(&self) -> Option<u64> {
106 self.max_committed_epoch
107 }
108
109 pub(super) fn is_empty(&self) -> bool {
110 self.inflight_barrier_queue.is_empty()
111 && self.pending_barriers_to_complete.is_empty()
112 && self.completing_barrier.is_none()
113 }
114
115 pub(super) fn enqueue_epoch(&mut self, epoch: u64) {
116 debug!(
117 epoch,
118 job_id = %self.job_id,
119 "creating job enqueue epoch"
120 );
121 if let Some(latest_epoch) = self.latest_epoch() {
122 assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
123 }
124
125 self.inflight_barrier_queue.push_front(epoch);
126 }
127
128 pub(super) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) {
129 let epoch = self
130 .inflight_barrier_queue
131 .pop_back()
132 .expect("non-empty when collected");
133 assert_eq!(epoch, collected_barrier.epoch.prev);
134 self.add_collected(collected_barrier);
135 }
136
137 pub(super) fn start_completing(
143 &mut self,
144 epoch_end_bound: Bound<u64>,
145 mut take_resps: impl FnMut(u64) -> (Vec<BarrierCompleteResponse>, PartialGraphBarrierInfo),
146 ) -> Option<(u64, Vec<BarrierCompleteResponse>, PartialGraphBarrierInfo)> {
147 assert!(self.completing_barrier.is_none());
148 let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
149 while let Some(&epoch) = self.pending_barriers_to_complete.back()
150 && epoch_range.contains(&epoch)
151 {
152 let epoch_state = self
153 .pending_barriers_to_complete
154 .pop_back()
155 .expect("non-empty");
156 let (resps, info) = take_resps(epoch);
157 if info.post_collect_command.should_checkpoint() {
158 assert!(info.barrier_info.kind.is_checkpoint());
159 } else if !info.barrier_info.kind.is_checkpoint() {
160 info.notifiers
161 .into_iter()
162 .for_each(Notifier::notify_collected);
163 continue;
164 }
165 self.completing_barrier = Some(epoch_state);
166 return Some((epoch, resps, info));
167 }
168 None
169 }
170
171 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
175 let epoch = self.completing_barrier.take().expect("should exist");
176 assert_eq!(epoch, completed_epoch);
177 if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
178 assert!(completed_epoch > prev_max_committed_epoch);
179 }
180 }
181
182 fn add_collected(&mut self, collected_barrier: CollectedBarrier<'_>) {
183 let epoch = collected_barrier.epoch.prev;
184 if let Some(prev_epoch) = self.pending_barriers_to_complete.front() {
185 assert!(*prev_epoch < epoch);
186 }
187 if let Some(max_collected_epoch) = self.max_collected_epoch {
188 assert!(epoch > max_collected_epoch);
189 }
190 self.max_collected_epoch = Some(epoch);
191 self.pending_barriers_to_complete.push_front(epoch);
192 }
193}