risingwave_common/telemetry/
report.rs1use std::sync::Arc;
16
17use risingwave_pb::telemetry::PbEventMessage;
18pub use risingwave_telemetry_event::{
19 TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
20 current_timestamp, do_telemetry_event_report, post_telemetry_report_pb,
21};
22use risingwave_telemetry_event::{
23 TELEMETRY_EVENT_REPORT_STASH_SIZE, TELEMETRY_EVENT_REPORT_TX,
24 get_telemetry_risingwave_cloud_uuid,
25};
26use tokio::sync::oneshot::Sender;
27use tokio::task::JoinHandle;
28use tokio::time::{Duration, interval as tokio_interval_fn};
29use uuid::Uuid;
30
31use super::{Result, TELEMETRY_REPORT_INTERVAL};
32use crate::telemetry::pb_compatible::TelemetryToProtobuf;
33
34#[async_trait::async_trait]
35pub trait TelemetryInfoFetcher {
36 async fn fetch_telemetry_info(&self) -> Result<Option<String>>;
38}
39
40#[async_trait::async_trait]
41pub trait TelemetryReportCreator {
42 async fn create_report(
44 &self,
45 tracking_id: String,
46 session_id: String,
47 up_time: u64,
48 ) -> Result<impl TelemetryToProtobuf>;
49
50 fn report_type(&self) -> &str;
51}
52
53pub async fn start_telemetry_reporting<F, I>(
54 info_fetcher: Arc<I>,
55 report_creator: Arc<F>,
56) -> (JoinHandle<()>, Sender<()>)
57where
58 F: TelemetryReportCreator + Send + Sync + 'static,
59 I: TelemetryInfoFetcher + Send + Sync + 'static,
60{
61 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
62
63 let join_handle = tokio::spawn(async move {
64 tracing::info!("start telemetry reporting");
65
66 let begin_time = std::time::Instant::now();
67 let session_id = Uuid::new_v4().to_string();
68 let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
69 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
70
71 let mut event_interval =
72 tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
73 event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
74
75 let tracking_id = {
79 match (
80 info_fetcher.fetch_telemetry_info().await,
81 get_telemetry_risingwave_cloud_uuid(),
82 ) {
83 (Ok(None), _) => {
84 tracing::info!("Telemetry is disabled");
85 return;
86 }
87 (Err(err), _) => {
88 tracing::error!("Telemetry failed to get tracking_id, err {}", err);
89 return;
90 }
91 (Ok(Some(_)), Some(cloud_uuid)) => cloud_uuid,
92 (Ok(Some(id)), None) => id,
93 }
94 };
95
96 TELEMETRY_TRACKING_ID
97 .set(tracking_id.clone())
98 .unwrap_or_else(|_| {
99 tracing::warn!(
100 "Telemetry failed to set tracking_id, event reporting will be disabled"
101 )
102 });
103
104 let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<PbEventMessage>();
105
106 let mut enable_event_report = true;
107 TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| {
108 tracing::warn!(
109 "Telemetry failed to set event reporting tx, event reporting will be disabled"
110 );
111 enable_event_report = false;
117 });
118 let mut event_stash = Vec::new();
119
120 loop {
121 tokio::select! {
122 _ = interval.tick() => {},
123 event = event_rx.recv(), if enable_event_report => {
124 debug_assert!(event.is_some());
125 event_stash.push(event.unwrap());
126 if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE {
127 do_telemetry_event_report(&mut event_stash).await;
128 }
129 continue;
130 }
131 _ = event_interval.tick(), if enable_event_report => {
132 do_telemetry_event_report(&mut event_stash).await;
133 continue;
134 },
135 _ = &mut shutdown_rx => {
136 tracing::info!("Telemetry exit");
137 return;
138 }
139 }
140
141 let bin_report = match report_creator
143 .create_report(
144 tracking_id.clone(),
145 session_id.clone(),
146 begin_time.elapsed().as_secs(),
147 )
148 .await
149 .map(TelemetryToProtobuf::to_pb_bytes)
150 {
151 Ok(bin_report) => bin_report,
152 Err(e) => {
153 tracing::error!("Telemetry failed to create report {}", e);
154 continue;
155 }
156 };
157
158 let url =
159 (TELEMETRY_REPORT_URL.to_owned() + "/" + report_creator.report_type()).to_owned();
160
161 match post_telemetry_report_pb(&url, bin_report).await {
162 Ok(_) => tracing::info!("Telemetry post success, id {}", tracking_id),
163 Err(e) => tracing::error!("Telemetry post error, {}", e),
164 }
165 }
166 });
167 (join_handle, shutdown_tx)
168}