risingwave_common_metrics/monitor/
process.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prometheus::core::{Collector, Desc};
16use prometheus::{IntCounter, IntGauge, Opts, Registry, proto};
17
18#[cfg(target_os = "linux")]
19use super::{CLOCK_TICK, PAGESIZE};
20
21/// Monitors current process.
22pub fn monitor_process(registry: &Registry) {
23    let pc = ProcessCollector::new();
24    registry.register(Box::new(pc)).unwrap()
25}
26
27/// A collector to collect process metrics.
28struct ProcessCollector {
29    descs: Vec<Desc>,
30    cpu_total: IntCounter,
31    vsize: IntGauge,
32    rss: IntGauge,
33    cpu_core_num: IntGauge,
34}
35
36impl Default for ProcessCollector {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl ProcessCollector {
43    fn new() -> Self {
44        let mut descs = Vec::new();
45
46        let cpu_total = IntCounter::with_opts(Opts::new(
47            "process_cpu_seconds_total",
48            "Total user and system CPU time spent in \
49                 seconds.",
50        ))
51        .unwrap();
52        descs.extend(cpu_total.desc().into_iter().cloned());
53
54        let vsize = IntGauge::with_opts(Opts::new(
55            "process_virtual_memory_bytes",
56            "Virtual memory size in bytes.",
57        ))
58        .unwrap();
59        descs.extend(vsize.desc().into_iter().cloned());
60
61        let rss = IntGauge::with_opts(Opts::new(
62            "process_resident_memory_bytes",
63            "Resident memory size in bytes.",
64        ))
65        .unwrap();
66        descs.extend(rss.desc().into_iter().cloned());
67
68        let cpu_core_num =
69            IntGauge::with_opts(Opts::new("process_cpu_core_num", "Cpu core num.")).unwrap();
70        descs.extend(cpu_core_num.desc().into_iter().cloned());
71
72        Self {
73            descs,
74            cpu_total,
75            vsize,
76            rss,
77            cpu_core_num,
78        }
79    }
80}
81
82#[cfg(target_os = "linux")]
83impl Collector for ProcessCollector {
84    fn desc(&self) -> Vec<&Desc> {
85        self.descs.iter().collect()
86    }
87
88    fn collect(&self) -> Vec<proto::MetricFamily> {
89        let p = match procfs::process::Process::myself() {
90            Ok(p) => p,
91            Err(..) => {
92                // we can't construct a Process object, so there's no stats to gather
93                return Vec::new();
94            }
95        };
96        let stat = match p.stat() {
97            Ok(stat) => stat,
98            Err(..) => {
99                // we can't get the stat, so there's no stats to gather
100                return Vec::new();
101            }
102        };
103
104        // memory
105        self.vsize.set(stat.vsize as i64);
106        self.rss.set(stat.rss as i64 * *PAGESIZE);
107
108        // cpu
109        let cpu_total_mfs = {
110            let total = (stat.utime + stat.stime) / *CLOCK_TICK;
111            let past = self.cpu_total.get();
112            self.cpu_total.inc_by(total - past);
113            self.cpu_total.collect()
114        };
115
116        self.cpu_core_num
117            .set(rw_resource_util::cpu::total_cpu_available() as i64);
118
119        // collect MetricFamilies.
120        let mut mfs = Vec::with_capacity(4);
121        mfs.extend(cpu_total_mfs);
122        mfs.extend(self.vsize.collect());
123        mfs.extend(self.rss.collect());
124        mfs.extend(self.cpu_core_num.collect());
125        mfs
126    }
127}
128
#[cfg(target_os = "macos")]
impl Collector for ProcessCollector {
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Refreshes the metrics from `darwin_libproc::task_info` for the current process and
    /// returns them.
    ///
    /// Returns an empty vector if the task info cannot be read.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        // SAFETY: `getpid` takes no arguments and has no preconditions.
        let pid = unsafe { libc::getpid() };

        // Nanoseconds per Mach absolute-time tick (`numer / denom` of the host timebase).
        // The ratio must be computed in floating point: it is not always an integer
        // (e.g. 125/3 on Apple Silicon), and integer division would silently truncate it.
        // Fall back to 1 ns/tick if the timebase cannot be queried or is degenerate.
        let ns_per_tick = {
            let mut info = mach2::mach_time::mach_timebase_info::default();
            // SAFETY: `info` is a live, exclusively borrowed struct passed as the out-parameter.
            let kr = unsafe { mach2::mach_time::mach_timebase_info(&mut info as *mut _) };
            if kr != 0 || info.denom == 0 {
                1_f64
            } else {
                info.numer as f64 / info.denom as f64
            }
        };
        let Ok(proc_info) = darwin_libproc::task_info(pid) else {
            return Vec::new();
        };

        // memory
        self.vsize.set(proc_info.pti_virtual_size as i64);
        self.rss.set(proc_info.pti_resident_size as i64);

        // cpu
        let cpu_total_mfs = {
            // `pti_total_user` and `pti_total_system` are in Mach time units; scale by the
            // timebase to get nanoseconds, then convert to seconds.
            let total = (proc_info.pti_total_user + proc_info.pti_total_system) as f64
                * ns_per_tick
                / 1e9;
            let past = self.cpu_total.get();
            // Float-to-integer `as` casts saturate, so a negative delta adds 0.
            self.cpu_total.inc_by((total - past as f64) as u64);
            self.cpu_total.collect()
        };

        self.cpu_core_num
            .set(rw_resource_util::cpu::total_cpu_available() as i64);

        // collect MetricFamilies.
        let mut mfs = Vec::with_capacity(4);
        mfs.extend(cpu_total_mfs);
        mfs.extend(self.vsize.collect());
        mfs.extend(self.rss.collect());
        mfs.extend(self.cpu_core_num.collect());
        mfs
    }
}
179
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
impl Collector for ProcessCollector {
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Reports placeholder process metrics: this platform has no real implementation, so
    /// memory gauges are set to a fixed value and the CPU counter advances by a fixed step
    /// on every call. Only the CPU core count is a real measurement.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        const FAKE_MEMORY_BYTES: i64 = 100 * 1000;
        const FAKE_CPU_STEP: u64 = 10;

        self.vsize.set(FAKE_MEMORY_BYTES);
        self.rss.set(FAKE_MEMORY_BYTES);
        self.cpu_total.inc_by(FAKE_CPU_STEP);
        self.cpu_core_num
            .set(rw_resource_util::cpu::total_cpu_available() as i64);

        [
            self.cpu_total.collect(),
            self.vsize.collect(),
            self.rss.collect(),
            self.cpu_core_num.collect(),
        ]
        .into_iter()
        .flatten()
        .collect()
    }
}