risingwave_common/telemetry/
mod.rs1pub mod manager;
16pub mod pb_compatible;
17pub mod report;
18
19use std::env;
20
21use risingwave_pb::telemetry::PbTelemetryClusterType;
22pub use risingwave_telemetry_event::{
23 TelemetryError, TelemetryResult, current_timestamp, post_telemetry_report_pb,
24 report_event_common, request_to_telemetry_event,
25};
26use serde::{Deserialize, Serialize};
27use sysinfo::System;
28
29use crate::RW_VERSION;
30use crate::util::env_var::env_var_is_true_or;
31use crate::util::resource_util::cpu::total_cpu_available;
32use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};
33
34type Result<T> = core::result::Result<T, TelemetryError>;
35
/// Name of the environment variable read by `telemetry_cluster_type_from_env_var`.
pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE";

/// Variable value that resolves to `PbTelemetryClusterType::CloudHosted`.
pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted";

/// Variable value that resolves to `PbTelemetryClusterType::Kubernetes`.
pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";

/// Variable value that resolves to `PbTelemetryClusterType::SingleNode`.
pub const TELEMETRY_CLUSTER_TYPE_SINGLE_NODE: &str = "single-node";

/// Variable value that resolves to `PbTelemetryClusterType::DockerCompose`.
pub const TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE: &str = "docker-compose";

/// Variable value for which `telemetry_cluster_type_from_env_var` returns an error
/// instead of a cluster type.
const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test";
42
43pub use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
44
45pub fn telemetry_cluster_type_from_env_var() -> TelemetryResult<PbTelemetryClusterType> {
46 let cluster_type = match env::var(TELEMETRY_CLUSTER_TYPE) {
47 Ok(cluster_type) => cluster_type,
48 Err(_) => return Ok(PbTelemetryClusterType::Unspecified),
49 };
50 match cluster_type.as_str() {
51 TELEMETRY_CLUSTER_TYPE_HOSTED => Ok(PbTelemetryClusterType::CloudHosted),
52 TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => Ok(PbTelemetryClusterType::DockerCompose),
53 TELEMETRY_CLUSTER_TYPE_KUBERNETES => Ok(PbTelemetryClusterType::Kubernetes),
54 TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => Ok(PbTelemetryClusterType::SingleNode),
55
56 TELEMETRY_CLUSTER_TYPE_TEST => Err(TelemetryError::from(
59 "test cluster type should not send telemetry report",
60 )),
61 _ => Err(TelemetryError::from("invalid cluster type")),
62 }
63}
64
65pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL;
/// Telemetry reporting interval; evaluates to 21_600.
///
/// NOTE(review): presumably six hours expressed in seconds. The unit is not visible
/// in this file, so confirm it at the use site.
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;

