// risingwave_compactor/rpc.rs
use risingwave_pb::compactor::compactor_service_server::CompactorService;
16use risingwave_pb::compactor::{
17 DispatchCompactionTaskRequest, DispatchCompactionTaskResponse, EchoRequest, EchoResponse,
18};
19use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
20use risingwave_pb::monitor_service::{
21 AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
22 GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
23 HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
24 ProfilingResponse, StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest,
25 TieredCacheTracingResponse,
26};
27use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef;
28use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
29use tokio::sync::mpsc;
30use tonic::{Request, Response, Status};
31
32#[derive(Default)]
33pub struct CompactorServiceImpl {
34 sender: Option<mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>>,
35}
36impl CompactorServiceImpl {
37 pub fn new(sender: mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>) -> Self {
38 Self {
39 sender: Some(sender),
40 }
41 }
42}
43#[async_trait::async_trait]
44impl CompactorService for CompactorServiceImpl {
45 async fn echo(&self, _request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
46 Ok(Response::new(EchoResponse {}))
47 }
48
49 async fn dispatch_compaction_task(
50 &self,
51 request: Request<DispatchCompactionTaskRequest>,
52 ) -> Result<Response<DispatchCompactionTaskResponse>, Status> {
53 match &self.sender.as_ref() {
54 Some(sender) => {
55 sender
56 .send(request)
57 .expect("DispatchCompactionTaskRequest should be able to send");
58 }
59 None => {
60 tracing::error!(
61 "fail to send DispatchCompactionTaskRequest, sender has not been initialized."
62 );
63 }
64 }
65 Ok(Response::new(DispatchCompactionTaskResponse {
66 status: None,
67 }))
68 }
69}
70
71pub struct MonitorServiceImpl {
72 await_tree_reg: Option<CompactionAwaitTreeRegRef>,
73}
74
75impl MonitorServiceImpl {
76 pub fn new(await_tree_reg: Option<CompactionAwaitTreeRegRef>) -> Self {
77 Self { await_tree_reg }
78 }
79}
80
81#[async_trait::async_trait]
82impl MonitorService for MonitorServiceImpl {
83 async fn stack_trace(
84 &self,
85 _request: Request<StackTraceRequest>,
86 ) -> Result<Response<StackTraceResponse>, Status> {
87 let compaction_task_traces = match &self.await_tree_reg {
88 None => Default::default(),
89 Some(await_tree_reg) => await_tree_reg
90 .collect::<Compaction>()
91 .into_iter()
92 .map(|(k, v)| (format!("{k:?}"), v.to_string()))
93 .collect(),
94 };
95 Ok(Response::new(StackTraceResponse {
96 compaction_task_traces,
97 ..Default::default()
98 }))
99 }
100
101 async fn profiling(
102 &self,
103 _request: Request<ProfilingRequest>,
104 ) -> Result<Response<ProfilingResponse>, Status> {
105 Err(Status::unimplemented(
106 "CPU profiling unimplemented in compactor",
107 ))
108 }
109
110 async fn heap_profiling(
111 &self,
112 _request: Request<HeapProfilingRequest>,
113 ) -> Result<Response<HeapProfilingResponse>, Status> {
114 Err(Status::unimplemented(
115 "Heap profiling unimplemented in compactor",
116 ))
117 }
118
119 async fn list_heap_profiling(
120 &self,
121 _request: Request<ListHeapProfilingRequest>,
122 ) -> Result<Response<ListHeapProfilingResponse>, Status> {
123 Err(Status::unimplemented(
124 "Heap profiling unimplemented in compactor",
125 ))
126 }
127
128 async fn analyze_heap(
129 &self,
130 _request: Request<AnalyzeHeapRequest>,
131 ) -> Result<Response<AnalyzeHeapResponse>, Status> {
132 Err(Status::unimplemented(
133 "Heap profiling unimplemented in compactor",
134 ))
135 }
136
137 async fn get_streaming_stats(
138 &self,
139 _request: Request<GetStreamingStatsRequest>,
140 ) -> Result<Response<GetStreamingStatsResponse>, Status> {
141 Err(Status::unimplemented(
142 "Get Back Pressure unimplemented in compactor",
143 ))
144 }
145
146 async fn tiered_cache_tracing(
147 &self,
148 _: Request<TieredCacheTracingRequest>,
149 ) -> Result<Response<TieredCacheTracingResponse>, Status> {
150 Err(Status::unimplemented(
151 "Tiered Cache Tracing unimplemented in compactor",
152 ))
153 }
154
155 async fn get_profile_stats(
156 &self,
157 _request: Request<GetProfileStatsRequest>,
158 ) -> Result<Response<GetProfileStatsResponse>, Status> {
159 Err(Status::unimplemented(
160 "Get Profile Stats unimplemented in compactor",
161 ))
162 }
163}