risingwave_compactor/
rpc.rs1use risingwave_common::config::ServerConfig;
16use risingwave_common_heap_profiling::ProfileServiceImpl;
17use risingwave_pb::compactor::compactor_service_server::CompactorService;
18use risingwave_pb::compactor::{
19 DispatchCompactionTaskRequest, DispatchCompactionTaskResponse, EchoRequest, EchoResponse,
20};
21use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
22use risingwave_pb::monitor_service::{
23 AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
24 GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
25 HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
26 ProfilingResponse, StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest,
27 TieredCacheTracingResponse,
28};
29use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef;
30use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
31use tokio::sync::mpsc;
32use tonic::{Request, Response, Status};
33
34#[derive(Default)]
35pub struct CompactorServiceImpl {
36 sender: Option<mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>>,
37}
38impl CompactorServiceImpl {
39 pub fn new(sender: mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>) -> Self {
40 Self {
41 sender: Some(sender),
42 }
43 }
44}
45#[async_trait::async_trait]
46impl CompactorService for CompactorServiceImpl {
47 async fn echo(&self, _request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
48 Ok(Response::new(EchoResponse {}))
49 }
50
51 async fn dispatch_compaction_task(
52 &self,
53 request: Request<DispatchCompactionTaskRequest>,
54 ) -> Result<Response<DispatchCompactionTaskResponse>, Status> {
55 match &self.sender.as_ref() {
56 Some(sender) => {
57 sender
58 .send(request)
59 .expect("DispatchCompactionTaskRequest should be able to send");
60 }
61 None => {
62 tracing::error!(
63 "fail to send DispatchCompactionTaskRequest, sender has not been initialized."
64 );
65 }
66 }
67 Ok(Response::new(DispatchCompactionTaskResponse {
68 status: None,
69 }))
70 }
71}
72
73pub struct MonitorServiceImpl {
75 await_tree_reg: Option<CompactionAwaitTreeRegRef>,
76 profile_service: ProfileServiceImpl,
77}
78
79impl MonitorServiceImpl {
80 pub fn new(
81 await_tree_reg: Option<CompactionAwaitTreeRegRef>,
82 server_config: ServerConfig,
83 ) -> Self {
84 Self {
85 await_tree_reg,
86 profile_service: ProfileServiceImpl::new(server_config),
87 }
88 }
89}
90
91#[async_trait::async_trait]
92impl MonitorService for MonitorServiceImpl {
93 async fn stack_trace(
94 &self,
95 _request: Request<StackTraceRequest>,
96 ) -> Result<Response<StackTraceResponse>, Status> {
97 let compaction_task_traces = match &self.await_tree_reg {
98 None => Default::default(),
99 Some(await_tree_reg) => await_tree_reg
100 .collect::<Compaction>()
101 .into_iter()
102 .map(|(k, v)| (format!("{k:?}"), v.to_string()))
103 .collect(),
104 };
105 Ok(Response::new(StackTraceResponse {
106 compaction_task_traces,
107 ..Default::default()
108 }))
109 }
110
111 async fn profiling(
112 &self,
113 request: Request<ProfilingRequest>,
114 ) -> Result<Response<ProfilingResponse>, Status> {
115 self.profile_service.profiling(request).await
116 }
117
118 async fn heap_profiling(
119 &self,
120 request: Request<HeapProfilingRequest>,
121 ) -> Result<Response<HeapProfilingResponse>, Status> {
122 self.profile_service.heap_profiling(request).await
123 }
124
125 async fn list_heap_profiling(
126 &self,
127 request: Request<ListHeapProfilingRequest>,
128 ) -> Result<Response<ListHeapProfilingResponse>, Status> {
129 self.profile_service.list_heap_profiling(request).await
130 }
131
132 async fn analyze_heap(
133 &self,
134 request: Request<AnalyzeHeapRequest>,
135 ) -> Result<Response<AnalyzeHeapResponse>, Status> {
136 self.profile_service.analyze_heap(request).await
137 }
138
139 async fn get_streaming_stats(
140 &self,
141 _request: Request<GetStreamingStatsRequest>,
142 ) -> Result<Response<GetStreamingStatsResponse>, Status> {
143 Err(Status::unimplemented(
144 "Get Back Pressure unimplemented in compactor",
145 ))
146 }
147
148 async fn tiered_cache_tracing(
149 &self,
150 _: Request<TieredCacheTracingRequest>,
151 ) -> Result<Response<TieredCacheTracingResponse>, Status> {
152 Err(Status::unimplemented(
153 "Tiered Cache Tracing unimplemented in compactor",
154 ))
155 }
156
157 async fn get_profile_stats(
158 &self,
159 _request: Request<GetProfileStatsRequest>,
160 ) -> Result<Response<GetProfileStatsResponse>, Status> {
161 Err(Status::unimplemented(
162 "Get Profile Stats unimplemented in compactor",
163 ))
164 }
165}