/// Name of the environment variable consulted by `telemetry_env_enabled`.
/// Telemetry is treated as enabled when the variable is unset.
const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY";
72
73#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
74pub enum TelemetryNodeType {
75 Meta,
76 Compute,
77 Frontend,
78 Compactor,
79}
80
81#[derive(Debug, Serialize, Deserialize)]
82pub struct TelemetryReportBase {
83 pub tracking_id: String,
85 pub session_id: String,
87 pub system_data: SystemData,
89 pub up_time: u64,
91 pub time_stamp: u64,
93 pub node_type: TelemetryNodeType,
95 pub is_test: bool,
98}
99
100pub trait TelemetryReport: Serialize {}
101
102#[derive(Debug, Serialize, Deserialize)]
103pub struct SystemData {
104 memory: Memory,
105 os: Os,
106 cpu: Cpu,
107}
108
109#[derive(Debug, Serialize, Deserialize)]
110struct Memory {
111 used: usize,
112 total: usize,
113}
114
115#[derive(Debug, Serialize, Deserialize)]
116struct Os {
117 name: String,
118 kernel_version: String,
119 version: String,
120}
121
122#[derive(Debug, Serialize, Deserialize)]
123struct Cpu {
124 available: f32,
126}
127
128impl SystemData {
129 pub fn new() -> Self {
130 let memory = {
131 let total = system_memory_available_bytes();
132 let used = total_memory_used_bytes();
133 Memory { used, total }
134 };
135
136 let os = Os {
137 name: System::name().unwrap_or_default(),
138 kernel_version: System::kernel_version().unwrap_or_default(),
139 version: System::os_version().unwrap_or_default(),
140 };
141
142 let cpu = Cpu {
143 available: total_cpu_available(),
144 };
145
146 SystemData { memory, os, cpu }
147 }
148}
149
150impl Default for SystemData {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156pub fn telemetry_env_enabled() -> bool {
158 env_var_is_true_or(TELEMETRY_ENV_ENABLE, true)
160}
161
162pub fn report_scarf_enabled() -> bool {
163 telemetry_env_enabled()
164 && !matches!(
165 telemetry_cluster_type_from_env_var(),
166 Ok(PbTelemetryClusterType::CloudHosted)
167 )
168}
169
170pub async fn report_to_scarf() {
172 let request_url = format!(
173 "https://risingwave.gateway.scarf.sh/telemetry/{}/{}",
174 RW_VERSION,
175 System::name().unwrap_or_default()
176 );
177 loop {
179 let res = reqwest::get(&request_url).await;
180 if let Ok(res) = res {
181 if res.status().is_success() {
182 break;
183 }
184 }
185 tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await;
186 }
187}
188
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    use super::*;

    /// Serializes the tests in this module that mutate process-wide environment variables.
    ///
    /// The default test harness runs the `#[test]` functions of one binary on several
    /// threads, so without this lock `test_env` could flip `ENABLE_TELEMETRY` underneath
    /// the assertions in `test_enable_scarf`.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Environment variables the tests below are allowed to modify.
    const GUARDED_VARS: [&str; 2] = [TELEMETRY_ENV_ENABLE, TELEMETRY_CLUSTER_TYPE];

    /// Holds `ENV_LOCK` for the duration of a test and restores the guarded variables
    /// to their previous values when dropped, even if the test panics.
    struct EnvGuard {
        saved: Vec<(&'static str, Option<OsString>)>,
        // Declared last so it is released only after `Drop::drop` has restored the values.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Takes the lock and records the current values of the guarded variables.
        fn acquire() -> Self {
            // A poisoned lock only means another env test panicked; the `()` payload
            // cannot be left inconsistent, so the guard is still safe to use.
            let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
            let saved = GUARDED_VARS
                .iter()
                .map(|&key| (key, std::env::var_os(key)))
                .collect();
            Self { saved, _lock: lock }
        }

        /// Sets `key` to `value` while the lock is held.
        fn set(&self, key: &str, value: &str) {
            // SAFETY: `self` proves `ENV_LOCK` is held, so no other test in this module
            // mutates the environment concurrently. Threads outside this module are not
            // covered by the lock.
            unsafe { std::env::set_var(key, value) };
        }

        /// Removes `key` while the lock is held.
        fn unset(&self, key: &str) {
            // SAFETY: same reasoning as in `set`.
            unsafe { std::env::remove_var(key) };
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for (key, previous) in self.saved.drain(..) {
                match previous {
                    // SAFETY: `_lock` is still alive here, see `EnvGuard::set`.
                    Some(value) => unsafe { std::env::set_var(key, value) },
                    // SAFETY: `_lock` is still alive here, see `EnvGuard::set`.
                    None => unsafe { std::env::remove_var(key) },
                }
            }
        }
    }

    #[test]
    fn test_enable_scarf() {
        let env = EnvGuard::acquire();
        env.set(TELEMETRY_ENV_ENABLE, "true");

        // A hosted cluster must not report to Scarf.
        env.set(TELEMETRY_CLUSTER_TYPE, TELEMETRY_CLUSTER_TYPE_HOSTED);
        assert!(!report_scarf_enabled());

        // Any non-hosted cluster type keeps Scarf reporting on.
        env.set(TELEMETRY_CLUSTER_TYPE, TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE);
        assert!(report_scarf_enabled());
    }

    #[test]
    fn test_system_data_new() {
        let system_data = SystemData::new();

        assert!(system_data.memory.used > 0);
        assert!(system_data.memory.total > 0);
        assert!(!system_data.os.name.is_empty());
        assert!(!system_data.os.kernel_version.is_empty());
        assert!(!system_data.os.version.is_empty());
        assert!(system_data.cpu.available > 0.0);
    }

    #[test]
    fn test_env() {
        let env = EnvGuard::acquire();

        // (value of ENABLE_TELEMETRY, expected result of `telemetry_env_enabled`)
        let cases = [
            ("true", true),
            ("false", false),
            ("tRue", true),
            ("2", false),
            ("1", true),
            ("not_a_bool", false),
        ];
        for (value, expected) in cases {
            env.set(TELEMETRY_ENV_ENABLE, value);
            assert_eq!(
                telemetry_env_enabled(),
                expected,
                "ENABLE_TELEMETRY={value}"
            );
        }

        // An unset variable falls back to the default, which is "enabled".
        env.unset(TELEMETRY_ENV_ENABLE);
        assert!(telemetry_env_enabled());
    }
}