risingwave_common/telemetry/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod manager;
16pub mod pb_compatible;
17pub mod report;
18
19use std::env;
20
21use risingwave_pb::telemetry::PbTelemetryClusterType;
22pub use risingwave_telemetry_event::{
23    TelemetryError, TelemetryResult, current_timestamp, post_telemetry_report_pb,
24    report_event_common, request_to_telemetry_event,
25};
26use serde::{Deserialize, Serialize};
27use sysinfo::System;
28
29use crate::RW_VERSION;
30use crate::util::env_var::env_var_is_true_or;
31use crate::util::resource_util::cpu::total_cpu_available;
32use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};
33
34type Result<T> = core::result::Result<T, TelemetryError>;
35
36pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE";
37pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud
38pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";
39pub const TELEMETRY_CLUSTER_TYPE_SINGLE_NODE: &str = "single-node";
40pub const TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE: &str = "docker-compose";
41const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test";
42
43pub use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
44
45pub fn telemetry_cluster_type_from_env_var() -> TelemetryResult<PbTelemetryClusterType> {
46    let cluster_type = match env::var(TELEMETRY_CLUSTER_TYPE) {
47        Ok(cluster_type) => cluster_type,
48        Err(_) => return Ok(PbTelemetryClusterType::Unspecified),
49    };
50    match cluster_type.as_str() {
51        TELEMETRY_CLUSTER_TYPE_HOSTED => Ok(PbTelemetryClusterType::CloudHosted),
52        TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => Ok(PbTelemetryClusterType::DockerCompose),
53        TELEMETRY_CLUSTER_TYPE_KUBERNETES => Ok(PbTelemetryClusterType::Kubernetes),
54        TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => Ok(PbTelemetryClusterType::SingleNode),
55
56        // block the report if the cluster is in test env
57        // but it only blocks the report from meta node, not other nodes
58        TELEMETRY_CLUSTER_TYPE_TEST => Err(TelemetryError::from(
59            "test cluster type should not send telemetry report",
60        )),
61        _ => Err(TelemetryError::from("invalid cluster type")),
62    }
63}
64
/// URL of the telemetry backend
66pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL;
/// Interval between telemetry reports, in seconds (6 hours).
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;

