risingwave_rpc_client/
monitor_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::RpcClientConfig;
20use risingwave_common::monitor::EndpointExt as _;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
23use risingwave_pb::monitor_service::{
24 AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
25 GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
26 HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
27 ProfilingResponse, StackTraceRequest, StackTraceResponse,
28};
29use tonic::transport::Endpoint;
30
31use crate::channel::{Channel, WrappedChannelExt};
32use crate::error::{Result, RpcError};
33use crate::{RpcClient, RpcClientPool};
34
35#[derive(Clone)]
40pub struct MonitorClient {
41 pub monitor_client: MonitorServiceClient<Channel>,
42}
43
44impl MonitorClient {
45 pub async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
46 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
47 .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
48 .monitored_connect("grpc-monitor-client", Default::default())
49 .await?
50 .wrapped();
51
52 Ok(Self {
53 monitor_client: MonitorServiceClient::new(channel),
54 })
55 }
56
57 pub async fn await_tree(&self, req: StackTraceRequest) -> Result<StackTraceResponse> {
61 Ok(self
62 .monitor_client
63 .clone()
64 .stack_trace(req)
65 .await
66 .map_err(RpcError::from_monitor_status)?
67 .into_inner())
68 }
69
70 pub async fn get_streaming_stats(&self) -> Result<GetStreamingStatsResponse> {
72 Ok(self
73 .monitor_client
74 .clone()
75 .get_streaming_stats(GetStreamingStatsRequest::default())
76 .await
77 .map_err(RpcError::from_monitor_status)?
78 .into_inner())
79 }
80
81 pub async fn get_profile_stats(
83 &self,
84 request: GetProfileStatsRequest,
85 ) -> Result<GetProfileStatsResponse> {
86 Ok(self
87 .monitor_client
88 .clone()
89 .get_profile_stats(request)
90 .await
91 .map_err(RpcError::from_monitor_status)?
92 .into_inner())
93 }
94
95 pub async fn profile(&self, sleep_s: u64) -> Result<ProfilingResponse> {
97 Ok(self
98 .monitor_client
99 .clone()
100 .profiling(ProfilingRequest { sleep_s })
101 .await
102 .map_err(RpcError::from_monitor_status)?
103 .into_inner())
104 }
105
106 pub async fn heap_profile(&self, dir: String) -> Result<HeapProfilingResponse> {
108 Ok(self
109 .monitor_client
110 .clone()
111 .heap_profiling(HeapProfilingRequest { dir })
112 .await
113 .map_err(RpcError::from_monitor_status)?
114 .into_inner())
115 }
116
117 pub async fn list_heap_profile(&self) -> Result<ListHeapProfilingResponse> {
119 Ok(self
120 .monitor_client
121 .clone()
122 .list_heap_profiling(ListHeapProfilingRequest {})
123 .await
124 .map_err(RpcError::from_monitor_status)?
125 .into_inner())
126 }
127
128 pub async fn analyze_heap(&self, path: String) -> Result<AnalyzeHeapResponse> {
130 Ok(self
131 .monitor_client
132 .clone()
133 .analyze_heap(AnalyzeHeapRequest { path })
134 .await
135 .map_err(RpcError::from_monitor_status)?
136 .into_inner())
137 }
138}
139
140pub type MonitorClientPool = RpcClientPool<MonitorClient>;
141pub type MonitorClientPoolRef = Arc<MonitorClientPool>;
142
143#[async_trait]
144impl RpcClient for MonitorClient {
145 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
146 Self::new(host_addr, opts).await
147 }
148}