risingwave_telemetry_event/
lib.rsmod util;
use std::env;
use std::sync::OnceLock;
use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
pub use util::*;
pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
pub type TelemetryError = String;
pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
pub const TELEMETRY_RISINGWAVE_CLOUD_UUID: &str = "RISINGWAVE_CLOUD_UUID";
pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
}
pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, node: String,
) {
let event_tracking_id: String;
if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
event_tracking_id = tracking_id.to_string();
} else {
tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
return;
}
request_to_telemetry_event(
event_tracking_id,
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
false,
);
}
pub fn request_to_telemetry_event(
tracking_id: String,
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, node: String,
is_test: bool,
) {
let event = PbEventMessage {
tracking_id,
event_time_sec: current_timestamp(),
event_stage: event_stage as i32,
event_name: event_name.to_string(),
connector_name,
object: object.map(|c| c as i32),
catalog_id,
attributes: attributes.map(|a| a.to_string()),
node,
is_test,
};
let report_bytes = event.encode_to_vec();
tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});
}
#[cfg(test)]
mod test {
use super::*;
#[ignore]
#[tokio::test]
async fn test_telemetry_report_event() {
let event_stage = PbTelemetryEventStage::CreateStreamJob;
let event_name = "test_feature";
let catalog_id = 1;
let connector_name = Some("test_connector".to_string());
let object = Some(PbTelemetryDatabaseObject::Source);
let attributes = None;
let node = "test_node".to_string();
request_to_telemetry_event(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
true,
);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}