risingwave_compactor/
rpc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::compactor::compactor_service_server::CompactorService;
16use risingwave_pb::compactor::{
17    DispatchCompactionTaskRequest, DispatchCompactionTaskResponse, EchoRequest, EchoResponse,
18};
19use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
20use risingwave_pb::monitor_service::{
21    AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
22    GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
23    HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
24    ProfilingResponse, StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest,
25    TieredCacheTracingResponse,
26};
27use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef;
28use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
29use tokio::sync::mpsc;
30use tonic::{Request, Response, Status};
31
32#[derive(Default)]
33pub struct CompactorServiceImpl {
34    sender: Option<mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>>,
35}
36impl CompactorServiceImpl {
37    pub fn new(sender: mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>) -> Self {
38        Self {
39            sender: Some(sender),
40        }
41    }
42}
43#[async_trait::async_trait]
44impl CompactorService for CompactorServiceImpl {
45    async fn echo(&self, _request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
46        Ok(Response::new(EchoResponse {}))
47    }
48
49    async fn dispatch_compaction_task(
50        &self,
51        request: Request<DispatchCompactionTaskRequest>,
52    ) -> Result<Response<DispatchCompactionTaskResponse>, Status> {
53        match &self.sender.as_ref() {
54            Some(sender) => {
55                sender
56                    .send(request)
57                    .expect("DispatchCompactionTaskRequest should be able to send");
58            }
59            None => {
60                tracing::error!(
61                    "fail to send DispatchCompactionTaskRequest, sender has not been initialized."
62                );
63            }
64        }
65        Ok(Response::new(DispatchCompactionTaskResponse {
66            status: None,
67        }))
68    }
69}
70
71pub struct MonitorServiceImpl {
72    await_tree_reg: Option<CompactionAwaitTreeRegRef>,
73}
74
75impl MonitorServiceImpl {
76    pub fn new(await_tree_reg: Option<CompactionAwaitTreeRegRef>) -> Self {
77        Self { await_tree_reg }
78    }
79}
80
81#[async_trait::async_trait]
82impl MonitorService for MonitorServiceImpl {
83    async fn stack_trace(
84        &self,
85        _request: Request<StackTraceRequest>,
86    ) -> Result<Response<StackTraceResponse>, Status> {
87        let compaction_task_traces = match &self.await_tree_reg {
88            None => Default::default(),
89            Some(await_tree_reg) => await_tree_reg
90                .collect::<Compaction>()
91                .into_iter()
92                .map(|(k, v)| (format!("{k:?}"), v.to_string()))
93                .collect(),
94        };
95        Ok(Response::new(StackTraceResponse {
96            compaction_task_traces,
97            ..Default::default()
98        }))
99    }
100
101    async fn profiling(
102        &self,
103        _request: Request<ProfilingRequest>,
104    ) -> Result<Response<ProfilingResponse>, Status> {
105        Err(Status::unimplemented(
106            "CPU profiling unimplemented in compactor",
107        ))
108    }
109
110    async fn heap_profiling(
111        &self,
112        _request: Request<HeapProfilingRequest>,
113    ) -> Result<Response<HeapProfilingResponse>, Status> {
114        Err(Status::unimplemented(
115            "Heap profiling unimplemented in compactor",
116        ))
117    }
118
119    async fn list_heap_profiling(
120        &self,
121        _request: Request<ListHeapProfilingRequest>,
122    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
123        Err(Status::unimplemented(
124            "Heap profiling unimplemented in compactor",
125        ))
126    }
127
128    async fn analyze_heap(
129        &self,
130        _request: Request<AnalyzeHeapRequest>,
131    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
132        Err(Status::unimplemented(
133            "Heap profiling unimplemented in compactor",
134        ))
135    }
136
137    async fn get_streaming_stats(
138        &self,
139        _request: Request<GetStreamingStatsRequest>,
140    ) -> Result<Response<GetStreamingStatsResponse>, Status> {
141        Err(Status::unimplemented(
142            "Get Back Pressure unimplemented in compactor",
143        ))
144    }
145
146    async fn tiered_cache_tracing(
147        &self,
148        _: Request<TieredCacheTracingRequest>,
149    ) -> Result<Response<TieredCacheTracingResponse>, Status> {
150        Err(Status::unimplemented(
151            "Tiered Cache Tracing unimplemented in compactor",
152        ))
153    }
154
155    async fn get_profile_stats(
156        &self,
157        _request: Request<GetProfileStatsRequest>,
158    ) -> Result<Response<GetProfileStatsResponse>, Status> {
159        Err(Status::unimplemented(
160            "Get Profile Stats unimplemented in compactor",
161        ))
162    }
163}