risingwave_common_service/
profile_service.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::CString;
16use std::fs;
17use std::path::Path;
18
19use itertools::Itertools;
20use risingwave_common::config::ServerConfig;
21use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
22use risingwave_pb::monitor_service::{
23    AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
24    ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
25};
26use risingwave_rpc_client::error::ToTonicStatus as _;
27use thiserror_ext::AsReport;
28use tokio::time::Duration;
29use tonic::{Code, Request, Response, Status};
30
31/// Implementation of the profiling related services in `MonitorService`.
32/// Can be reused to implement the same services in different types of worker nodes.
33#[derive(Clone)]
34pub struct ProfileServiceImpl {
35    server_config: ServerConfig,
36}
37
38#[allow(clippy::unused_async)]
39impl ProfileServiceImpl {
40    pub fn new(server_config: ServerConfig) -> Self {
41        Self { server_config }
42    }
43
44    pub async fn profiling(
45        &self,
46        request: Request<ProfilingRequest>,
47    ) -> Result<Response<ProfilingResponse>, Status> {
48        if std::env::var("RW_PROFILE_PATH").is_ok() {
49            return Err(Status::internal(
50                "Profiling is already running by setting RW_PROFILE_PATH",
51            ));
52        }
53        let time = request.into_inner().get_sleep_s();
54        let guard = pprof::ProfilerGuardBuilder::default()
55            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
56            .build()
57            .unwrap();
58        tokio::time::sleep(Duration::from_secs(time)).await;
59        let mut buf = vec![];
60        match guard.report().build() {
61            Ok(report) => {
62                report.flamegraph(&mut buf).unwrap();
63                tracing::info!("succeed to generate flamegraph");
64                Ok(Response::new(ProfilingResponse { result: buf }))
65            }
66            Err(err) => {
67                tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
68                Err(err.to_status(Code::Internal, "monitor"))
69            }
70        }
71    }
72
73    pub async fn heap_profiling(
74        &self,
75        request: Request<HeapProfilingRequest>,
76    ) -> Result<Response<HeapProfilingResponse>, Status> {
77        use std::fs::create_dir_all;
78        use std::path::PathBuf;
79
80        use tikv_jemalloc_ctl;
81
82        if !cfg!(target_os = "linux") {
83            return Err(Status::unimplemented(
84                "heap profiling is only implemented on Linux",
85            ));
86        }
87
88        if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
89            return Err(Status::failed_precondition(
90                "Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
91            ));
92        }
93
94        let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
95        let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
96        let arg_dir = request.into_inner().dir;
97        let dir = PathBuf::from(if arg_dir.is_empty() {
98            &self.server_config.heap_profiling.dir
99        } else {
100            &arg_dir
101        });
102        create_dir_all(&dir)?;
103
104        let file_path_buf = dir.join(file_name);
105        let file_path = file_path_buf
106            .to_str()
107            .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
108        let file_path_c =
109            CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;
110
111        // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
112        if let Err(e) =
113            tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
114        {
115            tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
116            Err(Status::internal(e.to_string()))
117        } else {
118            tracing::info!("Manually Jemalloc dump heap file created: {}", file_path);
119            Ok(Response::new(HeapProfilingResponse {}))
120        }
121    }
122
123    pub async fn list_heap_profiling(
124        &self,
125        _request: Request<ListHeapProfilingRequest>,
126    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
127        let dump_dir = self.server_config.heap_profiling.dir.clone();
128        let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
129            .map(|entry| {
130                let entry = entry?;
131                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
132            })
133            .filter(|name| {
134                if let Ok(name) = name {
135                    name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
136                } else {
137                    true
138                }
139            })
140            .try_collect()?;
141        let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
142            .map(|entry| {
143                let entry = entry?;
144                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
145            })
146            .filter(|name| {
147                if let Ok(name) = name {
148                    name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
149                } else {
150                    true
151                }
152            })
153            .try_collect()?;
154
155        Ok(Response::new(ListHeapProfilingResponse {
156            dir: dump_dir,
157            name_auto: auto_dump_files_name,
158            name_manually: manually_dump_files_name,
159        }))
160    }
161
162    pub async fn analyze_heap(
163        &self,
164        request: Request<AnalyzeHeapRequest>,
165    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
166        let dumped_path_str = request.into_inner().get_path().clone();
167        let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
168        let collapsed_path = Path::new(&collapsed_path_str);
169
170        // run jeprof if the target was not analyzed before
171        if !collapsed_path.exists() {
172            risingwave_common_heap_profiling::jeprof::run(
173                dumped_path_str,
174                collapsed_path_str.clone(),
175            )
176            .await
177            .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
178        }
179
180        let file = fs::read(Path::new(&collapsed_path_str))?;
181        Ok(Response::new(AnalyzeHeapResponse { result: file }))
182    }
183}