risingwave_meta/
telemetry.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prost::Message;
16use risingwave_common::config::MetaBackend;
17use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
18use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator};
19use risingwave_common::telemetry::{
20    SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
21    report_event_common, telemetry_cluster_type_from_env_var,
22};
23use risingwave_common::{GIT_SHA, RW_VERSION};
24use risingwave_license::LicenseManager;
25use risingwave_pb::common::WorkerType;
26use risingwave_pb::telemetry::{
27    PbTelemetryClusterType, PbTelemetryDatabaseObject, PbTelemetryEventStage,
28};
29use serde::{Deserialize, Serialize};
30use thiserror_ext::AsReport;
31
32use crate::manager::MetadataManager;
33use crate::model::ClusterId;
34
/// Report type tag used by the meta node: it is passed along with every event sent through
/// `report_event` and returned by `MetaReportCreator::report_type`.
const TELEMETRY_META_REPORT_TYPE: &str = "meta";
36
37pub(crate) fn report_event(
38    event_stage: PbTelemetryEventStage,
39    event_name: &str,
40    catalog_id: i64,
41    connector_name: Option<String>,
42    component: Option<PbTelemetryDatabaseObject>,
43    attributes: Option<jsonbb::Value>, // any json string
44) {
45    report_event_common(
46        event_stage,
47        event_name,
48        catalog_id,
49        connector_name,
50        component,
51        attributes,
52        TELEMETRY_META_REPORT_TYPE.to_owned(),
53    );
54}
55
56#[derive(Debug, Serialize, Deserialize)]
57struct NodeCount {
58    meta_count: u64,
59    compute_count: u64,
60    frontend_count: u64,
61    compactor_count: u64,
62}
63
64#[derive(Debug, Serialize, Deserialize)]
65struct RwVersion {
66    version: String,
67    git_sha: String,
68}
69
70#[derive(Debug, Serialize, Deserialize)]
71pub enum PlanOptimization {
72    // todo: add optimization applied to each job
73    Placeholder,
74}
75
76#[derive(Debug, Serialize, Deserialize)]
77pub struct MetaTelemetryJobDesc {
78    pub table_id: i32,
79    pub connector: Option<String>,
80    pub optimization: Vec<PlanOptimization>,
81}
82
83#[derive(Debug, Serialize, Deserialize)]
84pub struct MetaTelemetryReport {
85    #[serde(flatten)]
86    base: TelemetryReportBase,
87    node_count: NodeCount,
88    streaming_job_count: u64,
89    meta_backend: MetaBackend,
90    rw_version: RwVersion,
91    job_desc: Vec<MetaTelemetryJobDesc>,
92
93    // Get the ENV from key `TELEMETRY_CLUSTER_TYPE`
94    cluster_type: PbTelemetryClusterType,
95    object_store_media_type: &'static str,
96    connector_usage_json_str: String,
97    license_info_json_str: String,
98}
99
100impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
101    fn from(val: MetaTelemetryJobDesc) -> Self {
102        risingwave_pb::telemetry::StreamJobDesc {
103            table_id: val.table_id,
104            connector_name: val.connector,
105            plan_optimizations: val
106                .optimization
107                .iter()
108                .map(|opt| match opt {
109                    PlanOptimization::Placeholder => {
110                        risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
111                            as i32
112                    }
113                })
114                .collect(),
115        }
116    }
117}
118
119impl TelemetryToProtobuf for MetaTelemetryReport {
120    fn to_pb_bytes(self) -> Vec<u8> {
121        let pb_report = risingwave_pb::telemetry::MetaReport {
122            base: Some(self.base.into()),
123            meta_backend: match self.meta_backend {
124                MetaBackend::Mem => risingwave_pb::telemetry::MetaBackend::Memory as i32,
125                MetaBackend::Sql
126                | MetaBackend::Sqlite
127                | MetaBackend::Postgres
128                | MetaBackend::Mysql => risingwave_pb::telemetry::MetaBackend::Rdb as i32,
129            },
130            node_count: Some(risingwave_pb::telemetry::NodeCount {
131                meta: self.node_count.meta_count as u32,
132                compute: self.node_count.compute_count as u32,
133                frontend: self.node_count.frontend_count as u32,
134                compactor: self.node_count.compactor_count as u32,
135            }),
136            rw_version: Some(risingwave_pb::telemetry::RwVersion {
137                rw_version: self.rw_version.version,
138                git_sha: self.rw_version.git_sha,
139            }),
140            stream_job_count: self.streaming_job_count as u32,
141            stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
142            cluster_type: self.cluster_type as i32,
143            object_store_media_type: self.object_store_media_type.to_owned(),
144            connector_usage_json_str: self.connector_usage_json_str,
145            license_info_json_str: self.license_info_json_str,
146        };
147        pb_report.encode_to_vec()
148    }
149}
150
151pub struct MetaTelemetryInfoFetcher {
152    tracking_id: ClusterId,
153}
154
155impl MetaTelemetryInfoFetcher {
156    pub fn new(tracking_id: ClusterId) -> Self {
157        Self { tracking_id }
158    }
159}
160
161#[async_trait::async_trait]
162impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
163    async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
164        // the err here means building cluster on test env, so we don't need to report telemetry
165        if telemetry_cluster_type_from_env_var().is_err() {
166            return Ok(None);
167        }
168        Ok(Some(self.tracking_id.clone().into()))
169    }
170}
171
172#[derive(Clone)]
173pub struct MetaReportCreator {
174    metadata_manager: MetadataManager,
175    object_store_media_type: &'static str,
176}
177
178impl MetaReportCreator {
179    pub fn new(metadata_manager: MetadataManager, object_store_media_type: &'static str) -> Self {
180        Self {
181            metadata_manager,
182            object_store_media_type,
183        }
184    }
185}
186
187#[async_trait::async_trait]
188impl TelemetryReportCreator for MetaReportCreator {
189    async fn create_report(
190        &self,
191        tracking_id: String,
192        session_id: String,
193        up_time: u64,
194    ) -> TelemetryResult<MetaTelemetryReport> {
195        let node_map = self
196            .metadata_manager
197            .count_worker_node()
198            .await
199            .map_err(|err| err.as_report().to_string())?;
200
201        let streaming_job_count = self
202            .metadata_manager
203            .count_streaming_job()
204            .await
205            .map_err(|err| err.as_report().to_string())? as u64;
206        let stream_job_desc = self
207            .metadata_manager
208            .list_stream_job_desc()
209            .await
210            .map_err(|err| err.as_report().to_string())?;
211        let connector_usage = self
212            .metadata_manager
213            .catalog_controller
214            .get_connector_usage()
215            .await
216            .map_err(|err| err.as_report().to_string())?
217            .to_string();
218
219        let license_info_json_str = match LicenseManager::get().license() {
220            Ok(license) => serde_json::to_string(&license).unwrap(),
221            Err(err) => serde_json::json!({
222                "error": err.to_report_string(),
223            })
224            .to_string(),
225        };
226
227        Ok(MetaTelemetryReport {
228            rw_version: RwVersion {
229                version: RW_VERSION.to_owned(),
230                git_sha: GIT_SHA.to_owned(),
231            },
232            base: TelemetryReportBase {
233                tracking_id,
234                session_id,
235                system_data: SystemData::new(),
236                up_time,
237                time_stamp: current_timestamp(),
238                node_type: TelemetryNodeType::Meta,
239                is_test: false,
240            },
241            node_count: NodeCount {
242                meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
243                compute_count: *node_map.get(&WorkerType::ComputeNode).unwrap_or(&0),
244                frontend_count: *node_map.get(&WorkerType::Frontend).unwrap_or(&0),
245                compactor_count: *node_map.get(&WorkerType::Compactor).unwrap_or(&0),
246            },
247            streaming_job_count,
248            meta_backend: MetaBackend::Sql,
249            job_desc: stream_job_desc,
250            // it blocks the report if the cluster type is not valid or leak from test env
251            cluster_type: telemetry_cluster_type_from_env_var()?,
252            object_store_media_type: self.object_store_media_type,
253            connector_usage_json_str: connector_usage,
254            license_info_json_str,
255        })
256    }
257
258    fn report_type(&self) -> &str {
259        TELEMETRY_META_REPORT_TYPE
260    }
261}