risingwave_compactor/
telemetry.rs1use prost::Message;
16use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
17use risingwave_common::telemetry::report::TelemetryReportCreator;
18use risingwave_common::telemetry::{
19 SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
20 report_event_common,
21};
22use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
23use serde::{Deserialize, Serialize};
24
25const TELEMETRY_COMPACTOR_REPORT_TYPE: &str = "compactor";
26
27#[allow(dead_code)] pub(crate) fn report_event(
29 event_stage: PbTelemetryEventStage,
30 event_name: &str,
31 catalog_id: i64,
32 connector_name: Option<String>,
33 object: Option<PbTelemetryDatabaseObject>,
34 attributes: Option<jsonbb::Value>, ) {
36 report_event_common(
37 event_stage,
38 event_name,
39 catalog_id,
40 connector_name,
41 object,
42 attributes,
43 TELEMETRY_COMPACTOR_REPORT_TYPE.to_owned(),
44 );
45}
46
47#[derive(Clone, Copy)]
48pub(crate) struct CompactorTelemetryCreator {}
49
50impl CompactorTelemetryCreator {
51 pub(crate) fn new() -> Self {
52 Self {}
53 }
54}
55
56#[async_trait::async_trait]
57impl TelemetryReportCreator for CompactorTelemetryCreator {
58 #[allow(refining_impl_trait)]
59 async fn create_report(
60 &self,
61 tracking_id: String,
62 session_id: String,
63 up_time: u64,
64 ) -> TelemetryResult<CompactorTelemetryReport> {
65 Ok(CompactorTelemetryReport::new(
66 tracking_id,
67 session_id,
68 up_time,
69 ))
70 }
71
72 fn report_type(&self) -> &str {
73 TELEMETRY_COMPACTOR_REPORT_TYPE
74 }
75}
76
77impl TelemetryToProtobuf for CompactorTelemetryReport {
78 fn to_pb_bytes(self) -> Vec<u8> {
79 let pb_report = risingwave_pb::telemetry::CompactorReport {
80 base: Some(self.base.into()),
81 };
82 pb_report.encode_to_vec()
83 }
84}
85
86#[derive(Serialize, Deserialize)]
87pub(crate) struct CompactorTelemetryReport {
88 #[serde(flatten)]
89 base: TelemetryReportBase,
90}
91
92impl CompactorTelemetryReport {
93 pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
94 Self {
95 base: TelemetryReportBase {
96 tracking_id,
97 session_id,
98 system_data: SystemData::new(),
99 up_time,
100 time_stamp: current_timestamp(),
101 node_type: TelemetryNodeType::Compactor,
102 is_test: false,
103 },
104 }
105 }
106}