risingwave_telemetry_event/
lib.rs1mod util;
18
19use std::env;
20use std::sync::{LazyLock, OnceLock};
21
22use prost::Message;
23use risingwave_common_log::LogSuppressor;
24use risingwave_pb::telemetry::{
25 EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
26 TelemetryEventStage as PbTelemetryEventStage,
27};
28use thiserror_ext::AsReport;
29use tokio::sync::mpsc::UnboundedSender;
30pub use util::*;
31
/// Result alias used by telemetry code; the error side is always a [`TelemetryError`].
pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;

/// Error type of [`TelemetryResult`]: a plain `String`, not a structured error enum.
pub type TelemetryError = String;
36
/// Tracking id attached to every reported event. It starts unset; while it is
/// unset, `report_event_common` logs a rate-limited notice and reports nothing.
pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
/// Sending half of the unbounded channel that carries telemetry events. It
/// starts unset; `request_to_telemetry_event` only sends an event when a sender
/// has been stored here.
pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();
39
/// Base URL of the telemetry report API. `do_telemetry_event_report` appends
/// `/events` to it to build the batch event endpoint.
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
41
/// Name of the environment variable read by [`get_telemetry_risingwave_cloud_uuid`].
pub const TELEMETRY_RISINGWAVE_CLOUD_UUID: &str = "RISINGWAVE_CLOUD_UUID";

/// Reads the `RISINGWAVE_CLOUD_UUID` environment variable.
///
/// Returns `Some(value)` when the variable is set and holds valid Unicode, and
/// `None` when it is unset or its value is not valid Unicode.
pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
    // A non-Unicode value is treated the same as an absent one.
    env::var_os(TELEMETRY_RISINGWAVE_CLOUD_UUID).and_then(|raw| raw.into_string().ok())
}
48
49pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
50 if event_stash.is_empty() {
51 return;
52 }
53
54 const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
56 let batch_message = PbBatchEventMessage {
57 events: std::mem::take(event_stash),
58 };
59
60 post_telemetry_report_pb(&url, batch_message.encode_to_vec())
61 .await
62 .unwrap_or_else(|e| tracing::debug!("{}", e));
63}
64
65pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; pub fn report_event_common(
69 event_stage: PbTelemetryEventStage,
70 event_name: &str,
71 catalog_id: i64,
72 connector_name: Option<String>,
73 object: Option<PbTelemetryDatabaseObject>,
74 attributes: Option<jsonbb::Value>, node: String,
76) {
77 let event_tracking_id: String;
78 if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
79 event_tracking_id = tracking_id.to_string();
80 } else {
81 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
82 LazyLock::new(|| LogSuppressor::per_minute(1));
83 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
84 tracing::info!(
85 suppressed_count,
86 event_name,
87 "Telemetry tracking_id is not set, event reporting disabled"
88 );
89 }
90 return;
91 }
92
93 request_to_telemetry_event(
94 event_tracking_id,
95 event_stage,
96 event_name,
97 catalog_id,
98 connector_name,
99 object,
100 attributes,
101 node,
102 false,
103 );
104}
105
106pub fn request_to_telemetry_event(
107 tracking_id: String,
108 event_stage: PbTelemetryEventStage,
109 event_name: &str,
110 catalog_id: i64,
111 connector_name: Option<String>,
112 object: Option<PbTelemetryDatabaseObject>,
113 attributes: Option<jsonbb::Value>, node: String,
115 is_test: bool,
116) {
117 let event = PbEventMessage {
118 tracking_id,
119 event_time_sec: current_timestamp(),
120 event_stage: event_stage as i32,
121 event_name: event_name.to_string(),
122 connector_name,
123 object: object.map(|c| c as i32),
124 catalog_id,
125 attributes: attributes.map(|a| a.to_string()),
126 node,
127 is_test,
128 };
129
130 if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
131 let _ = tx.send(event).inspect_err(|e| {
132 tracing::warn!("Failed to send telemetry event queue: {}", e.as_report())
133 });
134 }
135}