risingwave_common_heap_profiling/
profile_service.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::CString;
16use std::fs;
17use std::path::Path;
18
19use itertools::Itertools;
20use risingwave_common::config::ServerConfig;
21use risingwave_pb::monitor_service::{
22    AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
23    ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
24};
25use risingwave_rpc_client::error::ToTonicStatus as _;
26use thiserror_ext::AsReport;
27use tokio::time::Duration;
28use tonic::{Code, Request, Response, Status};
29
30use crate::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
31
/// Implementation of the profiling related services in `MonitorService`.
/// Can be reused to implement the same services in different types of worker nodes.
///
/// Provides CPU profiling (flamegraph), manual jemalloc heap dumps, listing of heap dump
/// files, and jeprof-based analysis of a heap dump.
#[derive(Clone)]
pub struct ProfileServiceImpl {
    // Node configuration; `heap_profiling.dir` is used as the (default) heap dump directory.
    server_config: ServerConfig,
}
38
39#[allow(clippy::unused_async)]
40impl ProfileServiceImpl {
41    pub fn new(server_config: ServerConfig) -> Self {
42        Self { server_config }
43    }
44
45    pub async fn profiling(
46        &self,
47        request: Request<ProfilingRequest>,
48    ) -> Result<Response<ProfilingResponse>, Status> {
49        if std::env::var("RW_PROFILE_PATH").is_ok() {
50            return Err(Status::internal(
51                "Profiling is already running by setting RW_PROFILE_PATH",
52            ));
53        }
54        let time = request.into_inner().get_sleep_s();
55        let guard = pprof::ProfilerGuardBuilder::default()
56            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
57            .build()
58            .unwrap();
59        tokio::time::sleep(Duration::from_secs(time)).await;
60        let mut buf = vec![];
61        match guard.report().build() {
62            Ok(report) => {
63                report.flamegraph(&mut buf).unwrap();
64                tracing::info!("succeed to generate flamegraph");
65                Ok(Response::new(ProfilingResponse { result: buf }))
66            }
67            Err(err) => {
68                tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
69                Err(err.to_status(Code::Internal, "monitor"))
70            }
71        }
72    }
73
74    pub async fn heap_profiling(
75        &self,
76        request: Request<HeapProfilingRequest>,
77    ) -> Result<Response<HeapProfilingResponse>, Status> {
78        use std::fs::create_dir_all;
79        use std::path::PathBuf;
80
81        use tikv_jemalloc_ctl;
82
83        if !cfg!(target_os = "linux") {
84            return Err(Status::unimplemented(
85                "heap profiling is only implemented on Linux",
86            ));
87        }
88
89        if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
90            return Err(Status::failed_precondition(
91                "Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
92            ));
93        }
94
95        let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
96        let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
97        let arg_dir = request.into_inner().dir;
98        let dir = PathBuf::from(if arg_dir.is_empty() {
99            &self.server_config.heap_profiling.dir
100        } else {
101            &arg_dir
102        });
103        create_dir_all(&dir)?;
104
105        let file_path_buf = dir.join(file_name);
106        let file_path = file_path_buf
107            .to_str()
108            .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
109        let file_path_c =
110            CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;
111
112        // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
113        if let Err(e) =
114            tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
115        {
116            tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
117            Err(Status::internal(e.to_string()))
118        } else {
119            tracing::info!("Manually Jemalloc dump heap file created: {}", file_path);
120            Ok(Response::new(HeapProfilingResponse {}))
121        }
122    }
123
124    pub async fn list_heap_profiling(
125        &self,
126        _request: Request<ListHeapProfilingRequest>,
127    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
128        let dump_dir = self.server_config.heap_profiling.dir.clone();
129        let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
130            .map(|entry| {
131                let entry = entry?;
132                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
133            })
134            .filter(|name| {
135                if let Ok(name) = name {
136                    name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
137                } else {
138                    true
139                }
140            })
141            .try_collect()?;
142        let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
143            .map(|entry| {
144                let entry = entry?;
145                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
146            })
147            .filter(|name| {
148                if let Ok(name) = name {
149                    name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
150                } else {
151                    true
152                }
153            })
154            .try_collect()?;
155
156        Ok(Response::new(ListHeapProfilingResponse {
157            dir: dump_dir,
158            name_auto: auto_dump_files_name,
159            name_manually: manually_dump_files_name,
160        }))
161    }
162
163    pub async fn analyze_heap(
164        &self,
165        request: Request<AnalyzeHeapRequest>,
166    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
167        let dumped_path_str = request.into_inner().get_path().clone();
168        let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
169        let collapsed_path = Path::new(&collapsed_path_str);
170
171        // run jeprof if the target was not analyzed before
172        if !collapsed_path.exists() {
173            crate::jeprof::run(dumped_path_str, collapsed_path_str.clone())
174                .await
175                .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
176        }
177
178        let file = fs::read(Path::new(&collapsed_path_str))?;
179        Ok(Response::new(AnalyzeHeapResponse { result: file }))
180    }
181}