// risingwave_common/telemetry/report.rs
use std::sync::Arc;
use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
pub use risingwave_telemetry_event::{
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use uuid::Uuid;
use super::{Result, TELEMETRY_REPORT_INTERVAL};
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
/// Supplies the tracking id that identifies this deployment in telemetry reports.
///
/// `start_telemetry_reporting` consults the fetcher only when
/// `get_telemetry_risingwave_cloud_uuid()` yields no id.
#[async_trait::async_trait]
pub trait TelemetryInfoFetcher {
/// Fetches the tracking id.
///
/// As consumed by `start_telemetry_reporting`:
/// - `Ok(Some(id))`: `id` is used as the tracking id of every report;
/// - `Ok(None)`: telemetry is treated as disabled and the reporting task exits;
/// - `Err(_)`: the error is logged and the reporting task exits.
async fn fetch_telemetry_info(&self) -> Result<Option<String>>;
}
/// Builds the telemetry report that `start_telemetry_reporting` uploads on each tick.
#[async_trait::async_trait]
pub trait TelemetryReportCreator {
/// Creates one report.
///
/// As called from `start_telemetry_reporting`:
/// - `tracking_id`: the id resolved once when the reporting task starts;
/// - `session_id`: a random UUID v4 string generated once per reporting task;
/// - `up_time`: whole seconds elapsed since the reporting task started.
///
/// The returned value is serialized with `TelemetryToProtobuf::to_pb_bytes` before upload.
/// An `Err` is logged by the caller and the report for that tick is skipped.
async fn create_report(
&self,
tracking_id: String,
session_id: String,
up_time: u64,
) -> Result<impl TelemetryToProtobuf>;
/// Name of the report kind; appended to `TELEMETRY_REPORT_URL` as the final path segment
/// of the upload URL.
fn report_type(&self) -> &str;
}
/// Spawns the background task that periodically builds and uploads telemetry reports.
///
/// # Behavior
///
/// 1. The task resolves a tracking id: the value of
///    `get_telemetry_risingwave_cloud_uuid()` when present, otherwise the result of
///    `info_fetcher.fetch_telemetry_info()`. If the fetcher returns `Ok(None)` (telemetry
///    disabled) or an error, the task logs that and exits without reporting anything.
/// 2. The tracking id is stored in `TELEMETRY_TRACKING_ID`; a failure to store it is only
///    logged as a warning.
/// 3. On every tick of a `TELEMETRY_REPORT_INTERVAL`-second interval the task asks
///    `report_creator` for a report, serializes it with `TelemetryToProtobuf::to_pb_bytes`
///    and posts it to `TELEMETRY_REPORT_URL/<report_type>`. Creation and upload failures are
///    logged and the loop carries on with the next tick.
///
/// # Returns
///
/// The `JoinHandle` of the spawned task and a oneshot `Sender` used to stop it. The shutdown
/// signal is only observed while the task is waiting for the next tick, not while it is
/// resolving the tracking id or creating/uploading a report.
///
/// Being an `async fn`, nothing is spawned until the returned future is awaited.
pub async fn start_telemetry_reporting<F, I>(
    info_fetcher: Arc<I>,
    report_creator: Arc<F>,
) -> (JoinHandle<()>, Sender<()>)
where
    F: TelemetryReportCreator + Send + Sync + 'static,
    I: TelemetryInfoFetcher + Send + Sync + 'static,
{
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();

    let join_handle = tokio::spawn(async move {
        tracing::info!("start telemetry reporting");

        let begin_time = std::time::Instant::now();
        let session_id = Uuid::new_v4().to_string();
        let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
        // If a tick is missed (e.g. a slow upload), skip it instead of firing a burst of
        // catch-up reports.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // A cloud-provided UUID takes precedence over the id from the info fetcher.
        let tracking_id = match get_telemetry_risingwave_cloud_uuid() {
            Some(cloud_uuid) => cloud_uuid,
            None => match info_fetcher.fetch_telemetry_info().await {
                Ok(Some(id)) => id,
                Ok(None) => {
                    tracing::info!("Telemetry is disabled");
                    return;
                }
                Err(err) => {
                    tracing::error!("Telemetry failed to get tracking_id, err {}", err);
                    return;
                }
            },
        };

        // Publish the id for event reporting; periodic reports keep working even if the
        // global was already set.
        if TELEMETRY_TRACKING_ID.set(tracking_id.clone()).is_err() {
            tracing::warn!(
                "Telemetry failed to set tracking_id, event reporting will be disabled"
            )
        }

        loop {
            tokio::select! {
                // Per tokio's `interval` semantics the first tick completes immediately, so
                // the first report goes out right after start-up.
                _ = interval.tick() => {},
                // Resolves on an explicit send; per tokio's oneshot semantics it also
                // resolves (with an error) when the sender is dropped, which `_` matches too.
                _ = &mut shutdown_rx => {
                    tracing::info!("Telemetry exit");
                    return;
                }
            }

            let bin_report = match report_creator
                .create_report(
                    tracking_id.clone(),
                    session_id.clone(),
                    begin_time.elapsed().as_secs(),
                )
                .await
                .map(TelemetryToProtobuf::to_pb_bytes)
            {
                Ok(bin_report) => bin_report,
                Err(e) => {
                    tracing::error!("Telemetry failed to create report {}", e);
                    continue;
                }
            };

            // The concatenation already yields an owned `String`; no further copy is needed.
            let url = TELEMETRY_REPORT_URL.to_owned() + "/" + report_creator.report_type();

            match post_telemetry_report_pb(&url, bin_report).await {
                Ok(_) => tracing::info!("Telemetry post success, id {}", tracking_id),
                Err(e) => tracing::error!("Telemetry post error, {}", e),
            }
        }
    });

    (join_handle, shutdown_tx)
}