risingwave_telemetry_event/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Telemetry event-report functions live in this crate.
//! The stats-report module stays in the `common/` module.
17mod util;
18
19use std::env;
20use std::sync::{LazyLock, OnceLock};
21
22use prost::Message;
23use risingwave_common_log::LogSuppressor;
24use risingwave_pb::telemetry::{
25    EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
26    TelemetryEventStage as PbTelemetryEventStage,
27};
28use thiserror_ext::AsReport;
29use tokio::sync::mpsc::UnboundedSender;
30pub use util::*;
31
/// Result alias whose error type is [`TelemetryError`].
pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;

/// Telemetry errors are generally recoverable/ignorable. `String` is good enough.
pub type TelemetryError = String;

/// Tracking id stamped on reported events. [`report_event_common`] reads it and skips
/// reporting (emitting a rate-limited log line instead) while it is unset.
pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
/// Sender onto which [`request_to_telemetry_event`] pushes events. While it is unset,
/// that function drops the event without logging anything.
pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();

/// Base URL for telemetry reports; [`do_telemetry_event_report`] appends `/events` to it.
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
41
/// Name of the environment variable holding the UUID of the RisingWave Cloud
/// (if the cluster is hosted on RisingWave Cloud).
pub const TELEMETRY_RISINGWAVE_CLOUD_UUID: &str = "RISINGWAVE_CLOUD_UUID";

/// Reads the environment variable named by [`TELEMETRY_RISINGWAVE_CLOUD_UUID`].
///
/// Returns `None` when the variable is unset or its value is not valid Unicode.
pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
    env::var_os(TELEMETRY_RISINGWAVE_CLOUD_UUID).and_then(|raw| raw.into_string().ok())
}
48
49pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
50    if event_stash.is_empty() {
51        return;
52    }
53
54    const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url
55    let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
56    let batch_message = PbBatchEventMessage {
57        events: std::mem::take(event_stash),
58    };
59
60    post_telemetry_report_pb(&url, batch_message.encode_to_vec())
61        .await
62        .unwrap_or_else(|e| tracing::debug!("{}", e));
63}
64
/// Event report interval: 10 seconds. (The consumer of this value is not in this file.)
pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds
/// Stash size threshold: 100 events to trigger a report action.
/// (The consumer of this value is not in this file.)
pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action
67
68pub fn report_event_common(
69    event_stage: PbTelemetryEventStage,
70    event_name: &str,
71    catalog_id: i64,
72    connector_name: Option<String>,
73    object: Option<PbTelemetryDatabaseObject>,
74    attributes: Option<jsonbb::Value>, // any json string
75    node: String,
76) {
77    let event_tracking_id: String;
78    if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
79        event_tracking_id = tracking_id.to_string();
80    } else {
81        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
82            LazyLock::new(|| LogSuppressor::per_minute(1));
83        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
84            tracing::info!(
85                suppressed_count,
86                event_name,
87                "Telemetry tracking_id is not set, event reporting disabled"
88            );
89        }
90        return;
91    }
92
93    request_to_telemetry_event(
94        event_tracking_id,
95        event_stage,
96        event_name,
97        catalog_id,
98        connector_name,
99        object,
100        attributes,
101        node,
102        false,
103    );
104}
105
106pub fn request_to_telemetry_event(
107    tracking_id: String,
108    event_stage: PbTelemetryEventStage,
109    event_name: &str,
110    catalog_id: i64,
111    connector_name: Option<String>,
112    object: Option<PbTelemetryDatabaseObject>,
113    attributes: Option<jsonbb::Value>, // any json string
114    node: String,
115    is_test: bool,
116) {
117    let event = PbEventMessage {
118        tracking_id,
119        event_time_sec: current_timestamp(),
120        event_stage: event_stage as i32,
121        event_name: event_name.to_string(),
122        connector_name,
123        object: object.map(|c| c as i32),
124        catalog_id,
125        attributes: attributes.map(|a| a.to_string()),
126        node,
127        is_test,
128    };
129
130    if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
131        let _ = tx.send(event).inspect_err(|e| {
132            tracing::warn!("Failed to send telemetry event queue: {}", e.as_report())
133        });
134    }
135}