risingwave_meta_service/
telemetry_service.rs1use risingwave_common::telemetry::telemetry_cluster_type_from_env_var;
16use risingwave_meta::controller::SqlMetaStore;
17use risingwave_meta_model::prelude::Cluster;
18use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService;
19use risingwave_pb::meta::{GetTelemetryInfoRequest, TelemetryInfoResponse};
20use sea_orm::EntityTrait;
21use tonic::{Request, Response, Status};
22
23use crate::MetaResult;
24use crate::model::ClusterId;
25
26pub struct TelemetryInfoServiceImpl {
27 meta_store_impl: SqlMetaStore,
28}
29
30impl TelemetryInfoServiceImpl {
31 pub fn new(meta_store_impl: SqlMetaStore) -> Self {
32 Self { meta_store_impl }
33 }
34
35 async fn get_tracking_id(&self) -> MetaResult<Option<ClusterId>> {
36 let cluster = Cluster::find().one(&self.meta_store_impl.conn).await?;
37 let cluster_id = cluster.map(|c| c.cluster_id.to_string().into());
38 Ok(cluster_id)
39 }
40}
41
42#[async_trait::async_trait]
43impl TelemetryInfoService for TelemetryInfoServiceImpl {
44 async fn get_telemetry_info(
45 &self,
46 _request: Request<GetTelemetryInfoRequest>,
47 ) -> Result<Response<TelemetryInfoResponse>, Status> {
48 if telemetry_cluster_type_from_env_var().is_err() {
49 return Ok(Response::new(TelemetryInfoResponse { tracking_id: None }));
50 }
51 match self.get_tracking_id().await? {
52 Some(tracking_id) => Ok(Response::new(TelemetryInfoResponse {
53 tracking_id: Some(tracking_id.into()),
54 })),
55 None => Ok(Response::new(TelemetryInfoResponse { tracking_id: None })),
56 }
57 }
58}