/// Name of the environment variable that toggles telemetry.
/// Telemetry is treated as enabled when the variable is not set.
const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY";
72
73#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
74pub enum TelemetryNodeType {
75    Meta,
76    Compute,
77    Frontend,
78    Compactor,
79}
80
81#[derive(Debug, Serialize, Deserialize)]
82pub struct TelemetryReportBase {
83    /// `tracking_id` is persistent in metastore
84    pub tracking_id: String,
85    /// `session_id` is reset every time node restarts
86    pub session_id: String,
87    /// `system_data` is hardware and os info
88    pub system_data: SystemData,
89    /// `up_time` is how long the node has been running
90    pub up_time: u64,
91    /// `time_stamp` is when the report is created
92    pub time_stamp: u64,
93    /// `node_type` is the node that creates the report
94    pub node_type: TelemetryNodeType,
95    /// `is_test` is whether the report is from a test environment, default to be false
96    /// needed in CI for compatible tests with telemetry backend
97    pub is_test: bool,
98}
99
100pub trait TelemetryReport: Serialize {}
101
102#[derive(Debug, Serialize, Deserialize)]
103pub struct SystemData {
104    memory: Memory,
105    os: Os,
106    cpu: Cpu,
107}
108
109#[derive(Debug, Serialize, Deserialize)]
110struct Memory {
111    used: usize,
112    total: usize,
113}
114
115#[derive(Debug, Serialize, Deserialize)]
116struct Os {
117    name: String,
118    kernel_version: String,
119    version: String,
120}
121
122#[derive(Debug, Serialize, Deserialize)]
123struct Cpu {
124    // total number of cpu available as a float
125    available: f32,
126}
127
128impl SystemData {
129    pub fn new() -> Self {
130        let memory = {
131            let total = system_memory_available_bytes();
132            let used = total_memory_used_bytes();
133            Memory { used, total }
134        };
135
136        let os = Os {
137            name: System::name().unwrap_or_default(),
138            kernel_version: System::kernel_version().unwrap_or_default(),
139            version: System::os_version().unwrap_or_default(),
140        };
141
142        let cpu = Cpu {
143            available: total_cpu_available(),
144        };
145
146        SystemData { memory, os, cpu }
147    }
148}
149
150impl Default for SystemData {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
156/// check whether telemetry is enabled in environment variable
157pub fn telemetry_env_enabled() -> bool {
158    // default to be true
159    env_var_is_true_or(TELEMETRY_ENV_ENABLE, true)
160}
161
162pub fn report_scarf_enabled() -> bool {
163    telemetry_env_enabled()
164        && !matches!(
165            telemetry_cluster_type_from_env_var(),
166            Ok(PbTelemetryClusterType::CloudHosted)
167        )
168}
169
170// impl logic to report to Scarf service, containing RW version and deployment platform
171pub async fn report_to_scarf() {
172    let request_url = format!(
173        "https://risingwave.gateway.scarf.sh/telemetry/{}/{}",
174        RW_VERSION,
175        System::name().unwrap_or_default()
176    );
177    // keep trying every 1h until success
178    loop {
179        let res = reqwest::get(&request_url).await;
180        if let Ok(res) = res {
181            if res.status().is_success() {
182                break;
183            }
184        }
185        tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await;
186    }
187}
188
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard};

    use super::*;

    /// Serializes the tests in this module that mutate process-wide environment variables.
    /// Without it, `test_enable_scarf` and `test_env` race on `ENABLE_TELEMETRY` whenever
    /// they run as threads of the same test binary.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_LOCK`], recovering from poisoning: the lock guards no data, so a
    /// panic in another test must not cascade into this one.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Remembers the value of an environment variable and puts it back on drop, so a test
    /// leaves the environment as it found it even when an assertion fails.
    struct EnvVarRestore {
        key: &'static str,
        previous: Option<OsString>,
    }

    impl EnvVarRestore {
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                previous: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvVarRestore {
        fn drop(&mut self) {
            // SAFETY: guards are declared after the `ENV_LOCK` guard and therefore dropped
            // before it, so no other env-mutating test in this module runs concurrently.
            unsafe {
                match self.previous.take() {
                    Some(value) => std::env::set_var(self.key, value),
                    None => std::env::remove_var(self.key),
                }
            }
        }
    }

    #[test]
    fn test_enable_scarf() {
        let _env_lock = lock_env();
        let _restore_enable = EnvVarRestore::capture(TELEMETRY_ENV_ENABLE);
        let _restore_type = EnvVarRestore::capture(TELEMETRY_CLUSTER_TYPE);

        // SAFETY (applies to every `set_var` below): env-mutating tests in this module
        // are serialized through `ENV_LOCK`, which is held for the whole test.
        unsafe { std::env::set_var(TELEMETRY_ENV_ENABLE, "true") };

        // setting env var to `Hosted` should disable scarf
        unsafe { std::env::set_var(TELEMETRY_CLUSTER_TYPE, TELEMETRY_CLUSTER_TYPE_HOSTED) };
        assert!(!report_scarf_enabled());

        // setting env var to `DockerCompose` should enable scarf
        unsafe {
            std::env::set_var(
                TELEMETRY_CLUSTER_TYPE,
                TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE,
            )
        };
        assert!(report_scarf_enabled());
    }

    #[test]
    fn test_system_data_new() {
        let system_data = SystemData::new();

        assert!(system_data.memory.used > 0);
        assert!(system_data.memory.total > 0);
        assert!(!system_data.os.name.is_empty());
        assert!(!system_data.os.kernel_version.is_empty());
        assert!(!system_data.os.version.is_empty());
        assert!(system_data.cpu.available > 0.0);
    }

    #[test]
    fn test_env() {
        let _env_lock = lock_env();
        let _restore_enable = EnvVarRestore::capture(TELEMETRY_ENV_ENABLE);

        // Use the same constant the production code reads instead of a duplicated literal.
        let key = TELEMETRY_ENV_ENABLE;

        // make assertions more readable...
        fn is_enabled() -> bool {
            telemetry_env_enabled()
        }
        fn is_not_enabled() -> bool {
            !is_enabled()
        }

        // SAFETY (applies to every `set_var` / `remove_var` below): env-mutating tests in
        // this module are serialized through `ENV_LOCK`, which is held for the whole test.
        unsafe { std::env::set_var(key, "true") };
        assert!(is_enabled());

        unsafe { std::env::set_var(key, "false") };
        assert!(is_not_enabled());

        unsafe { std::env::set_var(key, "tRue") };
        assert!(is_enabled());

        unsafe { std::env::set_var(key, "2") };
        assert!(is_not_enabled());

        unsafe { std::env::set_var(key, "1") };
        assert!(is_enabled());

        unsafe { std::env::set_var(key, "not_a_bool") };
        assert!(is_not_enabled());

        // an unset variable falls back to the default, which is enabled
        unsafe { std::env::remove_var(key) };
        assert!(is_enabled());
    }
}