risingwave_common/telemetry/
report.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_pb::telemetry::PbEventMessage;
18pub use risingwave_telemetry_event::{
19    TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
20    current_timestamp, do_telemetry_event_report, post_telemetry_report_pb,
21};
22use risingwave_telemetry_event::{
23    TELEMETRY_EVENT_REPORT_STASH_SIZE, TELEMETRY_EVENT_REPORT_TX,
24    get_telemetry_risingwave_cloud_uuid,
25};
26use tokio::sync::oneshot::Sender;
27use tokio::task::JoinHandle;
28use tokio::time::{Duration, interval as tokio_interval_fn};
29use uuid::Uuid;
30
31use super::{Result, TELEMETRY_REPORT_INTERVAL};
32use crate::telemetry::pb_compatible::TelemetryToProtobuf;
33
34#[async_trait::async_trait]
35pub trait TelemetryInfoFetcher {
36    /// Fetches telemetry info from meta. Currently it's only `tracking_id` (`cluster_id`).
37    async fn fetch_telemetry_info(&self) -> Result<Option<String>>;
38}
39
40#[async_trait::async_trait]
41pub trait TelemetryReportCreator {
42    // inject dependencies to impl structs if more metrics needed
43    async fn create_report(
44        &self,
45        tracking_id: String,
46        session_id: String,
47        up_time: u64,
48    ) -> Result<impl TelemetryToProtobuf>;
49
50    fn report_type(&self) -> &str;
51}
52
53pub async fn start_telemetry_reporting<F, I>(
54    info_fetcher: Arc<I>,
55    report_creator: Arc<F>,
56) -> (JoinHandle<()>, Sender<()>)
57where
58    F: TelemetryReportCreator + Send + Sync + 'static,
59    I: TelemetryInfoFetcher + Send + Sync + 'static,
60{
61    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
62
63    let join_handle = tokio::spawn(async move {
64        tracing::info!("start telemetry reporting");
65
66        let begin_time = std::time::Instant::now();
67        let session_id = Uuid::new_v4().to_string();
68        let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
69        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
70
71        let mut event_interval =
72            tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
73        event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
74
75        // fetch telemetry tracking_id from the meta node only at the beginning
76        // There is only one case tracking_id updated at the runtime ---- metastore data has been
77        // cleaned. There is no way that metastore has been cleaned but nodes are still running
78        let tracking_id = {
79            match (
80                info_fetcher.fetch_telemetry_info().await,
81                get_telemetry_risingwave_cloud_uuid(),
82            ) {
83                (Ok(None), _) => {
84                    tracing::info!("Telemetry is disabled");
85                    return;
86                }
87                (Err(err), _) => {
88                    tracing::error!("Telemetry failed to get tracking_id, err {}", err);
89                    return;
90                }
91                (Ok(Some(_)), Some(cloud_uuid)) => cloud_uuid,
92                (Ok(Some(id)), None) => id,
93            }
94        };
95
96        TELEMETRY_TRACKING_ID
97            .set(tracking_id.clone())
98            .unwrap_or_else(|_| {
99                tracing::warn!(
100                    "Telemetry failed to set tracking_id, event reporting will be disabled"
101                )
102            });
103
104        let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<PbEventMessage>();
105
106        let mut enable_event_report = true;
107        TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| {
108            tracing::warn!(
109                "Telemetry failed to set event reporting tx, event reporting will be disabled"
110            );
111            // possible failure:
112            // When running in standalone mode, the static TELEMETRY_EVENT_REPORT_TX is shared
113            // and can be set by meta/compute nodes.
114            // In such case, the one first set the static will do the event reporting and others'
115            // event report is disabled.
116            enable_event_report = false;
117        });
118        let mut event_stash = Vec::new();
119
120        loop {
121            tokio::select! {
122                _ = interval.tick() => {},
123                event = event_rx.recv(), if enable_event_report => {
124                    debug_assert!(event.is_some());
125                    event_stash.push(event.unwrap());
126                    if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE {
127                        do_telemetry_event_report(&mut event_stash).await;
128                    }
129                    continue;
130                }
131                _ = event_interval.tick(), if enable_event_report => {
132                    do_telemetry_event_report(&mut event_stash).await;
133                    continue;
134                },
135                _ = &mut shutdown_rx => {
136                    tracing::info!("Telemetry exit");
137                    return;
138                }
139            }
140
141            // create a report and serialize to json
142            let bin_report = match report_creator
143                .create_report(
144                    tracking_id.clone(),
145                    session_id.clone(),
146                    begin_time.elapsed().as_secs(),
147                )
148                .await
149                .map(TelemetryToProtobuf::to_pb_bytes)
150            {
151                Ok(bin_report) => bin_report,
152                Err(e) => {
153                    tracing::error!("Telemetry failed to create report {}", e);
154                    continue;
155                }
156            };
157
158            let url =
159                (TELEMETRY_REPORT_URL.to_owned() + "/" + report_creator.report_type()).to_owned();
160
161            match post_telemetry_report_pb(&url, bin_report).await {
162                Ok(_) => tracing::info!("Telemetry post success, id {}", tracking_id),
163                Err(e) => tracing::error!("Telemetry post error, {}", e),
164            }
165        }
166    });
167    (join_handle, shutdown_tx)
168}