risingwave_compactor/
rpc.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::config::ServerConfig;
16use risingwave_common_heap_profiling::ProfileServiceImpl;
17use risingwave_pb::compactor::compactor_service_server::CompactorService;
18use risingwave_pb::compactor::{
19    DispatchCompactionTaskRequest, DispatchCompactionTaskResponse, EchoRequest, EchoResponse,
20};
21use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
22use risingwave_pb::monitor_service::{
23    AnalyzeHeapRequest, AnalyzeHeapResponse, GetProfileStatsRequest, GetProfileStatsResponse,
24    GetStreamingStatsRequest, GetStreamingStatsResponse, HeapProfilingRequest,
25    HeapProfilingResponse, ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest,
26    ProfilingResponse, StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest,
27    TieredCacheTracingResponse,
28};
29use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef;
30use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
31use tokio::sync::mpsc;
32use tonic::{Request, Response, Status};
33
34#[derive(Default)]
35pub struct CompactorServiceImpl {
36    sender: Option<mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>>,
37}
38impl CompactorServiceImpl {
39    pub fn new(sender: mpsc::UnboundedSender<Request<DispatchCompactionTaskRequest>>) -> Self {
40        Self {
41            sender: Some(sender),
42        }
43    }
44}
45#[async_trait::async_trait]
46impl CompactorService for CompactorServiceImpl {
47    async fn echo(&self, _request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
48        Ok(Response::new(EchoResponse {}))
49    }
50
51    async fn dispatch_compaction_task(
52        &self,
53        request: Request<DispatchCompactionTaskRequest>,
54    ) -> Result<Response<DispatchCompactionTaskResponse>, Status> {
55        match &self.sender.as_ref() {
56            Some(sender) => {
57                sender
58                    .send(request)
59                    .expect("DispatchCompactionTaskRequest should be able to send");
60            }
61            None => {
62                tracing::error!(
63                    "fail to send DispatchCompactionTaskRequest, sender has not been initialized."
64                );
65            }
66        }
67        Ok(Response::new(DispatchCompactionTaskResponse {
68            status: None,
69        }))
70    }
71}
72
73/// Compactor implementation of monitor RPCs, including profiling and compaction await-tree.
74pub struct MonitorServiceImpl {
75    await_tree_reg: Option<CompactionAwaitTreeRegRef>,
76    profile_service: ProfileServiceImpl,
77}
78
79impl MonitorServiceImpl {
80    pub fn new(
81        await_tree_reg: Option<CompactionAwaitTreeRegRef>,
82        server_config: ServerConfig,
83    ) -> Self {
84        Self {
85            await_tree_reg,
86            profile_service: ProfileServiceImpl::new(server_config),
87        }
88    }
89}
90
91#[async_trait::async_trait]
92impl MonitorService for MonitorServiceImpl {
93    async fn stack_trace(
94        &self,
95        _request: Request<StackTraceRequest>,
96    ) -> Result<Response<StackTraceResponse>, Status> {
97        let compaction_task_traces = match &self.await_tree_reg {
98            None => Default::default(),
99            Some(await_tree_reg) => await_tree_reg
100                .collect::<Compaction>()
101                .into_iter()
102                .map(|(k, v)| (format!("{k:?}"), v.to_string()))
103                .collect(),
104        };
105        Ok(Response::new(StackTraceResponse {
106            compaction_task_traces,
107            ..Default::default()
108        }))
109    }
110
111    async fn profiling(
112        &self,
113        request: Request<ProfilingRequest>,
114    ) -> Result<Response<ProfilingResponse>, Status> {
115        self.profile_service.profiling(request).await
116    }
117
118    async fn heap_profiling(
119        &self,
120        request: Request<HeapProfilingRequest>,
121    ) -> Result<Response<HeapProfilingResponse>, Status> {
122        self.profile_service.heap_profiling(request).await
123    }
124
125    async fn list_heap_profiling(
126        &self,
127        request: Request<ListHeapProfilingRequest>,
128    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
129        self.profile_service.list_heap_profiling(request).await
130    }
131
132    async fn analyze_heap(
133        &self,
134        request: Request<AnalyzeHeapRequest>,
135    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
136        self.profile_service.analyze_heap(request).await
137    }
138
139    async fn get_streaming_stats(
140        &self,
141        _request: Request<GetStreamingStatsRequest>,
142    ) -> Result<Response<GetStreamingStatsResponse>, Status> {
143        Err(Status::unimplemented(
144            "Get Back Pressure unimplemented in compactor",
145        ))
146    }
147
148    async fn tiered_cache_tracing(
149        &self,
150        _: Request<TieredCacheTracingRequest>,
151    ) -> Result<Response<TieredCacheTracingResponse>, Status> {
152        Err(Status::unimplemented(
153            "Tiered Cache Tracing unimplemented in compactor",
154        ))
155    }
156
157    async fn get_profile_stats(
158        &self,
159        _request: Request<GetProfileStatsRequest>,
160    ) -> Result<Response<GetProfileStatsResponse>, Status> {
161        Err(Status::unimplemented(
162            "Get Profile Stats unimplemented in compactor",
163        ))
164    }
165}