risingwave_compute/
telemetry.rs1use prost::Message;
16use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
17use risingwave_common::telemetry::report::TelemetryReportCreator;
18use risingwave_common::telemetry::{
19 SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
20};
21use serde::{Deserialize, Serialize};
22
/// Report-type label for compute-node telemetry; this is the string handed
/// back by `ComputeTelemetryCreator::report_type`.
const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";
24
/// Field-less factory value used to build compute-node telemetry reports.
///
/// It holds no state, so it is `Copy` and free to pass around by value.
#[derive(Copy, Clone)]
pub(crate) struct ComputeTelemetryCreator {}

impl ComputeTelemetryCreator {
    /// Returns a new creator. There is nothing to configure.
    pub(crate) fn new() -> Self {
        ComputeTelemetryCreator {}
    }
}
33
34#[async_trait::async_trait]
35impl TelemetryReportCreator for ComputeTelemetryCreator {
36 #[allow(refining_impl_trait)]
37 async fn create_report(
38 &self,
39 tracking_id: String,
40 session_id: String,
41 up_time: u64,
42 ) -> TelemetryResult<ComputeTelemetryReport> {
43 Ok(ComputeTelemetryReport::new(
44 tracking_id,
45 session_id,
46 up_time,
47 ))
48 }
49
50 fn report_type(&self) -> &str {
51 TELEMETRY_COMPUTE_REPORT_TYPE
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56pub(crate) struct ComputeTelemetryReport {
57 #[serde(flatten)]
58 base: TelemetryReportBase,
59}
60
61impl TelemetryToProtobuf for ComputeTelemetryReport {
62 fn to_pb_bytes(self) -> Vec<u8> {
63 let pb_report = risingwave_pb::telemetry::ComputeReport {
64 base: Some(self.base.into()),
65 };
66 pb_report.encode_to_vec()
67 }
68}
69
70impl ComputeTelemetryReport {
71 pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
72 Self {
73 base: TelemetryReportBase {
74 tracking_id,
75 session_id,
76 up_time,
77 system_data: SystemData::new(),
78 time_stamp: current_timestamp(),
79 node_type: TelemetryNodeType::Compute,
80 is_test: false,
81 },
82 }
83 }
84}