risingwave_common_metrics/monitor/
process.rs1use prometheus::core::{Collector, Desc};
16use prometheus::{IntCounter, IntGauge, Opts, Registry, proto};
17
18#[cfg(target_os = "linux")]
19use super::{CLOCK_TICK, PAGESIZE};
20
21pub fn monitor_process(registry: &Registry) {
23 let pc = ProcessCollector::new();
24 registry.register(Box::new(pc)).unwrap()
25}
26
27struct ProcessCollector {
29 descs: Vec<Desc>,
30 cpu_total: IntCounter,
31 vsize: IntGauge,
32 rss: IntGauge,
33 cpu_core_num: IntGauge,
34}
35
36impl Default for ProcessCollector {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl ProcessCollector {
43 fn new() -> Self {
44 let mut descs = Vec::new();
45
46 let cpu_total = IntCounter::with_opts(Opts::new(
47 "process_cpu_seconds_total",
48 "Total user and system CPU time spent in \
49 seconds.",
50 ))
51 .unwrap();
52 descs.extend(cpu_total.desc().into_iter().cloned());
53
54 let vsize = IntGauge::with_opts(Opts::new(
55 "process_virtual_memory_bytes",
56 "Virtual memory size in bytes.",
57 ))
58 .unwrap();
59 descs.extend(vsize.desc().into_iter().cloned());
60
61 let rss = IntGauge::with_opts(Opts::new(
62 "process_resident_memory_bytes",
63 "Resident memory size in bytes.",
64 ))
65 .unwrap();
66 descs.extend(rss.desc().into_iter().cloned());
67
68 let cpu_core_num =
69 IntGauge::with_opts(Opts::new("process_cpu_core_num", "Cpu core num.")).unwrap();
70 descs.extend(cpu_core_num.desc().into_iter().cloned());
71
72 Self {
73 descs,
74 cpu_total,
75 vsize,
76 rss,
77 cpu_core_num,
78 }
79 }
80}
81
82#[cfg(target_os = "linux")]
83impl Collector for ProcessCollector {
84 fn desc(&self) -> Vec<&Desc> {
85 self.descs.iter().collect()
86 }
87
88 fn collect(&self) -> Vec<proto::MetricFamily> {
89 let p = match procfs::process::Process::myself() {
90 Ok(p) => p,
91 Err(..) => {
92 return Vec::new();
94 }
95 };
96 let stat = match p.stat() {
97 Ok(stat) => stat,
98 Err(..) => {
99 return Vec::new();
101 }
102 };
103
104 self.vsize.set(stat.vsize as i64);
106 self.rss.set(stat.rss as i64 * *PAGESIZE);
107
108 let cpu_total_mfs = {
110 let total = (stat.utime + stat.stime) / *CLOCK_TICK;
111 let past = self.cpu_total.get();
112 self.cpu_total.inc_by(total - past);
113 self.cpu_total.collect()
114 };
115
116 self.cpu_core_num
117 .set(rw_resource_util::cpu::total_cpu_available() as i64);
118
119 let mut mfs = Vec::with_capacity(4);
121 mfs.extend(cpu_total_mfs);
122 mfs.extend(self.vsize.collect());
123 mfs.extend(self.rss.collect());
124 mfs.extend(self.cpu_core_num.collect());
125 mfs
126 }
127}
128
#[cfg(target_os = "macos")]
impl Collector for ProcessCollector {
    /// Returns the descriptors of all metrics owned by this collector.
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Refreshes every metric from the task info of the current process and
    /// returns the resulting metric families.
    ///
    /// Returns an empty vector when the task info cannot be obtained.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        // SAFETY: `getpid` takes no arguments and has no preconditions.
        let pid = unsafe { libc::getpid() };

        // Ratio used to scale the task's CPU time values before dividing by
        // 1e9 below. Falls back to 1.0 when the timebase cannot be queried
        // or reports a zero denominator.
        let clock_tick = {
            let mut info = mach2::mach_time::mach_timebase_info::default();
            // SAFETY: `info` is a valid, exclusively borrowed value that
            // outlives the call; the callee only writes into it.
            let errno = unsafe { mach2::mach_time::mach_timebase_info(&mut info as *mut _) };
            if errno != 0 || info.denom == 0 {
                1_f64
            } else {
                // Divide in floating point: integer division would truncate
                // non-integral ratios (e.g. 125/3) and under-report CPU time.
                info.numer as f64 / info.denom as f64
            }
        };
        let proc_info = match darwin_libproc::task_info(pid) {
            Ok(info) => info,
            Err(_) => {
                // Best effort: report nothing rather than fail.
                return Vec::new();
            }
        };

        // Memory gauges.
        self.vsize.set(proc_info.pti_virtual_size as i64);
        self.rss.set(proc_info.pti_resident_size as i64);

        let cpu_total_mfs = {
            // User plus system time, scaled by the timebase ratio and
            // converted to seconds.
            let total =
                (proc_info.pti_total_user + proc_info.pti_total_system) as f64 * clock_tick / 1e9;
            let past = self.cpu_total.get();
            // The float-to-integer cast saturates, so a negative delta
            // becomes 0 and the counter never moves backwards.
            self.cpu_total.inc_by((total - past as f64) as u64);
            self.cpu_total.collect()
        };

        self.cpu_core_num
            .set(rw_resource_util::cpu::total_cpu_available() as i64);

        let mut mfs = Vec::with_capacity(4);
        mfs.extend(cpu_total_mfs);
        mfs.extend(self.vsize.collect());
        mfs.extend(self.rss.collect());
        mfs.extend(self.cpu_core_num.collect());
        mfs
    }
}
179
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
impl Collector for ProcessCollector {
    /// Returns the descriptors of all metrics owned by this collector.
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Publishes fixed placeholder values for the memory and CPU-time
    /// metrics on platforms other than Linux and macOS; only the core count
    /// comes from `rw_resource_util`.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        // Placeholder memory figures.
        const FAKE_BYTES: i64 = 100 * 1000;
        self.vsize.set(FAKE_BYTES);
        self.rss.set(FAKE_BYTES);

        // Placeholder CPU time: advance the counter by a constant per call.
        self.cpu_total.inc_by(10);
        let cpu_families = self.cpu_total.collect();

        let cores = rw_resource_util::cpu::total_cpu_available() as i64;
        self.cpu_core_num.set(cores);

        let mut families = Vec::with_capacity(4);
        for part in [
            cpu_families,
            self.vsize.collect(),
            self.rss.collect(),
            self.cpu_core_num.collect(),
        ] {
            families.extend(part);
        }
        families
    }
}