risingwave_rpc_client/
monitor_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::RpcClientConfig;
20use risingwave_common::monitor::EndpointExt as _;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::configured_monitor_service_client;
23use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
24use risingwave_pb::monitor_service::{
25 AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
26 GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
27 HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
28 ProfilingResponse, StackTraceRequest, StackTraceResponse,
29};
30use tonic::transport::Endpoint;
31
32use crate::channel::{Channel, WrappedChannelExt};
33use crate::error::{Result, RpcError};
34use crate::{RpcClient, RpcClientPool};
35
36#[derive(Clone)]
41pub struct MonitorClient {
42 pub monitor_client: MonitorServiceClient<Channel>,
43}
44
45impl MonitorClient {
46 pub async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
47 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
48 .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
49 .monitored_connect("grpc-monitor-client", Default::default())
50 .await?
51 .wrapped();
52
53 Ok(Self {
54 monitor_client: configured_monitor_service_client(MonitorServiceClient::new(channel)),
55 })
56 }
57
58 pub async fn await_tree(&self, req: StackTraceRequest) -> Result<StackTraceResponse> {
62 Ok(self
63 .monitor_client
64 .clone()
65 .stack_trace(req)
66 .await
67 .map_err(RpcError::from_monitor_status)?
68 .into_inner())
69 }
70
71 pub async fn get_streaming_stats(&self) -> Result<GetStreamingStatsResponse> {
73 Ok(self
74 .monitor_client
75 .clone()
76 .get_streaming_stats(GetStreamingStatsRequest::default())
77 .await
78 .map_err(RpcError::from_monitor_status)?
79 .into_inner())
80 }
81
82 pub async fn get_profile_stats(
84 &self,
85 request: GetProfileStatsRequest,
86 ) -> Result<GetProfileStatsResponse> {
87 Ok(self
88 .monitor_client
89 .clone()
90 .get_profile_stats(request)
91 .await
92 .map_err(RpcError::from_monitor_status)?
93 .into_inner())
94 }
95
96 pub async fn profile(&self, sleep_s: u64) -> Result<ProfilingResponse> {
98 Ok(self
99 .monitor_client
100 .clone()
101 .profiling(ProfilingRequest { sleep_s })
102 .await
103 .map_err(RpcError::from_monitor_status)?
104 .into_inner())
105 }
106
107 pub async fn heap_profile(&self, dir: String) -> Result<HeapProfilingResponse> {
109 Ok(self
110 .monitor_client
111 .clone()
112 .heap_profiling(HeapProfilingRequest { dir })
113 .await
114 .map_err(RpcError::from_monitor_status)?
115 .into_inner())
116 }
117
118 pub async fn list_heap_profile(&self) -> Result<ListHeapProfilingResponse> {
120 Ok(self
121 .monitor_client
122 .clone()
123 .list_heap_profiling(ListHeapProfilingRequest {})
124 .await
125 .map_err(RpcError::from_monitor_status)?
126 .into_inner())
127 }
128
129 pub async fn analyze_heap(&self, path: String) -> Result<AnalyzeHeapResponse> {
131 Ok(self
132 .monitor_client
133 .clone()
134 .analyze_heap(AnalyzeHeapRequest { path })
135 .await
136 .map_err(RpcError::from_monitor_status)?
137 .into_inner())
138 }
139}
140
141pub type MonitorClientPool = RpcClientPool<MonitorClient>;
142pub type MonitorClientPoolRef = Arc<MonitorClientPool>;
143
144#[async_trait]
145impl RpcClient for MonitorClient {
146 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
147 Self::new(host_addr, opts).await
148 }
149}