risingwave_telemetry_event/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
/// Telemetry event-report functions live in this crate;
/// the stats-report module stays in the `common/` module.
17mod util;
18
19use std::env;
20use std::sync::OnceLock;
21
22use prost::Message;
23use risingwave_pb::telemetry::{
24    EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
25    TelemetryEventStage as PbTelemetryEventStage,
26};
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::UnboundedSender;
29pub use util::*;
30
/// Error type for telemetry operations.
///
/// Telemetry errors are generally recoverable or safe to ignore, so a plain
/// `String` message is good enough.
pub type TelemetryError = String;

/// Result alias for telemetry operations, using [`TelemetryError`] as the error type.
pub type TelemetryResult<T> = std::result::Result<T, TelemetryError>;
35
36pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
37pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();
38
39pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
40
/// Name of the environment variable that carries the UUID of the RisingWave Cloud
/// (if the cluster is hosted on RisingWave Cloud).
pub const TELEMETRY_RISINGWAVE_CLOUD_UUID: &str = "RISINGWAVE_CLOUD_UUID";

/// Reads the RisingWave Cloud UUID from the environment.
///
/// Returns `Some(value)` when the [`TELEMETRY_RISINGWAVE_CLOUD_UUID`] variable can be
/// read, and `None` when `env::var` reports an error (e.g. the variable is not set).
pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
    let cloud_uuid = env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID);
    cloud_uuid.ok()
}
47
48pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
49    if event_stash.is_empty() {
50        return;
51    }
52
53    const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url
54    let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
55    let batch_message = PbBatchEventMessage {
56        events: std::mem::take(event_stash),
57    };
58
59    post_telemetry_report_pb(&url, batch_message.encode_to_vec())
60        .await
61        .unwrap_or_else(|e| tracing::debug!("{}", e));
62}
63
/// Stash size threshold: 100 events trigger a report action.
pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100;

/// Event report interval: 10 seconds.
pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10;
66
67pub fn report_event_common(
68    event_stage: PbTelemetryEventStage,
69    event_name: &str,
70    catalog_id: i64,
71    connector_name: Option<String>,
72    object: Option<PbTelemetryDatabaseObject>,
73    attributes: Option<jsonbb::Value>, // any json string
74    node: String,
75) {
76    let event_tracking_id: String;
77    if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
78        event_tracking_id = tracking_id.to_string();
79    } else {
80        tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
81        return;
82    }
83
84    request_to_telemetry_event(
85        event_tracking_id,
86        event_stage,
87        event_name,
88        catalog_id,
89        connector_name,
90        object,
91        attributes,
92        node,
93        false,
94    );
95}
96
97pub fn request_to_telemetry_event(
98    tracking_id: String,
99    event_stage: PbTelemetryEventStage,
100    event_name: &str,
101    catalog_id: i64,
102    connector_name: Option<String>,
103    object: Option<PbTelemetryDatabaseObject>,
104    attributes: Option<jsonbb::Value>, // any json string
105    node: String,
106    is_test: bool,
107) {
108    let event = PbEventMessage {
109        tracking_id,
110        event_time_sec: current_timestamp(),
111        event_stage: event_stage as i32,
112        event_name: event_name.to_string(),
113        connector_name,
114        object: object.map(|c| c as i32),
115        catalog_id,
116        attributes: attributes.map(|a| a.to_string()),
117        node,
118        is_test,
119    };
120
121    if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
122        let _ = tx.send(event).inspect_err(|e| {
123            tracing::warn!("Failed to send telemetry event queue: {}", e.as_report())
124        });
125    }
126}