risingwave_telemetry_event/
lib.rs1mod util;
18
19use std::env;
20use std::sync::OnceLock;
21
22use prost::Message;
23use risingwave_pb::telemetry::{
24 EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
25 TelemetryEventStage as PbTelemetryEventStage,
26};
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::UnboundedSender;
29pub use util::*;
30
31pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
32
33pub type TelemetryError = String;
35
36pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
37pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();
38
39pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
40
41pub const TELEMETRY_RISINGWAVE_CLOUD_UUID: &str = "RISINGWAVE_CLOUD_UUID";
43
44pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
45 env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
46}
47
48pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
49 if event_stash.is_empty() {
50 return;
51 }
52
53 const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
55 let batch_message = PbBatchEventMessage {
56 events: std::mem::take(event_stash),
57 };
58
59 post_telemetry_report_pb(&url, batch_message.encode_to_vec())
60 .await
61 .unwrap_or_else(|e| tracing::debug!("{}", e));
62}
63
64pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; pub fn report_event_common(
68 event_stage: PbTelemetryEventStage,
69 event_name: &str,
70 catalog_id: i64,
71 connector_name: Option<String>,
72 object: Option<PbTelemetryDatabaseObject>,
73 attributes: Option<jsonbb::Value>, node: String,
75) {
76 let event_tracking_id: String;
77 if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
78 event_tracking_id = tracking_id.to_string();
79 } else {
80 tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
81 return;
82 }
83
84 request_to_telemetry_event(
85 event_tracking_id,
86 event_stage,
87 event_name,
88 catalog_id,
89 connector_name,
90 object,
91 attributes,
92 node,
93 false,
94 );
95}
96
97pub fn request_to_telemetry_event(
98 tracking_id: String,
99 event_stage: PbTelemetryEventStage,
100 event_name: &str,
101 catalog_id: i64,
102 connector_name: Option<String>,
103 object: Option<PbTelemetryDatabaseObject>,
104 attributes: Option<jsonbb::Value>, node: String,
106 is_test: bool,
107) {
108 let event = PbEventMessage {
109 tracking_id,
110 event_time_sec: current_timestamp(),
111 event_stage: event_stage as i32,
112 event_name: event_name.to_string(),
113 connector_name,
114 object: object.map(|c| c as i32),
115 catalog_id,
116 attributes: attributes.map(|a| a.to_string()),
117 node,
118 is_test,
119 };
120
121 if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
122 let _ = tx.send(event).inspect_err(|e| {
123 tracing::warn!("Failed to send telemetry event queue: {}", e.as_report())
124 });
125 }
126}