1use prost::Message;
16use risingwave_common::config::MetaBackend;
17use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
18use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator};
19use risingwave_common::telemetry::{
20 SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
21 report_event_common, telemetry_cluster_type_from_env_var,
22};
23use risingwave_common::{GIT_SHA, RW_VERSION};
24use risingwave_license::LicenseManager;
25use risingwave_pb::common::WorkerType;
26use risingwave_pb::telemetry::{
27 PbTelemetryClusterType, PbTelemetryDatabaseObject, PbTelemetryEventStage,
28};
29use serde::{Deserialize, Serialize};
30use thiserror_ext::AsReport;
31
32use crate::manager::MetadataManager;
33use crate::model::ClusterId;
34
35const TELEMETRY_META_REPORT_TYPE: &str = "meta";
36
37pub(crate) fn report_event(
38 event_stage: PbTelemetryEventStage,
39 event_name: &str,
40 catalog_id: i64,
41 connector_name: Option<String>,
42 component: Option<PbTelemetryDatabaseObject>,
43 attributes: Option<jsonbb::Value>, ) {
45 report_event_common(
46 event_stage,
47 event_name,
48 catalog_id,
49 connector_name,
50 component,
51 attributes,
52 TELEMETRY_META_REPORT_TYPE.to_owned(),
53 );
54}
55
56#[derive(Debug, Serialize, Deserialize)]
57struct NodeCount {
58 meta_count: u64,
59 compute_count: u64,
60 frontend_count: u64,
61 compactor_count: u64,
62}
63
64#[derive(Debug, Serialize, Deserialize)]
65struct RwVersion {
66 version: String,
67 git_sha: String,
68}
69
70#[derive(Debug, Serialize, Deserialize)]
71pub enum PlanOptimization {
72 Placeholder,
74}
75
76#[derive(Debug, Serialize, Deserialize)]
77pub struct MetaTelemetryJobDesc {
78 pub table_id: i32,
79 pub connector: Option<String>,
80 pub optimization: Vec<PlanOptimization>,
81}
82
83#[derive(Debug, Serialize, Deserialize)]
84pub struct MetaTelemetryReport {
85 #[serde(flatten)]
86 base: TelemetryReportBase,
87 node_count: NodeCount,
88 streaming_job_count: u64,
89 meta_backend: MetaBackend,
90 rw_version: RwVersion,
91 job_desc: Vec<MetaTelemetryJobDesc>,
92
93 cluster_type: PbTelemetryClusterType,
95 object_store_media_type: &'static str,
96 connector_usage_json_str: String,
97 license_info_json_str: String,
98}
99
100impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
101 fn from(val: MetaTelemetryJobDesc) -> Self {
102 risingwave_pb::telemetry::StreamJobDesc {
103 table_id: val.table_id,
104 connector_name: val.connector,
105 plan_optimizations: val
106 .optimization
107 .iter()
108 .map(|opt| match opt {
109 PlanOptimization::Placeholder => {
110 risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
111 as i32
112 }
113 })
114 .collect(),
115 }
116 }
117}
118
119impl TelemetryToProtobuf for MetaTelemetryReport {
120 fn to_pb_bytes(self) -> Vec<u8> {
121 let pb_report = risingwave_pb::telemetry::MetaReport {
122 base: Some(self.base.into()),
123 meta_backend: match self.meta_backend {
124 MetaBackend::Mem => risingwave_pb::telemetry::MetaBackend::Memory as i32,
125 MetaBackend::Sql
126 | MetaBackend::Sqlite
127 | MetaBackend::Postgres
128 | MetaBackend::Mysql => risingwave_pb::telemetry::MetaBackend::Rdb as i32,
129 },
130 node_count: Some(risingwave_pb::telemetry::NodeCount {
131 meta: self.node_count.meta_count as u32,
132 compute: self.node_count.compute_count as u32,
133 frontend: self.node_count.frontend_count as u32,
134 compactor: self.node_count.compactor_count as u32,
135 }),
136 rw_version: Some(risingwave_pb::telemetry::RwVersion {
137 rw_version: self.rw_version.version,
138 git_sha: self.rw_version.git_sha,
139 }),
140 stream_job_count: self.streaming_job_count as u32,
141 stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
142 cluster_type: self.cluster_type as i32,
143 object_store_media_type: self.object_store_media_type.to_owned(),
144 connector_usage_json_str: self.connector_usage_json_str,
145 license_info_json_str: self.license_info_json_str,
146 };
147 pb_report.encode_to_vec()
148 }
149}
150
151pub struct MetaTelemetryInfoFetcher {
152 tracking_id: ClusterId,
153}
154
155impl MetaTelemetryInfoFetcher {
156 pub fn new(tracking_id: ClusterId) -> Self {
157 Self { tracking_id }
158 }
159}
160
161#[async_trait::async_trait]
162impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
163 async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
164 if telemetry_cluster_type_from_env_var().is_err() {
166 return Ok(None);
167 }
168 Ok(Some(self.tracking_id.clone().into()))
169 }
170}
171
172#[derive(Clone)]
173pub struct MetaReportCreator {
174 metadata_manager: MetadataManager,
175 object_store_media_type: &'static str,
176}
177
178impl MetaReportCreator {
179 pub fn new(metadata_manager: MetadataManager, object_store_media_type: &'static str) -> Self {
180 Self {
181 metadata_manager,
182 object_store_media_type,
183 }
184 }
185}
186
187#[async_trait::async_trait]
188impl TelemetryReportCreator for MetaReportCreator {
189 async fn create_report(
190 &self,
191 tracking_id: String,
192 session_id: String,
193 up_time: u64,
194 ) -> TelemetryResult<MetaTelemetryReport> {
195 let node_map = self
196 .metadata_manager
197 .count_worker_node()
198 .await
199 .map_err(|err| err.as_report().to_string())?;
200
201 let streaming_job_count = self
202 .metadata_manager
203 .count_streaming_job()
204 .await
205 .map_err(|err| err.as_report().to_string())? as u64;
206 let stream_job_desc = self
207 .metadata_manager
208 .list_stream_job_desc()
209 .await
210 .map_err(|err| err.as_report().to_string())?;
211 let connector_usage = self
212 .metadata_manager
213 .catalog_controller
214 .get_connector_usage()
215 .await
216 .map_err(|err| err.as_report().to_string())?
217 .to_string();
218
219 let license_info_json_str = match LicenseManager::get().license() {
220 Ok(license) => serde_json::to_string(&license).unwrap(),
221 Err(err) => serde_json::json!({
222 "error": err.to_report_string(),
223 })
224 .to_string(),
225 };
226
227 Ok(MetaTelemetryReport {
228 rw_version: RwVersion {
229 version: RW_VERSION.to_owned(),
230 git_sha: GIT_SHA.to_owned(),
231 },
232 base: TelemetryReportBase {
233 tracking_id,
234 session_id,
235 system_data: SystemData::new(),
236 up_time,
237 time_stamp: current_timestamp(),
238 node_type: TelemetryNodeType::Meta,
239 is_test: false,
240 },
241 node_count: NodeCount {
242 meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
243 compute_count: *node_map.get(&WorkerType::ComputeNode).unwrap_or(&0),
244 frontend_count: *node_map.get(&WorkerType::Frontend).unwrap_or(&0),
245 compactor_count: *node_map.get(&WorkerType::Compactor).unwrap_or(&0),
246 },
247 streaming_job_count,
248 meta_backend: MetaBackend::Sql,
249 job_desc: stream_job_desc,
250 cluster_type: telemetry_cluster_type_from_env_var()?,
252 object_store_media_type: self.object_store_media_type,
253 connector_usage_json_str: connector_usage,
254 license_info_json_str,
255 })
256 }
257
258 fn report_type(&self) -> &str {
259 TELEMETRY_META_REPORT_TYPE
260 }
261}