risingwave_meta/
telemetry.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prost::Message;
16use risingwave_common::config::MetaBackend;
17use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
18use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator};
19use risingwave_common::telemetry::{
20    SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
21    report_event_common, telemetry_cluster_type_from_env_var,
22};
23use risingwave_common::{GIT_SHA, RW_VERSION};
24use risingwave_pb::common::WorkerType;
25use risingwave_pb::telemetry::{
26    PbTelemetryClusterType, PbTelemetryDatabaseObject, PbTelemetryEventStage,
27};
28use serde::{Deserialize, Serialize};
29use thiserror_ext::AsReport;
30
31use crate::manager::MetadataManager;
32use crate::model::ClusterId;
33
34const TELEMETRY_META_REPORT_TYPE: &str = "meta";
35
36pub(crate) fn report_event(
37    event_stage: PbTelemetryEventStage,
38    event_name: &str,
39    catalog_id: i64,
40    connector_name: Option<String>,
41    component: Option<PbTelemetryDatabaseObject>,
42    attributes: Option<jsonbb::Value>, // any json string
43) {
44    report_event_common(
45        event_stage,
46        event_name,
47        catalog_id,
48        connector_name,
49        component,
50        attributes,
51        TELEMETRY_META_REPORT_TYPE.to_owned(),
52    );
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56struct NodeCount {
57    meta_count: u64,
58    compute_count: u64,
59    frontend_count: u64,
60    compactor_count: u64,
61}
62
63#[derive(Debug, Serialize, Deserialize)]
64struct RwVersion {
65    version: String,
66    git_sha: String,
67}
68
69#[derive(Debug, Serialize, Deserialize)]
70pub enum PlanOptimization {
71    // todo: add optimization applied to each job
72    Placeholder,
73}
74
75#[derive(Debug, Serialize, Deserialize)]
76pub struct MetaTelemetryJobDesc {
77    pub table_id: i32,
78    pub connector: Option<String>,
79    pub optimization: Vec<PlanOptimization>,
80}
81
82#[derive(Debug, Serialize, Deserialize)]
83pub struct MetaTelemetryReport {
84    #[serde(flatten)]
85    base: TelemetryReportBase,
86    node_count: NodeCount,
87    streaming_job_count: u64,
88    meta_backend: MetaBackend,
89    rw_version: RwVersion,
90    job_desc: Vec<MetaTelemetryJobDesc>,
91
92    // Get the ENV from key `TELEMETRY_CLUSTER_TYPE`
93    cluster_type: PbTelemetryClusterType,
94    object_store_media_type: &'static str,
95    connector_usage_json_str: String,
96}
97
98impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
99    fn from(val: MetaTelemetryJobDesc) -> Self {
100        risingwave_pb::telemetry::StreamJobDesc {
101            table_id: val.table_id,
102            connector_name: val.connector,
103            plan_optimizations: val
104                .optimization
105                .iter()
106                .map(|opt| match opt {
107                    PlanOptimization::Placeholder => {
108                        risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
109                            as i32
110                    }
111                })
112                .collect(),
113        }
114    }
115}
116
117impl TelemetryToProtobuf for MetaTelemetryReport {
118    fn to_pb_bytes(self) -> Vec<u8> {
119        let pb_report = risingwave_pb::telemetry::MetaReport {
120            base: Some(self.base.into()),
121            meta_backend: match self.meta_backend {
122                MetaBackend::Mem => risingwave_pb::telemetry::MetaBackend::Memory as i32,
123                MetaBackend::Sql
124                | MetaBackend::Sqlite
125                | MetaBackend::Postgres
126                | MetaBackend::Mysql => risingwave_pb::telemetry::MetaBackend::Rdb as i32,
127            },
128            node_count: Some(risingwave_pb::telemetry::NodeCount {
129                meta: self.node_count.meta_count as u32,
130                compute: self.node_count.compute_count as u32,
131                frontend: self.node_count.frontend_count as u32,
132                compactor: self.node_count.compactor_count as u32,
133            }),
134            rw_version: Some(risingwave_pb::telemetry::RwVersion {
135                rw_version: self.rw_version.version,
136                git_sha: self.rw_version.git_sha,
137            }),
138            stream_job_count: self.streaming_job_count as u32,
139            stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
140            cluster_type: self.cluster_type as i32,
141            object_store_media_type: self.object_store_media_type.to_owned(),
142            connector_usage_json_str: self.connector_usage_json_str,
143        };
144        pb_report.encode_to_vec()
145    }
146}
147
148pub struct MetaTelemetryInfoFetcher {
149    tracking_id: ClusterId,
150}
151
152impl MetaTelemetryInfoFetcher {
153    pub fn new(tracking_id: ClusterId) -> Self {
154        Self { tracking_id }
155    }
156}
157
158#[async_trait::async_trait]
159impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
160    async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
161        // the err here means building cluster on test env, so we don't need to report telemetry
162        if telemetry_cluster_type_from_env_var().is_err() {
163            return Ok(None);
164        }
165        Ok(Some(self.tracking_id.clone().into()))
166    }
167}
168
169#[derive(Clone)]
170pub struct MetaReportCreator {
171    metadata_manager: MetadataManager,
172    object_store_media_type: &'static str,
173}
174
175impl MetaReportCreator {
176    pub fn new(metadata_manager: MetadataManager, object_store_media_type: &'static str) -> Self {
177        Self {
178            metadata_manager,
179            object_store_media_type,
180        }
181    }
182}
183
184#[async_trait::async_trait]
185impl TelemetryReportCreator for MetaReportCreator {
186    async fn create_report(
187        &self,
188        tracking_id: String,
189        session_id: String,
190        up_time: u64,
191    ) -> TelemetryResult<MetaTelemetryReport> {
192        let node_map = self
193            .metadata_manager
194            .count_worker_node()
195            .await
196            .map_err(|err| err.as_report().to_string())?;
197
198        let streaming_job_count = self
199            .metadata_manager
200            .count_streaming_job()
201            .await
202            .map_err(|err| err.as_report().to_string())? as u64;
203        let stream_job_desc = self
204            .metadata_manager
205            .list_stream_job_desc()
206            .await
207            .map_err(|err| err.as_report().to_string())?;
208        let connector_usage = self
209            .metadata_manager
210            .catalog_controller
211            .get_connector_usage()
212            .await
213            .map_err(|err| err.as_report().to_string())?
214            .to_string();
215
216        Ok(MetaTelemetryReport {
217            rw_version: RwVersion {
218                version: RW_VERSION.to_owned(),
219                git_sha: GIT_SHA.to_owned(),
220            },
221            base: TelemetryReportBase {
222                tracking_id,
223                session_id,
224                system_data: SystemData::new(),
225                up_time,
226                time_stamp: current_timestamp(),
227                node_type: TelemetryNodeType::Meta,
228                is_test: false,
229            },
230            node_count: NodeCount {
231                meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
232                compute_count: *node_map.get(&WorkerType::ComputeNode).unwrap_or(&0),
233                frontend_count: *node_map.get(&WorkerType::Frontend).unwrap_or(&0),
234                compactor_count: *node_map.get(&WorkerType::Compactor).unwrap_or(&0),
235            },
236            streaming_job_count,
237            meta_backend: MetaBackend::Sql,
238            job_desc: stream_job_desc,
239            // it blocks the report if the cluster type is not valid or leak from test env
240            cluster_type: telemetry_cluster_type_from_env_var()?,
241            object_store_media_type: self.object_store_media_type,
242            connector_usage_json_str: connector_usage,
243        })
244    }
245
246    fn report_type(&self) -> &str {
247        TELEMETRY_META_REPORT_TYPE
248    }
249}