risingwave_common/telemetry/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod manager;
16pub mod pb_compatible;
17pub mod report;
18
19use std::env;
20
21use risingwave_pb::telemetry::PbTelemetryClusterType;
22pub use risingwave_telemetry_event::{
23    TelemetryError, TelemetryResult, current_timestamp, post_telemetry_report_pb,
24    report_event_common, request_to_telemetry_event,
25};
26use serde::{Deserialize, Serialize};
27use sysinfo::System;
28
29use crate::RW_VERSION;
30use crate::util::env_var::env_var_is_true_or;
31use crate::util::resource_util::cpu::total_cpu_available;
32use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};
33
34type Result<T> = core::result::Result<T, TelemetryError>;
35
/// Name of the environment variable read by `telemetry_cluster_type_from_env_var`
/// to determine the cluster type.
pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE";
/// Value of `TELEMETRY_CLUSTER_TYPE` that maps to `PbTelemetryClusterType::CloudHosted`.
pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud
/// Value of `TELEMETRY_CLUSTER_TYPE` that maps to `PbTelemetryClusterType::Kubernetes`.
pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";
/// Value of `TELEMETRY_CLUSTER_TYPE` that maps to `PbTelemetryClusterType::SingleNode`.
pub const TELEMETRY_CLUSTER_TYPE_SINGLE_NODE: &str = "single-node";
/// Value of `TELEMETRY_CLUSTER_TYPE` that maps to `PbTelemetryClusterType::DockerCompose`.
pub const TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE: &str = "docker-compose";
/// Value of `TELEMETRY_CLUSTER_TYPE` for test environments;
/// `telemetry_cluster_type_from_env_var` returns an error for it.
const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test";
42pub use risingwave_telemetry_event::{
43    TELEMETRY_RISINGWAVE_CLOUD_UUID, get_telemetry_risingwave_cloud_uuid,
44};
45
46pub fn telemetry_cluster_type_from_env_var() -> TelemetryResult<PbTelemetryClusterType> {
47    let cluster_type = match env::var(TELEMETRY_CLUSTER_TYPE) {
48        Ok(cluster_type) => cluster_type,
49        Err(_) => return Ok(PbTelemetryClusterType::Unspecified),
50    };
51    match cluster_type.as_str() {
52        TELEMETRY_CLUSTER_TYPE_HOSTED => Ok(PbTelemetryClusterType::CloudHosted),
53        TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => Ok(PbTelemetryClusterType::DockerCompose),
54        TELEMETRY_CLUSTER_TYPE_KUBERNETES => Ok(PbTelemetryClusterType::Kubernetes),
55        TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => Ok(PbTelemetryClusterType::SingleNode),
56
57        // block the report if the cluster is in test env
58        // but it only blocks the report from meta node, not other nodes
59        TELEMETRY_CLUSTER_TYPE_TEST => Err(TelemetryError::from(
60            "test cluster type should not send telemetry report",
61        )),
62        _ => Err(TelemetryError::from("invalid cluster type")),
63    }
64}
65
/// URL of the telemetry backend.
pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL;
/// Telemetry reporting interval in seconds (6 hours).
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;

