risingwave_common_metrics/monitor/
process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{Collector, Desc};
use prometheus::{proto, IntCounter, IntGauge, Opts, Registry};

#[cfg(target_os = "linux")]
use super::{CLOCK_TICK, PAGESIZE};

/// Registers a process-level metrics collector for the current process on `registry`.
///
/// # Panics
///
/// Panics if `registry` refuses the collector, because the registration result is unwrapped.
pub fn monitor_process(registry: &Registry) {
    let collector = Box::new(ProcessCollector::new());
    registry.register(collector).unwrap()
}

/// A collector to collect process metrics.
///
/// Holds one metric handle per exported value together with the descriptors of
/// all of them, which are what `Collector::desc` hands back.
struct ProcessCollector {
    /// Descriptors of every metric below, cloned from the metrics at construction time.
    descs: Vec<Desc>,
    /// Counter exported as `process_cpu_seconds_total`.
    cpu_total: IntCounter,
    /// Gauge exported as `process_virtual_memory_bytes`.
    vsize: IntGauge,
    /// Gauge exported as `process_resident_memory_bytes`.
    rss: IntGauge,
    /// Gauge exported as `process_cpu_core_num`.
    cpu_core_num: IntGauge,
}

impl Default for ProcessCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl ProcessCollector {
    fn new() -> Self {
        let mut descs = Vec::new();

        let cpu_total = IntCounter::with_opts(Opts::new(
            "process_cpu_seconds_total",
            "Total user and system CPU time spent in \
                 seconds.",
        ))
        .unwrap();
        descs.extend(cpu_total.desc().into_iter().cloned());

        let vsize = IntGauge::with_opts(Opts::new(
            "process_virtual_memory_bytes",
            "Virtual memory size in bytes.",
        ))
        .unwrap();
        descs.extend(vsize.desc().into_iter().cloned());

        let rss = IntGauge::with_opts(Opts::new(
            "process_resident_memory_bytes",
            "Resident memory size in bytes.",
        ))
        .unwrap();
        descs.extend(rss.desc().into_iter().cloned());

        let cpu_core_num =
            IntGauge::with_opts(Opts::new("process_cpu_core_num", "Cpu core num.")).unwrap();
        descs.extend(cpu_core_num.desc().into_iter().cloned());

        Self {
            descs,
            cpu_total,
            vsize,
            rss,
            cpu_core_num,
        }
    }
}

#[cfg(target_os = "linux")]
impl Collector for ProcessCollector {
    /// Returns the descriptors of every metric exported by this collector.
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Refreshes all metrics from the current process' procfs stat and returns them.
    ///
    /// Returns an empty vector when the process handle or its stat cannot be
    /// read, so a failing procfs lookup never aborts metric gathering.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        let p = match procfs::process::Process::myself() {
            Ok(p) => p,
            Err(..) => {
                // we can't construct a Process object, so there's no stats to gather
                return Vec::new();
            }
        };
        let stat = match p.stat() {
            Ok(stat) => stat,
            Err(..) => {
                // we can't get the stat, so there's no stats to gather
                return Vec::new();
            }
        };

        // memory
        self.vsize.set(stat.vsize as i64);
        // `rss` is scaled by `PAGESIZE` before being exported under a bytes metric.
        self.rss.set(stat.rss as i64 * *PAGESIZE);

        // cpu
        let cpu_total_mfs = {
            // `utime + stime` divided by `CLOCK_TICK`, truncated to whole units.
            let total = (stat.utime + stat.stime) / *CLOCK_TICK;
            let past = self.cpu_total.get();
            // The counter only stores the running total, so advance it by the delta.
            // Saturate instead of subtracting directly: if the counter is ever ahead
            // of `total` (e.g. overlapping `collect` calls each added the same delta),
            // a plain unsigned subtraction would panic in debug builds or wrap to a
            // huge increment in release builds.
            self.cpu_total.inc_by(total.saturating_sub(past));
            self.cpu_total.collect()
        };

        self.cpu_core_num
            .set(rw_resource_util::cpu::total_cpu_available() as i64);

        // collect MetricFamilies.
        let mut mfs = Vec::with_capacity(4);
        mfs.extend(cpu_total_mfs);
        mfs.extend(self.vsize.collect());
        mfs.extend(self.rss.collect());
        mfs.extend(self.cpu_core_num.collect());
        mfs
    }
}

