risingwave_common/telemetry/
mod.rs1pub mod manager;
16pub mod pb_compatible;
17pub mod report;
18
19use std::env;
20
21use risingwave_pb::telemetry::PbTelemetryClusterType;
22pub use risingwave_telemetry_event::{
23 TelemetryError, TelemetryResult, current_timestamp, post_telemetry_report_pb,
24 report_event_common, request_to_telemetry_event,
25};
26use serde::{Deserialize, Serialize};
27use sysinfo::System;
28
29use crate::RW_VERSION;
30use crate::util::env_var::env_var_is_true_or;
31use crate::util::resource_util::cpu::total_cpu_available;
32use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};
33
34type Result<T> = core::result::Result<T, TelemetryError>;
35
/// Name of the environment variable that selects the telemetry cluster type.
pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE";
/// Value of [`TELEMETRY_CLUSTER_TYPE`] that maps to the cloud-hosted cluster type.
pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted";
/// Value of [`TELEMETRY_CLUSTER_TYPE`] that maps to the Kubernetes cluster type.
pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";
/// Value of [`TELEMETRY_CLUSTER_TYPE`] that maps to the single-node cluster type.
pub const TELEMETRY_CLUSTER_TYPE_SINGLE_NODE: &str = "single-node";
/// Value of [`TELEMETRY_CLUSTER_TYPE`] that maps to the docker-compose cluster type.
pub const TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE: &str = "docker-compose";
/// Value of [`TELEMETRY_CLUSTER_TYPE`] used by test clusters; resolving it yields an
/// error because test clusters should not send telemetry reports.
const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test";
42pub use risingwave_telemetry_event::{
43 TELEMETRY_RISINGWAVE_CLOUD_UUID, get_telemetry_risingwave_cloud_uuid,
44};
45
46pub fn telemetry_cluster_type_from_env_var() -> TelemetryResult<PbTelemetryClusterType> {
47 let cluster_type = match env::var(TELEMETRY_CLUSTER_TYPE) {
48 Ok(cluster_type) => cluster_type,
49 Err(_) => return Ok(PbTelemetryClusterType::Unspecified),
50 };
51 match cluster_type.as_str() {
52 TELEMETRY_CLUSTER_TYPE_HOSTED => Ok(PbTelemetryClusterType::CloudHosted),
53 TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => Ok(PbTelemetryClusterType::DockerCompose),
54 TELEMETRY_CLUSTER_TYPE_KUBERNETES => Ok(PbTelemetryClusterType::Kubernetes),
55 TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => Ok(PbTelemetryClusterType::SingleNode),
56
57 TELEMETRY_CLUSTER_TYPE_TEST => Err(TelemetryError::from(
60 "test cluster type should not send telemetry report",
61 )),
62 _ => Err(TelemetryError::from("invalid cluster type")),
63 }
64}
65
66pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL;
/// Interval between telemetry reports, written as `6 * 60 * 60`
/// (presumably seconds, i.e. six hours — confirm against the consumer of this constant).
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;

