risingwave_compute/
telemetry.rs1use prost::Message;
16use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
17use risingwave_common::telemetry::report::TelemetryReportCreator;
18use risingwave_common::telemetry::{
19 SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
20};
21use serde::{Deserialize, Serialize};
22
/// Report-type label for compute-node telemetry.
///
/// Returned verbatim by `ComputeTelemetryCreator::report_type`.
const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";
24
/// Factory that builds telemetry reports for a compute node.
///
/// The struct has no fields, so it carries no state between reports.
#[derive(Copy, Clone)]
pub(crate) struct ComputeTelemetryCreator {}
27
28impl ComputeTelemetryCreator {
29 pub(crate) fn new() -> Self {
30 Self {}
31 }
32}
33
34#[async_trait::async_trait]
35impl TelemetryReportCreator for ComputeTelemetryCreator {
36 async fn create_report(
37 &self,
38 tracking_id: String,
39 session_id: String,
40 up_time: u64,
41 ) -> TelemetryResult<ComputeTelemetryReport> {
42 Ok(ComputeTelemetryReport::new(
43 tracking_id,
44 session_id,
45 up_time,
46 ))
47 }
48
49 fn report_type(&self) -> &str {
50 TELEMETRY_COMPUTE_REPORT_TYPE
51 }
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub(crate) struct ComputeTelemetryReport {
56 #[serde(flatten)]
57 base: TelemetryReportBase,
58}
59
60impl TelemetryToProtobuf for ComputeTelemetryReport {
61 fn to_pb_bytes(self) -> Vec<u8> {
62 let pb_report = risingwave_pb::telemetry::ComputeReport {
63 base: Some(self.base.into()),
64 };
65 pb_report.encode_to_vec()
66 }
67}
68
69impl ComputeTelemetryReport {
70 pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
71 Self {
72 base: TelemetryReportBase {
73 tracking_id,
74 session_id,
75 up_time,
76 system_data: SystemData::new(),
77 time_stamp: current_timestamp(),
78 node_type: TelemetryNodeType::Compute,
79 is_test: false,
80 },
81 }
82 }
83}