risingwave_meta_service/
monitor_service.rs1use risingwave_common::config::ServerConfig;
16use risingwave_common_service::ProfileServiceImpl;
17use risingwave_meta::manager::MetadataManager;
18use risingwave_meta::rpc::await_tree::dump_cluster_await_tree;
19use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
20use risingwave_pb::monitor_service::{self, StackTraceRequest, StackTraceResponse};
21use tonic::{Request, Response, Status};
22
23pub struct MonitorServiceImpl {
28 pub metadata_manager: MetadataManager,
29 pub await_tree_reg: await_tree::Registry,
30 pub profile_service: ProfileServiceImpl,
31}
32
33impl MonitorServiceImpl {
34 pub fn new(
35 metadata_manager: MetadataManager,
36 await_tree_reg: await_tree::Registry,
37 server_config: ServerConfig,
38 ) -> Self {
39 Self {
40 metadata_manager,
41 await_tree_reg,
42 profile_service: ProfileServiceImpl::new(server_config),
43 }
44 }
45}
46
47#[tonic::async_trait]
48impl MonitorService for MonitorServiceImpl {
49 async fn stack_trace(
50 &self,
51 request: Request<StackTraceRequest>,
52 ) -> Result<Response<StackTraceResponse>, Status> {
53 let request = request.into_inner();
54 let actor_traces_format = request.actor_traces_format();
55
56 let result = dump_cluster_await_tree(
57 &self.metadata_manager,
58 &self.await_tree_reg,
59 actor_traces_format,
60 )
61 .await?;
62
63 Ok(Response::new(result))
64 }
65
66 async fn profiling(
67 &self,
68 request: Request<monitor_service::ProfilingRequest>,
69 ) -> Result<Response<monitor_service::ProfilingResponse>, Status> {
70 self.profile_service.profiling(request).await
71 }
72
73 async fn heap_profiling(
74 &self,
75 request: Request<monitor_service::HeapProfilingRequest>,
76 ) -> Result<Response<monitor_service::HeapProfilingResponse>, Status> {
77 self.profile_service.heap_profiling(request).await
78 }
79
80 async fn list_heap_profiling(
81 &self,
82 request: Request<monitor_service::ListHeapProfilingRequest>,
83 ) -> Result<Response<monitor_service::ListHeapProfilingResponse>, Status> {
84 self.profile_service.list_heap_profiling(request).await
85 }
86
87 async fn analyze_heap(
88 &self,
89 request: Request<monitor_service::AnalyzeHeapRequest>,
90 ) -> Result<Response<monitor_service::AnalyzeHeapResponse>, Status> {
91 self.profile_service.analyze_heap(request).await
92 }
93
94 async fn get_streaming_stats(
95 &self,
96 _request: Request<monitor_service::GetStreamingStatsRequest>,
97 ) -> Result<Response<monitor_service::GetStreamingStatsResponse>, Status> {
98 Err(Status::unimplemented("not implemented in meta node"))
99 }
100
101 async fn tiered_cache_tracing(
102 &self,
103 _request: Request<monitor_service::TieredCacheTracingRequest>,
104 ) -> Result<Response<monitor_service::TieredCacheTracingResponse>, Status> {
105 Err(Status::unimplemented("not implemented in meta node"))
106 }
107
108 async fn get_profile_stats(
109 &self,
110 _request: Request<monitor_service::GetProfileStatsRequest>,
111 ) -> Result<Response<monitor_service::GetProfileStatsResponse>, Status> {
112 Err(Status::unimplemented("not implemented in meta node"))
113 }
114}