risingwave_rpc_client/
monitor_client.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::RpcClientConfig;
20use risingwave_common::monitor::EndpointExt as _;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::configured_monitor_service_client;
23use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
24use risingwave_pb::monitor_service::{
25    AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
26    GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
27    HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
28    ProfilingResponse, StackTraceRequest, StackTraceResponse,
29};
30use tonic::transport::Endpoint;
31
32use crate::channel::{Channel, WrappedChannelExt};
33use crate::error::{Result, RpcError};
34use crate::{RpcClient, RpcClientPool};
35
36/// Client for monitoring and profiling.
37///
38/// Can be applied to any type of worker node, though some methods may not be supported. See
39/// documentation of each method for availability.
40#[derive(Clone)]
41pub struct MonitorClient {
42    pub monitor_client: MonitorServiceClient<Channel>,
43}
44
45impl MonitorClient {
46    pub async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
47        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
48            .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
49            .monitored_connect("grpc-monitor-client", Default::default())
50            .await?
51            .wrapped();
52
53        Ok(Self {
54            monitor_client: configured_monitor_service_client(MonitorServiceClient::new(channel)),
55        })
56    }
57
58    /// Available on meta node, compute node, and compactor node.
59    ///
60    /// Note: for meta node, it will gather await tree from all nodes in the cluster besides itself.
61    pub async fn await_tree(&self, req: StackTraceRequest) -> Result<StackTraceResponse> {
62        Ok(self
63            .monitor_client
64            .clone()
65            .stack_trace(req)
66            .await
67            .map_err(RpcError::from_monitor_status)?
68            .into_inner())
69    }
70
71    /// Available on compute node.
72    pub async fn get_streaming_stats(&self) -> Result<GetStreamingStatsResponse> {
73        Ok(self
74            .monitor_client
75            .clone()
76            .get_streaming_stats(GetStreamingStatsRequest::default())
77            .await
78            .map_err(RpcError::from_monitor_status)?
79            .into_inner())
80    }
81
82    /// Available on compute node.
83    pub async fn get_profile_stats(
84        &self,
85        request: GetProfileStatsRequest,
86    ) -> Result<GetProfileStatsResponse> {
87        Ok(self
88            .monitor_client
89            .clone()
90            .get_profile_stats(request)
91            .await
92            .map_err(RpcError::from_monitor_status)?
93            .into_inner())
94    }
95
96    /// Available on meta node, compute node, compactor node, and frontend node.
97    pub async fn profile(&self, sleep_s: u64) -> Result<ProfilingResponse> {
98        Ok(self
99            .monitor_client
100            .clone()
101            .profiling(ProfilingRequest { sleep_s })
102            .await
103            .map_err(RpcError::from_monitor_status)?
104            .into_inner())
105    }
106
107    /// Available on meta node, compute node, compactor node, and frontend node.
108    pub async fn heap_profile(&self, dir: String) -> Result<HeapProfilingResponse> {
109        Ok(self
110            .monitor_client
111            .clone()
112            .heap_profiling(HeapProfilingRequest { dir })
113            .await
114            .map_err(RpcError::from_monitor_status)?
115            .into_inner())
116    }
117
118    /// Available on meta node, compute node, compactor node, and frontend node.
119    pub async fn list_heap_profile(&self) -> Result<ListHeapProfilingResponse> {
120        Ok(self
121            .monitor_client
122            .clone()
123            .list_heap_profiling(ListHeapProfilingRequest {})
124            .await
125            .map_err(RpcError::from_monitor_status)?
126            .into_inner())
127    }
128
129    /// Available on meta node, compute node, compactor node, and frontend node.
130    pub async fn analyze_heap(&self, path: String) -> Result<AnalyzeHeapResponse> {
131        Ok(self
132            .monitor_client
133            .clone()
134            .analyze_heap(AnalyzeHeapRequest { path })
135            .await
136            .map_err(RpcError::from_monitor_status)?
137            .into_inner())
138    }
139}
140
141pub type MonitorClientPool = RpcClientPool<MonitorClient>;
142pub type MonitorClientPoolRef = Arc<MonitorClientPool>;
143
144#[async_trait]
145impl RpcClient for MonitorClient {
146    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
147        Self::new(host_addr, opts).await
148    }
149}