risingwave_common_metrics/monitor/
process.rsuse prometheus::core::{Collector, Desc};
use prometheus::{proto, IntCounter, IntGauge, Opts, Registry};
#[cfg(target_os = "linux")]
use super::{CLOCK_TICK, PAGESIZE};
pub fn monitor_process(registry: &Registry) {
let pc = ProcessCollector::new();
registry.register(Box::new(pc)).unwrap()
}
struct ProcessCollector {
descs: Vec<Desc>,
cpu_total: IntCounter,
vsize: IntGauge,
rss: IntGauge,
cpu_core_num: IntGauge,
}
impl Default for ProcessCollector {
fn default() -> Self {
Self::new()
}
}
impl ProcessCollector {
fn new() -> Self {
let mut descs = Vec::new();
let cpu_total = IntCounter::with_opts(Opts::new(
"process_cpu_seconds_total",
"Total user and system CPU time spent in \
seconds.",
))
.unwrap();
descs.extend(cpu_total.desc().into_iter().cloned());
let vsize = IntGauge::with_opts(Opts::new(
"process_virtual_memory_bytes",
"Virtual memory size in bytes.",
))
.unwrap();
descs.extend(vsize.desc().into_iter().cloned());
let rss = IntGauge::with_opts(Opts::new(
"process_resident_memory_bytes",
"Resident memory size in bytes.",
))
.unwrap();
descs.extend(rss.desc().into_iter().cloned());
let cpu_core_num =
IntGauge::with_opts(Opts::new("process_cpu_core_num", "Cpu core num.")).unwrap();
descs.extend(cpu_core_num.desc().into_iter().cloned());
Self {
descs,
cpu_total,
vsize,
rss,
cpu_core_num,
}
}
}
#[cfg(target_os = "linux")]
impl Collector for ProcessCollector {
fn desc(&self) -> Vec<&Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<proto::MetricFamily> {
let p = match procfs::process::Process::myself() {
Ok(p) => p,
Err(..) => {
return Vec::new();
}
};
let stat = match p.stat() {
Ok(stat) => stat,
Err(..) => {
return Vec::new();
}
};
self.vsize.set(stat.vsize as i64);
self.rss.set(stat.rss as i64 * *PAGESIZE);
let cpu_total_mfs = {
let total = (stat.utime + stat.stime) / *CLOCK_TICK;
let past = self.cpu_total.get();
self.cpu_total.inc_by(total - past);
self.cpu_total.collect()
};
self.cpu_core_num
.set(rw_resource_util::cpu::total_cpu_available() as i64);
let mut mfs = Vec::with_capacity(4);
mfs.extend(cpu_total_mfs);
mfs.extend(self.vsize.collect());
mfs.extend(self.rss.collect());
mfs.extend(self.cpu_core_num.collect());
mfs
}
}
#[cfg(target_os = "macos")]
impl Collector for ProcessCollector {
fn desc(&self) -> Vec<&Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<proto::MetricFamily> {
let pid = unsafe { libc::getpid() };
let clock_tick = unsafe {
let mut info = mach2::mach_time::mach_timebase_info::default();
let errno = mach2::mach_time::mach_timebase_info(&mut info as *mut _);
if errno != 0 {
1_f64
} else {
(info.numer / info.denom) as f64
}
};
let proc_info = match darwin_libproc::task_info(pid) {
Ok(info) => info,
Err(_) => {
return Vec::new();
}
};
self.vsize.set(proc_info.pti_virtual_size as i64);
self.rss.set(proc_info.pti_resident_size as i64);
let cpu_total_mfs = {
let total =
(proc_info.pti_total_user + proc_info.pti_total_system) as f64 * clock_tick / 1e9;
let past = self.cpu_total.get();
self.cpu_total.inc_by((total - past as f64) as u64);
self.cpu_total.collect()
};
self.cpu_core_num
.set(rw_resource_util::cpu::total_cpu_available() as i64);
let mut mfs = Vec::with_capacity(4);
mfs.extend(cpu_total_mfs);
mfs.extend(self.vsize.collect());
mfs.extend(self.rss.collect());
mfs.extend(self.cpu_core_num.collect());
mfs
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
impl Collector for ProcessCollector {
fn desc(&self) -> Vec<&Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<proto::MetricFamily> {
self.vsize.set(100 * 1000);
self.rss.set(100 * 1000);
let cpu_total_mfs = {
self.cpu_total.inc_by(10);
self.cpu_total.collect()
};
self.cpu_core_num
.set(rw_resource_util::cpu::total_cpu_available() as i64);
let mut mfs = Vec::with_capacity(4);
mfs.extend(cpu_total_mfs);
mfs.extend(self.vsize.collect());
mfs.extend(self.rss.collect());
mfs.extend(self.cpu_core_num.collect());
mfs
}
}