risingwave_meta/barrier/checkpoint/independent_job/creating_job/
barrier_control.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::id::JobId;
16use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
17use risingwave_common::util::epoch::EpochPair;
18
19use crate::barrier::partial_graph::PartialGraphStat;
20use crate::rpc::metrics::GLOBAL_META_METRICS;
21
22pub(super) struct CreatingStreamingJobBarrierStats {
23    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
24    consuming_log_store_barrier_latency: LabelGuardedHistogram,
25    inflight_barrier_num: LabelGuardedIntGauge,
26
27    snapshot_epoch: u64,
28}
29
30impl CreatingStreamingJobBarrierStats {
31    pub(super) fn new(job_id: JobId, snapshot_epoch: u64) -> Self {
32        let table_id_str = format!("{}", job_id);
33        Self {
34            snapshot_epoch,
35            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
36                .snapshot_backfill_barrier_latency
37                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
38            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
39                .snapshot_backfill_barrier_latency
40                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
41            inflight_barrier_num: GLOBAL_META_METRICS
42                .snapshot_backfill_inflight_barrier_num
43                .with_guarded_label_values(&[&table_id_str]),
44        }
45    }
46}
47
48impl PartialGraphStat for CreatingStreamingJobBarrierStats {
49    fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64) {
50        let barrier_latency_metrics = if epoch.prev < self.snapshot_epoch {
51            &self.consuming_snapshot_barrier_latency
52        } else {
53            &self.consuming_log_store_barrier_latency
54        };
55        barrier_latency_metrics.observe(barrier_latency_secs);
56    }
57
58    fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
59        self.inflight_barrier_num.set(inflight_barrier_num as _);
60    }
61}