/// Name of the environment variable consulted by [`telemetry_env_enabled`].
const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY";
73
74#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
75pub enum TelemetryNodeType {
76 Meta,
77 Compute,
78 Frontend,
79 Compactor,
80}
81
82#[derive(Debug, Serialize, Deserialize)]
83pub struct TelemetryReportBase {
84 pub tracking_id: String,
86 pub session_id: String,
88 pub system_data: SystemData,
90 pub up_time: u64,
92 pub time_stamp: u64,
94 pub node_type: TelemetryNodeType,
96 pub is_test: bool,
99}
100
101pub trait TelemetryReport: Serialize {}
102
103#[derive(Debug, Serialize, Deserialize)]
104pub struct SystemData {
105 memory: Memory,
106 os: Os,
107 cpu: Cpu,
108}
109
110#[derive(Debug, Serialize, Deserialize)]
111struct Memory {
112 used: usize,
113 total: usize,
114}
115
116#[derive(Debug, Serialize, Deserialize)]
117struct Os {
118 name: String,
119 kernel_version: String,
120 version: String,
121}
122
123#[derive(Debug, Serialize, Deserialize)]
124struct Cpu {
125 available: f32,
127}
128
129impl SystemData {
130 pub fn new() -> Self {
131 let memory = {
132 let total = system_memory_available_bytes();
133 let used = total_memory_used_bytes();
134 Memory { used, total }
135 };
136
137 let os = Os {
138 name: System::name().unwrap_or_default(),
139 kernel_version: System::kernel_version().unwrap_or_default(),
140 version: System::os_version().unwrap_or_default(),
141 };
142
143 let cpu = Cpu {
144 available: total_cpu_available(),
145 };
146
147 SystemData { memory, os, cpu }
148 }
149}
150
151impl Default for SystemData {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157pub fn telemetry_env_enabled() -> bool {
159 env_var_is_true_or(TELEMETRY_ENV_ENABLE, true)
161}
162
163pub fn report_scarf_enabled() -> bool {
164 telemetry_env_enabled()
165 && !matches!(
166 telemetry_cluster_type_from_env_var(),
167 Ok(PbTelemetryClusterType::CloudHosted)
168 )
169}
170
171pub async fn report_to_scarf() {
173 let request_url = format!(
174 "https://risingwave.gateway.scarf.sh/telemetry/{}/{}",
175 RW_VERSION,
176 System::name().unwrap_or_default()
177 );
178 loop {
180 let res = reqwest::get(&request_url).await;
181 if let Ok(res) = res {
182 if res.status().is_success() {
183 break;
184 }
185 }
186 tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await;
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard};

    use super::*;

    /// Serializes the tests that mutate process-wide environment variables.
    ///
    /// The default test harness runs tests of one binary on several threads of the
    /// same process, so without this lock `test_enable_scarf` and `test_env` would
    /// race on `ENABLE_TELEMETRY` and fail intermittently.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_LOCK`], ignoring poisoning: the guarded value is `()`, so a
    /// panic in another test cannot leave it in an inconsistent state.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_enable_scarf() {
        let _env_guard = lock_env();

        // SAFETY: every test in this module that reads or writes these environment
        // variables holds `ENV_LOCK` while doing so.
        unsafe { std::env::set_var(TELEMETRY_ENV_ENABLE, "true") };

        // A cloud-hosted cluster must not report to Scarf.
        // SAFETY: see above — `ENV_LOCK` is held.
        unsafe { std::env::set_var(TELEMETRY_CLUSTER_TYPE, TELEMETRY_CLUSTER_TYPE_HOSTED) };
        assert!(!report_scarf_enabled());

        // Any other recognized cluster type keeps Scarf reporting enabled.
        // SAFETY: see above — `ENV_LOCK` is held.
        unsafe {
            std::env::set_var(
                TELEMETRY_CLUSTER_TYPE,
                TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE,
            )
        };
        assert!(report_scarf_enabled());

        // Do not leak the variables into whichever test takes the lock next.
        // SAFETY: see above — `ENV_LOCK` is held.
        unsafe {
            std::env::remove_var(TELEMETRY_CLUSTER_TYPE);
            std::env::remove_var(TELEMETRY_ENV_ENABLE);
        }
    }

    #[test]
    fn test_system_data_new() {
        let system_data = SystemData::new();

        assert!(system_data.memory.used > 0);
        assert!(system_data.memory.total > 0);
        assert!(!system_data.os.name.is_empty());
        assert!(!system_data.os.kernel_version.is_empty());
        assert!(!system_data.os.version.is_empty());
        assert!(system_data.cpu.available > 0.0);
    }

    #[test]
    fn test_env() {
        let _env_guard = lock_env();

        // Spelled out literally so the test also pins the value of `TELEMETRY_ENV_ENABLE`.
        let key = "ENABLE_TELEMETRY";

        let cases = [
            ("true", true),
            ("false", false),
            ("tRue", true),
            ("2", false),
            ("1", true),
            ("not_a_bool", false),
        ];
        for (value, expected) in cases {
            // SAFETY: `ENV_LOCK` is held, so no other test in this module touches the
            // environment concurrently.
            unsafe { std::env::set_var(key, value) };
            assert_eq!(
                telemetry_env_enabled(),
                expected,
                "unexpected result for {key}={value:?}"
            );
        }

        // An unset variable falls back to "enabled".
        // SAFETY: see above — `ENV_LOCK` is held.
        unsafe { std::env::remove_var(key) };
        assert!(telemetry_env_enabled());
    }
}