1use prost::Message;
16use risingwave_common::config::MetaBackend;
17use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
18use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator};
19use risingwave_common::telemetry::{
20 SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
21 report_event_common, telemetry_cluster_type_from_env_var,
22};
23use risingwave_common::{GIT_SHA, RW_VERSION};
24use risingwave_pb::common::WorkerType;
25use risingwave_pb::telemetry::{
26 PbTelemetryClusterType, PbTelemetryDatabaseObject, PbTelemetryEventStage,
27};
28use serde::{Deserialize, Serialize};
29use thiserror_ext::AsReport;
30
31use crate::manager::MetadataManager;
32use crate::model::ClusterId;
33
34const TELEMETRY_META_REPORT_TYPE: &str = "meta";
35
36pub(crate) fn report_event(
37 event_stage: PbTelemetryEventStage,
38 event_name: &str,
39 catalog_id: i64,
40 connector_name: Option<String>,
41 component: Option<PbTelemetryDatabaseObject>,
42 attributes: Option<jsonbb::Value>, ) {
44 report_event_common(
45 event_stage,
46 event_name,
47 catalog_id,
48 connector_name,
49 component,
50 attributes,
51 TELEMETRY_META_REPORT_TYPE.to_owned(),
52 );
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56struct NodeCount {
57 meta_count: u64,
58 compute_count: u64,
59 frontend_count: u64,
60 compactor_count: u64,
61}
62
63#[derive(Debug, Serialize, Deserialize)]
64struct RwVersion {
65 version: String,
66 git_sha: String,
67}
68
69#[derive(Debug, Serialize, Deserialize)]
70pub enum PlanOptimization {
71 Placeholder,
73}
74
75#[derive(Debug, Serialize, Deserialize)]
76pub struct MetaTelemetryJobDesc {
77 pub table_id: i32,
78 pub connector: Option<String>,
79 pub optimization: Vec<PlanOptimization>,
80}
81
82#[derive(Debug, Serialize, Deserialize)]
83pub struct MetaTelemetryReport {
84 #[serde(flatten)]
85 base: TelemetryReportBase,
86 node_count: NodeCount,
87 streaming_job_count: u64,
88 meta_backend: MetaBackend,
89 rw_version: RwVersion,
90 job_desc: Vec<MetaTelemetryJobDesc>,
91
92 cluster_type: PbTelemetryClusterType,
94 object_store_media_type: &'static str,
95 connector_usage_json_str: String,
96}
97
98impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
99 fn from(val: MetaTelemetryJobDesc) -> Self {
100 risingwave_pb::telemetry::StreamJobDesc {
101 table_id: val.table_id,
102 connector_name: val.connector,
103 plan_optimizations: val
104 .optimization
105 .iter()
106 .map(|opt| match opt {
107 PlanOptimization::Placeholder => {
108 risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
109 as i32
110 }
111 })
112 .collect(),
113 }
114 }
115}
116
117impl TelemetryToProtobuf for MetaTelemetryReport {
118 fn to_pb_bytes(self) -> Vec<u8> {
119 let pb_report = risingwave_pb::telemetry::MetaReport {
120 base: Some(self.base.into()),
121 meta_backend: match self.meta_backend {
122 MetaBackend::Mem => risingwave_pb::telemetry::MetaBackend::Memory as i32,
123 MetaBackend::Sql
124 | MetaBackend::Sqlite
125 | MetaBackend::Postgres
126 | MetaBackend::Mysql => risingwave_pb::telemetry::MetaBackend::Rdb as i32,
127 },
128 node_count: Some(risingwave_pb::telemetry::NodeCount {
129 meta: self.node_count.meta_count as u32,
130 compute: self.node_count.compute_count as u32,
131 frontend: self.node_count.frontend_count as u32,
132 compactor: self.node_count.compactor_count as u32,
133 }),
134 rw_version: Some(risingwave_pb::telemetry::RwVersion {
135 rw_version: self.rw_version.version,
136 git_sha: self.rw_version.git_sha,
137 }),
138 stream_job_count: self.streaming_job_count as u32,
139 stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
140 cluster_type: self.cluster_type as i32,
141 object_store_media_type: self.object_store_media_type.to_owned(),
142 connector_usage_json_str: self.connector_usage_json_str,
143 };
144 pb_report.encode_to_vec()
145 }
146}
147
148pub struct MetaTelemetryInfoFetcher {
149 tracking_id: ClusterId,
150}
151
152impl MetaTelemetryInfoFetcher {
153 pub fn new(tracking_id: ClusterId) -> Self {
154 Self { tracking_id }
155 }
156}
157
158#[async_trait::async_trait]
159impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
160 async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
161 if telemetry_cluster_type_from_env_var().is_err() {
163 return Ok(None);
164 }
165 Ok(Some(self.tracking_id.clone().into()))
166 }
167}
168
169#[derive(Clone)]
170pub struct MetaReportCreator {
171 metadata_manager: MetadataManager,
172 object_store_media_type: &'static str,
173}
174
175impl MetaReportCreator {
176 pub fn new(metadata_manager: MetadataManager, object_store_media_type: &'static str) -> Self {
177 Self {
178 metadata_manager,
179 object_store_media_type,
180 }
181 }
182}
183
184#[async_trait::async_trait]
185impl TelemetryReportCreator for MetaReportCreator {
186 async fn create_report(
187 &self,
188 tracking_id: String,
189 session_id: String,
190 up_time: u64,
191 ) -> TelemetryResult<MetaTelemetryReport> {
192 let node_map = self
193 .metadata_manager
194 .count_worker_node()
195 .await
196 .map_err(|err| err.as_report().to_string())?;
197
198 let streaming_job_count = self
199 .metadata_manager
200 .count_streaming_job()
201 .await
202 .map_err(|err| err.as_report().to_string())? as u64;
203 let stream_job_desc = self
204 .metadata_manager
205 .list_stream_job_desc()
206 .await
207 .map_err(|err| err.as_report().to_string())?;
208 let connector_usage = self
209 .metadata_manager
210 .catalog_controller
211 .get_connector_usage()
212 .await
213 .map_err(|err| err.as_report().to_string())?
214 .to_string();
215
216 Ok(MetaTelemetryReport {
217 rw_version: RwVersion {
218 version: RW_VERSION.to_owned(),
219 git_sha: GIT_SHA.to_owned(),
220 },
221 base: TelemetryReportBase {
222 tracking_id,
223 session_id,
224 system_data: SystemData::new(),
225 up_time,
226 time_stamp: current_timestamp(),
227 node_type: TelemetryNodeType::Meta,
228 is_test: false,
229 },
230 node_count: NodeCount {
231 meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
232 compute_count: *node_map.get(&WorkerType::ComputeNode).unwrap_or(&0),
233 frontend_count: *node_map.get(&WorkerType::Frontend).unwrap_or(&0),
234 compactor_count: *node_map.get(&WorkerType::Compactor).unwrap_or(&0),
235 },
236 streaming_job_count,
237 meta_backend: MetaBackend::Sql,
238 job_desc: stream_job_desc,
239 cluster_type: telemetry_cluster_type_from_env_var()?,
241 object_store_media_type: self.object_store_media_type,
242 connector_usage_json_str: connector_usage,
243 })
244 }
245
246 fn report_type(&self) -> &str {
247 TELEMETRY_META_REPORT_TYPE
248 }
249}