1use prost::Message;
16use risingwave_common::config::MetaBackend;
17use risingwave_common::id::TableId;
18use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
19use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator};
20use risingwave_common::telemetry::{
21 SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
22 report_event_common, telemetry_cluster_type_from_env_var,
23};
24use risingwave_common::{GIT_SHA, RW_VERSION};
25use risingwave_license::LicenseManager;
26use risingwave_pb::common::WorkerType;
27use risingwave_pb::telemetry::{
28 PbTelemetryClusterType, PbTelemetryDatabaseObject, PbTelemetryEventStage,
29};
30use serde::{Deserialize, Serialize};
31use thiserror_ext::AsReport;
32
33use crate::manager::MetadataManager;
34use crate::model::ClusterId;
35
36const TELEMETRY_META_REPORT_TYPE: &str = "meta";
37
38pub(crate) fn report_event(
39 event_stage: PbTelemetryEventStage,
40 event_name: &str,
41 catalog_id: i64,
42 connector_name: Option<String>,
43 component: Option<PbTelemetryDatabaseObject>,
44 attributes: Option<jsonbb::Value>, ) {
46 report_event_common(
47 event_stage,
48 event_name,
49 catalog_id,
50 connector_name,
51 component,
52 attributes,
53 TELEMETRY_META_REPORT_TYPE.to_owned(),
54 );
55}
56
57#[derive(Debug, Serialize, Deserialize)]
58struct NodeCount {
59 meta_count: u64,
60 compute_count: u64,
61 frontend_count: u64,
62 compactor_count: u64,
63}
64
65#[derive(Debug, Serialize, Deserialize)]
66struct RwVersion {
67 version: String,
68 git_sha: String,
69}
70
71#[derive(Debug, Serialize, Deserialize)]
72pub enum PlanOptimization {
73 Placeholder,
75}
76
77#[derive(Debug, Serialize, Deserialize)]
78pub struct MetaTelemetryJobDesc {
79 pub table_id: TableId,
80 pub connector: Option<String>,
81 pub optimization: Vec<PlanOptimization>,
82}
83
84#[derive(Debug, Serialize, Deserialize)]
85pub struct MetaTelemetryReport {
86 #[serde(flatten)]
87 base: TelemetryReportBase,
88 node_count: NodeCount,
89 streaming_job_count: u64,
90 meta_backend: MetaBackend,
91 rw_version: RwVersion,
92 job_desc: Vec<MetaTelemetryJobDesc>,
93
94 cluster_type: PbTelemetryClusterType,
96 object_store_media_type: &'static str,
97 connector_usage_json_str: String,
98 license_info_json_str: String,
99}
100
101impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
102 fn from(val: MetaTelemetryJobDesc) -> Self {
103 risingwave_pb::telemetry::StreamJobDesc {
104 table_id: val.table_id.as_i32_id(),
105 connector_name: val.connector,
106 plan_optimizations: val
107 .optimization
108 .iter()
109 .map(|opt| match opt {
110 PlanOptimization::Placeholder => {
111 risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
112 as i32
113 }
114 })
115 .collect(),
116 }
117 }
118}
119
120impl TelemetryToProtobuf for MetaTelemetryReport {
121 fn to_pb_bytes(self) -> Vec<u8> {
122 let pb_report = risingwave_pb::telemetry::MetaReport {
123 base: Some(self.base.into()),
124 meta_backend: match self.meta_backend {
125 MetaBackend::Mem => risingwave_pb::telemetry::MetaBackend::Memory as i32,
126 MetaBackend::Sql
127 | MetaBackend::Sqlite
128 | MetaBackend::Postgres
129 | MetaBackend::Mysql => risingwave_pb::telemetry::MetaBackend::Rdb as i32,
130 },
131 node_count: Some(risingwave_pb::telemetry::NodeCount {
132 meta: self.node_count.meta_count as u32,
133 compute: self.node_count.compute_count as u32,
134 frontend: self.node_count.frontend_count as u32,
135 compactor: self.node_count.compactor_count as u32,
136 }),
137 rw_version: Some(risingwave_pb::telemetry::RwVersion {
138 rw_version: self.rw_version.version,
139 git_sha: self.rw_version.git_sha,
140 }),
141 stream_job_count: self.streaming_job_count as u32,
142 stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
143 cluster_type: self.cluster_type as i32,
144 object_store_media_type: self.object_store_media_type.to_owned(),
145 connector_usage_json_str: self.connector_usage_json_str,
146 license_info_json_str: self.license_info_json_str,
147 };
148 pb_report.encode_to_vec()
149 }
150}
151
152pub struct MetaTelemetryInfoFetcher {
153 tracking_id: ClusterId,
154}
155
156impl MetaTelemetryInfoFetcher {
157 pub fn new(tracking_id: ClusterId) -> Self {
158 Self { tracking_id }
159 }
160}
161
162#[async_trait::async_trait]
163impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
164 async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
165 if telemetry_cluster_type_from_env_var().is_err() {
167 return Ok(None);
168 }
169 Ok(Some(self.tracking_id.clone().into()))
170 }
171}
172
173#[derive(Clone)]
174pub struct MetaReportCreator {
175 metadata_manager: MetadataManager,
176 object_store_media_type: &'static str,
177}
178
179impl MetaReportCreator {
180 pub fn new(metadata_manager: MetadataManager, object_store_media_type: &'static str) -> Self {
181 Self {
182 metadata_manager,
183 object_store_media_type,
184 }
185 }
186}
187
188#[async_trait::async_trait]
189impl TelemetryReportCreator for MetaReportCreator {
190 async fn create_report(
191 &self,
192 tracking_id: String,
193 session_id: String,
194 up_time: u64,
195 ) -> TelemetryResult<MetaTelemetryReport> {
196 let node_map = self
197 .metadata_manager
198 .count_worker_node()
199 .await
200 .map_err(|err| err.as_report().to_string())?;
201
202 let streaming_job_count = self
203 .metadata_manager
204 .count_streaming_job()
205 .await
206 .map_err(|err| err.as_report().to_string())? as u64;
207 let stream_job_desc = self
208 .metadata_manager
209 .list_stream_job_desc()
210 .await
211 .map_err(|err| err.as_report().to_string())?;
212 let connector_usage = self
213 .metadata_manager
214 .catalog_controller
215 .get_connector_usage()
216 .await
217 .map_err(|err| err.as_report().to_string())?
218 .to_string();
219
220 let license_info_json_str = match LicenseManager::get().license() {
221 Ok(license) => serde_json::to_string(&license).unwrap(),
222 Err(err) => serde_json::json!({
223 "error": err.to_report_string(),
224 })
225 .to_string(),
226 };
227
228 Ok(MetaTelemetryReport {
229 rw_version: RwVersion {
230 version: RW_VERSION.to_owned(),
231 git_sha: GIT_SHA.to_owned(),
232 },
233 base: TelemetryReportBase {
234 tracking_id,
235 session_id,
236 system_data: SystemData::new(),
237 up_time,
238 time_stamp: current_timestamp(),
239 node_type: TelemetryNodeType::Meta,
240 is_test: false,
241 },
242 node_count: NodeCount {
243 meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
244 compute_count: *node_map.get(&WorkerType::ComputeNode).unwrap_or(&0),
245 frontend_count: *node_map.get(&WorkerType::Frontend).unwrap_or(&0),
246 compactor_count: *node_map.get(&WorkerType::Compactor).unwrap_or(&0),
247 },
248 streaming_job_count,
249 meta_backend: MetaBackend::Sql,
250 job_desc: stream_job_desc,
251 cluster_type: telemetry_cluster_type_from_env_var()?,
253 object_store_media_type: self.object_store_media_type,
254 connector_usage_json_str: connector_usage,
255 license_info_json_str,
256 })
257 }
258
259 fn report_type(&self) -> &str {
260 TELEMETRY_META_REPORT_TYPE
261 }
262}