risingwave_compute/
telemetry.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prost::Message;
16use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
17use risingwave_common::telemetry::report::TelemetryReportCreator;
18use risingwave_common::telemetry::{
19    SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, current_timestamp,
20};
21use serde::{Deserialize, Serialize};
22
23const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";
24
25#[derive(Clone, Copy)]
26pub(crate) struct ComputeTelemetryCreator {}
27
28impl ComputeTelemetryCreator {
29    pub(crate) fn new() -> Self {
30        Self {}
31    }
32}
33
34#[async_trait::async_trait]
35impl TelemetryReportCreator for ComputeTelemetryCreator {
36    #[allow(refining_impl_trait)]
37    async fn create_report(
38        &self,
39        tracking_id: String,
40        session_id: String,
41        up_time: u64,
42    ) -> TelemetryResult<ComputeTelemetryReport> {
43        Ok(ComputeTelemetryReport::new(
44            tracking_id,
45            session_id,
46            up_time,
47        ))
48    }
49
50    fn report_type(&self) -> &str {
51        TELEMETRY_COMPUTE_REPORT_TYPE
52    }
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56pub(crate) struct ComputeTelemetryReport {
57    #[serde(flatten)]
58    base: TelemetryReportBase,
59}
60
61impl TelemetryToProtobuf for ComputeTelemetryReport {
62    fn to_pb_bytes(self) -> Vec<u8> {
63        let pb_report = risingwave_pb::telemetry::ComputeReport {
64            base: Some(self.base.into()),
65        };
66        pb_report.encode_to_vec()
67    }
68}
69
70impl ComputeTelemetryReport {
71    pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
72        Self {
73            base: TelemetryReportBase {
74                tracking_id,
75                session_id,
76                up_time,
77                system_data: SystemData::new(),
78                time_stamp: current_timestamp(),
79                node_type: TelemetryNodeType::Compute,
80                is_test: false,
81            },
82        }
83    }
84}