/// Name of the environment variable that toggles telemetry.
/// `telemetry_env_enabled` treats it as `true` when it is not set.
const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY";
73
/// Kind of node that creates a telemetry report
/// (stored in `TelemetryReportBase::node_type`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TelemetryNodeType {
    Meta,
    Compute,
    Frontend,
    Compactor,
}
81
/// Identification, timing and system information carried by a telemetry report.
///
/// NOTE(review): presumably embedded by the concrete per-node report types —
/// confirm against the implementors of `TelemetryReport`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TelemetryReportBase {
    /// `tracking_id` is persistent in metastore
    pub tracking_id: String,
    /// `session_id` is reset every time node restarts
    pub session_id: String,
    /// `system_data` is hardware and os info
    pub system_data: SystemData,
    /// `up_time` is how long the node has been running
    pub up_time: u64,
    /// `time_stamp` is when the report is created
    pub time_stamp: u64,
    /// `node_type` is the node that creates the report
    pub node_type: TelemetryNodeType,
    /// `is_test` is whether the report is from a test environment, default to be false
    /// needed in CI for compatible tests with telemetry backend
    pub is_test: bool,
}
100
/// Marker trait for telemetry report types.
///
/// It declares no methods of its own; implementors only have to be `Serialize`.
pub trait TelemetryReport: Serialize {}
102
/// Hardware and OS information attached to a telemetry report:
/// memory usage, operating system details and available CPU.
#[derive(Debug, Serialize, Deserialize)]
pub struct SystemData {
    memory: Memory,
    os: Os,
    cpu: Cpu,
}
109
/// Memory figures collected by `SystemData::new`.
#[derive(Debug, Serialize, Deserialize)]
struct Memory {
    // filled from `total_memory_used_bytes()` (bytes, going by the helper's name)
    used: usize,
    // filled from `system_memory_available_bytes()` (bytes, going by the helper's name)
    total: usize,
}
115
/// Operating system details collected by `SystemData::new` through `sysinfo::System`.
/// Each field is an empty string when `sysinfo` returns `None` for it.
#[derive(Debug, Serialize, Deserialize)]
struct Os {
    // from `System::name()`
    name: String,
    // from `System::kernel_version()`
    kernel_version: String,
    // from `System::os_version()`
    version: String,
}
122
/// CPU figures collected by `SystemData::new`.
#[derive(Debug, Serialize, Deserialize)]
struct Cpu {
    // total number of cpu available as a float, filled from `total_cpu_available()`
    available: f32,
}
128
129impl SystemData {
130    pub fn new() -> Self {
131        let memory = {
132            let total = system_memory_available_bytes();
133            let used = total_memory_used_bytes();
134            Memory { used, total }
135        };
136
137        let os = Os {
138            name: System::name().unwrap_or_default(),
139            kernel_version: System::kernel_version().unwrap_or_default(),
140            version: System::os_version().unwrap_or_default(),
141        };
142
143        let cpu = Cpu {
144            available: total_cpu_available(),
145        };
146
147        SystemData { memory, os, cpu }
148    }
149}
150
/// `Default` delegates to `SystemData::new`, so the default value is freshly
/// collected system information rather than zeroed fields.
impl Default for SystemData {
    fn default() -> Self {
        Self::new()
    }
}
156
/// Checks whether telemetry is enabled through the `ENABLE_TELEMETRY`
/// environment variable.
///
/// The decision is delegated to `env_var_is_true_or` with a fallback of `true`.
pub fn telemetry_env_enabled() -> bool {
    // default to be true
    env_var_is_true_or(TELEMETRY_ENV_ENABLE, true)
}
162
163pub fn report_scarf_enabled() -> bool {
164    telemetry_env_enabled()
165        && !matches!(
166            telemetry_cluster_type_from_env_var(),
167            Ok(PbTelemetryClusterType::CloudHosted)
168        )
169}
170
171// impl logic to report to Scarf service, containing RW version and deployment platform
172pub async fn report_to_scarf() {
173    let request_url = format!(
174        "https://risingwave.gateway.scarf.sh/telemetry/{}/{}",
175        RW_VERSION,
176        System::name().unwrap_or_default()
177    );
178    // keep trying every 1h until success
179    loop {
180        let res = reqwest::get(&request_url).await;
181        if let Ok(res) = res {
182            if res.status().is_success() {
183                break;
184            }
185        }
186        tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await;
187    }
188}
189
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard, PoisonError};

    use super::*;

    /// Serializes the tests in this module that touch process-wide environment variables.
    ///
    /// The default test harness runs the `#[test]` functions of one binary on several
    /// threads of the same process. Without this lock `test_enable_scarf` and `test_env`
    /// could overwrite each other's `ENABLE_TELEMETRY` value between a `set_var` and the
    /// assertion that follows it.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires `ENV_LOCK`, ignoring poisoning.
    fn lock_env() -> MutexGuard<'static, ()> {
        // A poisoned lock only means another env test failed; the guarded data is `()`,
        // so there is no broken state to worry about.
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Sets an environment variable; the guard parameter proves `ENV_LOCK` is held.
    fn set_env(_guard: &MutexGuard<'static, ()>, key: &str, value: &str) {
        // SAFETY: the caller holds `ENV_LOCK`, so no other test in this module reads or
        // writes the environment concurrently.
        // NOTE(review): tests in other modules of this crate that access the environment
        // are not covered by this lock.
        unsafe { std::env::set_var(key, value) };
    }

    /// Removes an environment variable; the guard parameter proves `ENV_LOCK` is held.
    fn remove_env(_guard: &MutexGuard<'static, ()>, key: &str) {
        // SAFETY: same reasoning as in `set_env`.
        unsafe { std::env::remove_var(key) };
    }

    #[test]
    fn test_enable_scarf() {
        let env_guard = lock_env();

        set_env(&env_guard, TELEMETRY_ENV_ENABLE, "true");

        // setting env var to `Hosted` should disable scarf
        set_env(
            &env_guard,
            TELEMETRY_CLUSTER_TYPE,
            TELEMETRY_CLUSTER_TYPE_HOSTED,
        );
        assert!(!report_scarf_enabled());

        // setting env var to `DockerCompose` should enable scarf
        set_env(
            &env_guard,
            TELEMETRY_CLUSTER_TYPE,
            TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE,
        );
        assert!(report_scarf_enabled());
    }

    #[test]
    fn test_system_data_new() {
        let system_data = SystemData::new();

        assert!(system_data.memory.used > 0);
        assert!(system_data.memory.total > 0);
        assert!(!system_data.os.name.is_empty());
        assert!(!system_data.os.kernel_version.is_empty());
        assert!(!system_data.os.version.is_empty());
        assert!(system_data.cpu.available > 0.0);
    }

    #[test]
    fn test_env() {
        let env_guard = lock_env();

        // Use the same constant as the production code so the test cannot drift from it.
        let key = TELEMETRY_ENV_ENABLE;

        // make assertions more readable...
        fn is_enabled() -> bool {
            telemetry_env_enabled()
        }
        fn is_not_enabled() -> bool {
            !is_enabled()
        }

        set_env(&env_guard, key, "true");
        assert!(is_enabled());

        set_env(&env_guard, key, "false");
        assert!(is_not_enabled());

        set_env(&env_guard, key, "tRue");
        assert!(is_enabled());

        set_env(&env_guard, key, "2");
        assert!(is_not_enabled());

        set_env(&env_guard, key, "1");
        assert!(is_enabled());

        set_env(&env_guard, key, "not_a_bool");
        assert!(is_not_enabled());

        // an unset variable falls back to the default, which is enabled
        remove_env(&env_guard, key);
        assert!(is_enabled());
    }
}