risingwave_meta/barrier/checkpoint/independent_job/creating_job/
barrier_control.rs1use risingwave_common::id::JobId;
16use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
17use risingwave_common::util::epoch::EpochPair;
18
19use crate::barrier::partial_graph::PartialGraphStat;
20use crate::rpc::metrics::GLOBAL_META_METRICS;
21
22pub(super) struct CreatingStreamingJobBarrierStats {
23 consuming_snapshot_barrier_latency: LabelGuardedHistogram,
24 consuming_log_store_barrier_latency: LabelGuardedHistogram,
25 inflight_barrier_num: LabelGuardedIntGauge,
26
27 snapshot_epoch: u64,
28}
29
30impl CreatingStreamingJobBarrierStats {
31 pub(super) fn new(job_id: JobId, snapshot_epoch: u64) -> Self {
32 let table_id_str = format!("{}", job_id);
33 Self {
34 snapshot_epoch,
35 consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
36 .snapshot_backfill_barrier_latency
37 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
38 consuming_log_store_barrier_latency: GLOBAL_META_METRICS
39 .snapshot_backfill_barrier_latency
40 .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
41 inflight_barrier_num: GLOBAL_META_METRICS
42 .snapshot_backfill_inflight_barrier_num
43 .with_guarded_label_values(&[&table_id_str]),
44 }
45 }
46}
47
48impl PartialGraphStat for CreatingStreamingJobBarrierStats {
49 fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64) {
50 let barrier_latency_metrics = if epoch.prev < self.snapshot_epoch {
51 &self.consuming_snapshot_barrier_latency
52 } else {
53 &self.consuming_log_store_barrier_latency
54 };
55 barrier_latency_metrics.observe(barrier_latency_secs);
56 }
57
58 fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
59 self.inflight_barrier_num.set(inflight_barrier_num as _);
60 }
61}