risingwave_meta/
telemetry.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prost::Message;
16use risingwave_common::config::MetaBackend;
17use risingwave_common::id::TableId;
18use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
19use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator};
20use risingwave_common::telemetry::{
21    SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
22    report_event_common, telemetry_cluster_type_from_env_var,
23};
24use risingwave_common::{GIT_SHA, RW_VERSION};
25use risingwave_license::LicenseManager;
26use risingwave_pb::common::WorkerType;
27use risingwave_pb::telemetry::{
28    PbTelemetryClusterType, PbTelemetryDatabaseObject, PbTelemetryEventStage,
29};
30use serde::{Deserialize, Serialize};
31use thiserror_ext::AsReport;
32
33use crate::manager::MetadataManager;
34use crate::model::ClusterId;
35
/// Report-type tag used by the meta node: it is passed along with every event forwarded by
/// `report_event` and returned from `MetaReportCreator::report_type`.
const TELEMETRY_META_REPORT_TYPE: &str = "meta";
37
38pub(crate) fn report_event(
39    event_stage: PbTelemetryEventStage,
40    event_name: &str,
41    catalog_id: i64,
42    connector_name: Option<String>,
43    component: Option<PbTelemetryDatabaseObject>,
44    attributes: Option<jsonbb::Value>, // any json string
45) {
46    report_event_common(
47        event_stage,
48        event_name,
49        catalog_id,
50        connector_name,
51        component,
52        attributes,
53        TELEMETRY_META_REPORT_TYPE.to_owned(),
54    );
55}
56
57#[derive(Debug, Serialize, Deserialize)]
58struct NodeCount {
59    meta_count: u64,
60    compute_count: u64,
61    frontend_count: u64,
62    compactor_count: u64,
63}
64
65#[derive(Debug, Serialize, Deserialize)]
66struct RwVersion {
67    version: String,
68    git_sha: String,
69}
70
71#[derive(Debug, Serialize, Deserialize)]
72pub enum PlanOptimization {
73    // todo: add optimization applied to each job
74    Placeholder,
75}
76
77#[derive(Debug, Serialize, Deserialize)]
78pub struct MetaTelemetryJobDesc {
79    pub table_id: TableId,
80    pub connector: Option<String>,
81    pub optimization: Vec<PlanOptimization>,
82}
83
84#[derive(Debug, Serialize, Deserialize)]
85pub struct MetaTelemetryReport {
86    #[serde(flatten)]
87    base: TelemetryReportBase,
88    node_count: NodeCount,
89    streaming_job_count: u64,
90    meta_backend: MetaBackend,
91    rw_version: RwVersion,
92    job_desc: Vec<MetaTelemetryJobDesc>,
93
94    // Get the ENV from key `TELEMETRY_CLUSTER_TYPE`
95    cluster_type: PbTelemetryClusterType,
96    object_store_media_type: &'static str,
97    connector_usage_json_str: String,
98    license_info_json_str: String,
99}
100
101impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
102    fn from(val: MetaTelemetryJobDesc) -> Self {
103        risingwave_pb::telemetry::StreamJobDesc {
104            table_id: val.table_id.as_i32_id(),
105            connector_name: val.connector,
106            plan_optimizations: val
107                .optimization
108                .iter()
109                .map(|opt| match opt {
110                    PlanOptimization::Placeholder => {
111                        risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
112                            as i32
113                    }
114                })
115                .collect(),
116        }
117    }
118}
119
120impl TelemetryToProtobuf for MetaTelemetryReport {
121    fn to_pb_bytes(self) -> Vec<u8> {
122        let pb_report = risingwave_pb::telemetry::MetaReport {
123            base: Some(self.base.into()),
124            meta_backend: match self.meta_backend {
125                MetaBackend::Mem => risingwave_pb::telemetry::MetaBackend::Memory as i32,
126                MetaBackend::Sql
127                | MetaBackend::Sqlite
128                | MetaBackend::Postgres
129                | MetaBackend::Mysql => risingwave_pb::telemetry::MetaBackend::Rdb as i32,
130            },
131            node_count: Some(risingwave_pb::telemetry::NodeCount {
132                meta: self.node_count.meta_count as u32,
133                compute: self.node_count.compute_count as u32,
134                frontend: self.node_count.frontend_count as u32,
135                compactor: self.node_count.compactor_count as u32,
136            }),
137            rw_version: Some(risingwave_pb::telemetry::RwVersion {
138                rw_version: self.rw_version.version,
139                git_sha: self.rw_version.git_sha,
140            }),
141            stream_job_count: self.streaming_job_count as u32,
142            stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
143            cluster_type: self.cluster_type as i32,
144            object_store_media_type: self.object_store_media_type.to_owned(),
145            connector_usage_json_str: self.connector_usage_json_str,
146            license_info_json_str: self.license_info_json_str,
147        };
148        pb_report.encode_to_vec()
149    }
150}
151
152pub struct MetaTelemetryInfoFetcher {
153    tracking_id: ClusterId,
154}
155
156impl MetaTelemetryInfoFetcher {
157    pub fn new(tracking_id: ClusterId) -> Self {
158        Self { tracking_id }
159    }
160}
161
162#[async_trait::async_trait]
163impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
164    async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
165        // the err here means building cluster on test env, so we don't need to report telemetry
166        if telemetry_cluster_type_from_env_var().is_err() {
167            return Ok(None);
168        }
169        Ok(Some(self.tracking_id.clone().into()))
170    }
171}
172
173#[derive(Clone)]
174pub struct MetaReportCreator {
175    metadata_manager: MetadataManager,
176    object_store_media_type: &'static str,
177}
178
179impl MetaReportCreator {
180    pub fn new(metadata_manager: MetadataManager, object_store_media_type: &'static str) -> Self {
181        Self {
182            metadata_manager,
183            object_store_media_type,
184        }
185    }
186}
187
188#[async_trait::async_trait]
189impl TelemetryReportCreator for MetaReportCreator {
190    async fn create_report(
191        &self,
192        tracking_id: String,
193        session_id: String,
194        up_time: u64,
195    ) -> TelemetryResult<MetaTelemetryReport> {
196        let node_map = self
197            .metadata_manager
198            .count_worker_node()
199            .await
200            .map_err(|err| err.as_report().to_string())?;
201
202        let streaming_job_count = self
203            .metadata_manager
204            .count_streaming_job()
205            .await
206            .map_err(|err| err.as_report().to_string())? as u64;
207        let stream_job_desc = self
208            .metadata_manager
209            .list_stream_job_desc()
210            .await
211            .map_err(|err| err.as_report().to_string())?;
212        let connector_usage = self
213            .metadata_manager
214            .catalog_controller
215            .get_connector_usage()
216            .await
217            .map_err(|err| err.as_report().to_string())?
218            .to_string();
219
220        let license_info_json_str = match LicenseManager::get().license() {
221            Ok(license) => serde_json::to_string(&license).unwrap(),
222            Err(err) => serde_json::json!({
223                "error": err.to_report_string(),
224            })
225            .to_string(),
226        };
227
228        Ok(MetaTelemetryReport {
229            rw_version: RwVersion {
230                version: RW_VERSION.to_owned(),
231                git_sha: GIT_SHA.to_owned(),
232            },
233            base: TelemetryReportBase {
234                tracking_id,
235                session_id,
236                system_data: SystemData::new(),
237                up_time,
238                time_stamp: current_timestamp(),
239                node_type: TelemetryNodeType::Meta,
240                is_test: false,
241            },
242            node_count: NodeCount {
243                meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
244                compute_count: *node_map.get(&WorkerType::ComputeNode).unwrap_or(&0),
245                frontend_count: *node_map.get(&WorkerType::Frontend).unwrap_or(&0),
246                compactor_count: *node_map.get(&WorkerType::Compactor).unwrap_or(&0),
247            },
248            streaming_job_count,
249            meta_backend: MetaBackend::Sql,
250            job_desc: stream_job_desc,
251            // it blocks the report if the cluster type is not valid or leak from test env
252            cluster_type: telemetry_cluster_type_from_env_var()?,
253            object_store_media_type: self.object_store_media_type,
254            connector_usage_json_str: connector_usage,
255            license_info_json_str,
256        })
257    }
258
259    fn report_type(&self) -> &str {
260        TELEMETRY_META_REPORT_TYPE
261    }
262}