risingwave_compactor/
telemetry.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prost::Message;
16use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
17use risingwave_common::telemetry::report::TelemetryReportCreator;
18use risingwave_common::telemetry::{
19    SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
20    report_event_common,
21};
22use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
23use serde::{Deserialize, Serialize};
24
/// Report-type label used for compactor telemetry: passed to
/// `report_event_common` and returned by `TelemetryReportCreator::report_type`.
const TELEMETRY_COMPACTOR_REPORT_TYPE: &str = "compactor";
26
27#[allow(dead_code)] // please remove when used
28pub(crate) fn report_event(
29    event_stage: PbTelemetryEventStage,
30    event_name: &str,
31    catalog_id: i64,
32    connector_name: Option<String>,
33    object: Option<PbTelemetryDatabaseObject>,
34    attributes: Option<jsonbb::Value>, // json object
35) {
36    report_event_common(
37        event_stage,
38        event_name,
39        catalog_id,
40        connector_name,
41        object,
42        attributes,
43        TELEMETRY_COMPACTOR_REPORT_TYPE.to_owned(),
44    );
45}
46
/// Stateless creator of compactor telemetry reports.
///
/// The type has no fields, so copies are free and interchangeable.
#[derive(Clone, Copy)]
pub(crate) struct CompactorTelemetryCreator {}

impl CompactorTelemetryCreator {
    /// Returns a new (field-less) creator.
    pub(crate) fn new() -> Self {
        CompactorTelemetryCreator {}
    }
}
55
56#[async_trait::async_trait]
57impl TelemetryReportCreator for CompactorTelemetryCreator {
58    #[allow(refining_impl_trait)]
59    async fn create_report(
60        &self,
61        tracking_id: String,
62        session_id: String,
63        up_time: u64,
64    ) -> TelemetryResult<CompactorTelemetryReport> {
65        Ok(CompactorTelemetryReport::new(
66            tracking_id,
67            session_id,
68            up_time,
69        ))
70    }
71
72    fn report_type(&self) -> &str {
73        TELEMETRY_COMPACTOR_REPORT_TYPE
74    }
75}
76
77impl TelemetryToProtobuf for CompactorTelemetryReport {
78    fn to_pb_bytes(self) -> Vec<u8> {
79        let pb_report = risingwave_pb::telemetry::CompactorReport {
80            base: Some(self.base.into()),
81        };
82        pb_report.encode_to_vec()
83    }
84}
85
86#[derive(Serialize, Deserialize)]
87pub(crate) struct CompactorTelemetryReport {
88    #[serde(flatten)]
89    base: TelemetryReportBase,
90}
91
92impl CompactorTelemetryReport {
93    pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
94        Self {
95            base: TelemetryReportBase {
96                tracking_id,
97                session_id,
98                system_data: SystemData::new(),
99                up_time,
100                time_stamp: current_timestamp(),
101                node_type: TelemetryNodeType::Compactor,
102                is_test: false,
103            },
104        }
105    }
106}