risingwave_telemetry_event/
lib.rsmod util;
use std::env;
use std::sync::OnceLock;
use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
pub use util::*;
/// Result alias used by telemetry code; the error side is always [`TelemetryError`].
pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
/// Telemetry errors are carried as plain human-readable strings.
pub type TelemetryError = String;
/// Tracking id attached to reported events. [`report_event_common`] reads it and skips
/// reporting entirely while it is unset.
pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
/// Sending half of the channel that [`request_to_telemetry_event`] pushes events into.
/// While it is unset, events built by that function are silently dropped.
pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();
/// Base URL of the telemetry report endpoint; [`do_telemetry_event_report`] appends `/events`.
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
/// Name of the environment variable read by [`get_telemetry_risingwave_cloud_uuid`].
pub const TELEMETRY_RISINGWAVE_CLOUD_UUID: &str = "RISINGWAVE_CLOUD_UUID";
/// Looks up the environment variable named by [`TELEMETRY_RISINGWAVE_CLOUD_UUID`].
///
/// Returns `Some(value)` when the variable is set and holds valid Unicode, and `None` in every
/// other case (unset or not valid Unicode).
pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
    match env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID) {
        Ok(uuid) => Some(uuid),
        Err(_) => None,
    }
}
/// Sends every stashed telemetry event to the report endpoint as a single batch.
///
/// Returns immediately when `event_stash` is empty. Otherwise the stash is drained with
/// `std::mem::take` (leaving it empty) into a `PbBatchEventMessage`, which is protobuf-encoded
/// and posted to `{TELEMETRY_REPORT_URL}/events`.
///
/// Reporting is best-effort: a failed post is only logged at debug level, and the drained
/// events are not put back into the stash.
pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
    // Path segment appended to the base report URL for batched event reports.
    const TELEMETRY_EVENT_REPORT_TYPE: &str = "events";

    if event_stash.is_empty() {
        return;
    }

    // Build the URL in one allocation; the previous concatenation cloned the finished
    // `String` a second time via a trailing `.to_owned()`.
    let url = format!("{}/{}", TELEMETRY_REPORT_URL, TELEMETRY_EVENT_REPORT_TYPE);
    let batch_message = PbBatchEventMessage {
        events: std::mem::take(event_stash),
    };

    if let Err(e) = post_telemetry_report_pb(&url, batch_message.encode_to_vec()).await {
        tracing::debug!("{}", e);
    }
}
/// Interval used for periodic telemetry event reporting.
/// NOTE(review): presumably expressed in seconds; the consumer is not in this file — confirm.
pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10;

/// Stash size used by telemetry event reporting.
/// NOTE(review): presumably the number of buffered events that triggers a flush — confirm.
pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100;

/// Reports a non-test telemetry event under the process-wide tracking id.
///
/// Reads [`TELEMETRY_TRACKING_ID`]; when it has not been set, logs an info message and returns
/// without reporting anything. Otherwise forwards all arguments, together with a copy of the
/// tracking id and `is_test = false`, to [`request_to_telemetry_event`].
pub fn report_event_common(
    event_stage: PbTelemetryEventStage,
    event_name: &str,
    catalog_id: i64,
    connector_name: Option<String>,
    object: Option<PbTelemetryDatabaseObject>,
    attributes: Option<jsonbb::Value>,
    node: String,
) {
    let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() else {
        tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
        return;
    };

    request_to_telemetry_event(
        tracking_id.clone(),
        event_stage,
        event_name,
        catalog_id,
        connector_name,
        object,
        attributes,
        node,
        false,
    );
}
/// Builds a `PbEventMessage` from the arguments and queues it on the telemetry event channel.
///
/// The message is stamped with `current_timestamp()`. `event_stage` and `object` are stored as
/// their `i32` enum values, and `attributes` is stored as its `to_string()` rendering.
///
/// Delivery is best-effort: if [`TELEMETRY_EVENT_REPORT_TX`] has not been set the message is
/// dropped silently, and a failed send is only logged as a warning.
pub fn request_to_telemetry_event(
    tracking_id: String,
    event_stage: PbTelemetryEventStage,
    event_name: &str,
    catalog_id: i64,
    connector_name: Option<String>,
    object: Option<PbTelemetryDatabaseObject>,
    attributes: Option<jsonbb::Value>,
    node: String,
    is_test: bool,
) {
    let message = PbEventMessage {
        tracking_id,
        event_time_sec: current_timestamp(),
        event_stage: event_stage as i32,
        event_name: event_name.to_owned(),
        connector_name,
        object: object.map(|kind| kind as i32),
        catalog_id,
        attributes: attributes.map(|json| json.to_string()),
        node,
        is_test,
    };

    // No sender installed: nothing is listening, so the message is simply discarded.
    let Some(sender) = TELEMETRY_EVENT_REPORT_TX.get() else {
        return;
    };

    if let Err(err) = sender.send(message) {
        tracing::warn!("Failed to send telemetry event queue: {}", err.as_report());
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Smoke test: submits one event flagged `is_test = true` through
    /// `request_to_telemetry_event`. It makes no assertions and is `#[ignore]`d, so it only
    /// runs when explicitly requested.
    #[ignore]
    #[tokio::test]
    async fn test_telemetry_report_event() {
        request_to_telemetry_event(
            "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
            PbTelemetryEventStage::CreateStreamJob,
            "test_feature",
            1,
            Some("test_connector".to_string()),
            Some(PbTelemetryDatabaseObject::Source),
            None,
            "test_node".to_string(),
            true,
        );

        // Pause briefly before the test ends.
        // NOTE(review): presumably this gives a background reporter time to pick the event up,
        // but nothing in this test installs `TELEMETRY_EVENT_REPORT_TX` — confirm the intent.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}