#[cfg(target_os = "macos")]
impl Collector for ProcessCollector {
    /// Returns the descriptors of every metric exported by this collector.
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Refreshes all metrics from the current process' task info and returns them.
    ///
    /// Returns an empty vector when the task info cannot be read.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        // SAFETY: `getpid` takes no arguments and only returns the caller's pid.
        let pid = unsafe { libc::getpid() };
        // SAFETY: `info` is a live, properly typed local and the pointer passed to
        // `mach_timebase_info` is only used for the duration of the call.
        let clock_tick = unsafe {
            let mut info = mach2::mach_time::mach_timebase_info::default();
            let errno = mach2::mach_time::mach_timebase_info(&mut info as *mut _);
            if errno != 0 || info.denom == 0 {
                // Fall back to a 1:1 ratio when the timebase is unavailable or
                // unusable; a zero denominator must never reach the division.
                1_f64
            } else {
                // Divide in floating point: integer division would truncate a
                // ratio such as 125/3 to 41 and under-report CPU time.
                info.numer as f64 / info.denom as f64
            }
        };
        let proc_info = match darwin_libproc::task_info(pid) {
            Ok(info) => info,
            Err(_) => {
                return Vec::new();
            }
        };

        // memory
        self.vsize.set(proc_info.pti_virtual_size as i64);
        self.rss.set(proc_info.pti_resident_size as i64);

        // cpu
        let cpu_total_mfs = {
            // `pti_total_user` and `pti_total_system` are scaled by the timebase
            // ratio and then divided by 1e9 to obtain seconds.
            // NOTE(review): this presumes the raw values are Mach time units that
            // the timebase converts to nanoseconds; confirm against Apple docs.
            let total =
                (proc_info.pti_total_user + proc_info.pti_total_system) as f64 * clock_tick / 1e9;
            let past = self.cpu_total.get();
            // The float-to-integer cast saturates, so a negative delta becomes 0.
            self.cpu_total.inc_by((total - past as f64) as u64);
            self.cpu_total.collect()
        };

        self.cpu_core_num
            .set(rw_resource_util::cpu::total_cpu_available() as i64);

        // collect MetricFamilies.
        let mut mfs = Vec::with_capacity(4);
        mfs.extend(cpu_total_mfs);
        mfs.extend(self.vsize.collect());
        mfs.extend(self.rss.collect());
        mfs.extend(self.cpu_core_num.collect());
        mfs
    }
}

#[cfg(not(any(target_os = "linux", target_os = "macos")))]
impl Collector for ProcessCollector {
    /// Returns the descriptors of every metric exported by this collector.
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Reports placeholder values on platforms other than Linux and macOS.
    ///
    /// Memory gauges are set to a fixed fake number and the CPU counter advances
    /// by a fixed step on every call; only the core count comes from a real query.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        // fake number shared by both memory gauges
        let fake_bytes = 100 * 1000;
        self.vsize.set(fake_bytes);
        self.rss.set(fake_bytes);

        // cpu: bump by a fixed step, then snapshot the counter right away
        self.cpu_total.inc_by(10);
        let cpu_families = self.cpu_total.collect();

        let cores = rw_resource_util::cpu::total_cpu_available() as i64;
        self.cpu_core_num.set(cores);

        // collect MetricFamilies in the order: cpu, vsize, rss, core count.
        cpu_families
            .into_iter()
            .chain(self.vsize.collect())
            .chain(self.rss.collect())
            .chain(self.cpu_core_num.collect())
            .collect()
    }
}