risingwave_rpc_client/monitor_client.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use risingwave_common::config::RpcClientConfig;
use risingwave_common::monitor::EndpointExt as _;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{
    AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
    GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
    HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
    ProfilingResponse, StackTraceRequest, StackTraceResponse,
};
use tonic::transport::Endpoint;

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::{RpcClient, RpcClientPool};

/// Client for monitoring and profiling.
///
/// Can be applied to any type of worker node, though some methods may not be
/// supported. See the documentation of each method for availability.
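///
/// # Example
///
/// A minimal sketch (not compiled as a doctest), assuming a reachable worker
/// node at an illustrative address and an [`RpcClientConfig`] in scope as
/// `config`:
///
/// ```ignore
/// let addr: HostAddr = "127.0.0.1:5688".parse()?;
/// let client = MonitorClient::new(addr, &config).await?;
/// // Dump the node's await trees (async stack traces).
/// let trace = client.await_tree(StackTraceRequest::default()).await?;
/// ```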
#[derive(Clone)]
pub struct MonitorClient {
    pub monitor_client: MonitorServiceClient<Channel>,
}

impl MonitorClient {
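    /// Connects to the monitor service at `host_addr`, applying the connect
    /// timeout from `opts`. The connection is instrumented for metrics and
    /// wrapped into the crate's [`Channel`] type.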
    pub async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
            .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
            .monitored_connect("grpc-monitor-client", Default::default())
            .await?
            .wrapped();

        Ok(Self {
            monitor_client: MonitorServiceClient::new(channel),
        })
    }

    /// Available on meta node, compute node, and compactor node.
    ///
    /// Note: for the meta node, this will gather await trees from all nodes in
    /// the cluster besides itself.
    pub async fn await_tree(&self, req: StackTraceRequest) -> Result<StackTraceResponse> {
        // Cloning the generated client is cheap; clones share the underlying channel.
        Ok(self
            .monitor_client
            .clone()
            .stack_trace(req)
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }

    /// Available on compute node.
    pub async fn get_streaming_stats(&self) -> Result<GetStreamingStatsResponse> {
        Ok(self
            .monitor_client
            .clone()
            .get_streaming_stats(GetStreamingStatsRequest::default())
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }

    /// Available on compute node.
    pub async fn get_profile_stats(
        &self,
        request: GetProfileStatsRequest,
    ) -> Result<GetProfileStatsResponse> {
        Ok(self
            .monitor_client
            .clone()
            .get_profile_stats(request)
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }

    /// Available on meta node, compute node, compactor node, and frontend node.
    pub async fn profile(&self, sleep_s: u64) -> Result<ProfilingResponse> {
        Ok(self
            .monitor_client
            .clone()
            .profiling(ProfilingRequest { sleep_s })
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }

    /// Available on meta node, compute node, compactor node, and frontend node.
    pub async fn heap_profile(&self, dir: String) -> Result<HeapProfilingResponse> {
        Ok(self
            .monitor_client
            .clone()
            .heap_profiling(HeapProfilingRequest { dir })
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }

    /// Available on meta node, compute node, compactor node, and frontend node.
    pub async fn list_heap_profile(&self) -> Result<ListHeapProfilingResponse> {
        Ok(self
            .monitor_client
            .clone()
            .list_heap_profiling(ListHeapProfilingRequest {})
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }

    /// Available on meta node, compute node, compactor node, and frontend node.
    pub async fn analyze_heap(&self, path: String) -> Result<AnalyzeHeapResponse> {
        Ok(self
            .monitor_client
            .clone()
            .analyze_heap(AnalyzeHeapRequest { path })
            .await
            .map_err(RpcError::from_monitor_status)?
            .into_inner())
    }
}

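/// Pool of [`MonitorClient`]s, keyed by the address of the target node.
///
/// A minimal usage sketch (not compiled as a doctest), assuming a shared
/// `pool: MonitorClientPoolRef` and a worker's `HostAddr` in scope as
/// `worker_addr`:
///
/// ```ignore
/// // `get_by_addr` is assumed here from `RpcClientPool`'s shared API.
/// let client = pool.get_by_addr(worker_addr).await?;
/// let stats = client.get_streaming_stats().await?;
/// ```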
pub type MonitorClientPool = RpcClientPool<MonitorClient>;
pub type MonitorClientPoolRef = Arc<MonitorClientPool>;

// Implementing `RpcClient` lets `MonitorClient` be managed by the generic
// `RpcClientPool`.
#[async_trait]
impl RpcClient for MonitorClient {
    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
        Self::new(host_addr, opts).await
    }
}