risingwave_common_heap_profiling/
profile_service.rs1use std::ffi::CString;
16use std::fs;
17use std::path::Path;
18
19use itertools::Itertools;
20use risingwave_common::config::ServerConfig;
21use risingwave_pb::monitor_service::{
22 AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
23 ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
24};
25use risingwave_rpc_client::error::ToTonicStatus as _;
26use thiserror_ext::AsReport;
27use tokio::time::Duration;
28use tonic::{Code, Request, Response, Status};
29
30use crate::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
31
32#[derive(Clone)]
35pub struct ProfileServiceImpl {
36 server_config: ServerConfig,
37}
38
39#[allow(clippy::unused_async)]
40impl ProfileServiceImpl {
41 pub fn new(server_config: ServerConfig) -> Self {
42 Self { server_config }
43 }
44
45 pub async fn profiling(
46 &self,
47 request: Request<ProfilingRequest>,
48 ) -> Result<Response<ProfilingResponse>, Status> {
49 if std::env::var("RW_PROFILE_PATH").is_ok() {
50 return Err(Status::internal(
51 "Profiling is already running by setting RW_PROFILE_PATH",
52 ));
53 }
54 let time = request.into_inner().get_sleep_s();
55 let guard = pprof::ProfilerGuardBuilder::default()
56 .blocklist(&["libc", "libgcc", "pthread", "vdso"])
57 .build()
58 .unwrap();
59 tokio::time::sleep(Duration::from_secs(time)).await;
60 let mut buf = vec![];
61 match guard.report().build() {
62 Ok(report) => {
63 report.flamegraph(&mut buf).unwrap();
64 tracing::info!("succeed to generate flamegraph");
65 Ok(Response::new(ProfilingResponse { result: buf }))
66 }
67 Err(err) => {
68 tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
69 Err(err.to_status(Code::Internal, "monitor"))
70 }
71 }
72 }
73
74 pub async fn heap_profiling(
75 &self,
76 request: Request<HeapProfilingRequest>,
77 ) -> Result<Response<HeapProfilingResponse>, Status> {
78 use std::fs::create_dir_all;
79 use std::path::PathBuf;
80
81 use tikv_jemalloc_ctl;
82
83 if !cfg!(target_os = "linux") {
84 return Err(Status::unimplemented(
85 "heap profiling is only implemented on Linux",
86 ));
87 }
88
89 if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
90 return Err(Status::failed_precondition(
91 "Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
92 ));
93 }
94
95 let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
96 let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
97 let arg_dir = request.into_inner().dir;
98 let dir = PathBuf::from(if arg_dir.is_empty() {
99 &self.server_config.heap_profiling.dir
100 } else {
101 &arg_dir
102 });
103 create_dir_all(&dir)?;
104
105 let file_path_buf = dir.join(file_name);
106 let file_path = file_path_buf
107 .to_str()
108 .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
109 let file_path_c =
110 CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;
111
112 if let Err(e) =
114 tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
115 {
116 tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
117 Err(Status::internal(e.to_string()))
118 } else {
119 tracing::info!("Manually Jemalloc dump heap file created: {}", file_path);
120 Ok(Response::new(HeapProfilingResponse {}))
121 }
122 }
123
124 pub async fn list_heap_profiling(
125 &self,
126 _request: Request<ListHeapProfilingRequest>,
127 ) -> Result<Response<ListHeapProfilingResponse>, Status> {
128 let dump_dir = self.server_config.heap_profiling.dir.clone();
129 let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
130 .map(|entry| {
131 let entry = entry?;
132 Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
133 })
134 .filter(|name| {
135 if let Ok(name) = name {
136 name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
137 } else {
138 true
139 }
140 })
141 .try_collect()?;
142 let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
143 .map(|entry| {
144 let entry = entry?;
145 Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
146 })
147 .filter(|name| {
148 if let Ok(name) = name {
149 name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
150 } else {
151 true
152 }
153 })
154 .try_collect()?;
155
156 Ok(Response::new(ListHeapProfilingResponse {
157 dir: dump_dir,
158 name_auto: auto_dump_files_name,
159 name_manually: manually_dump_files_name,
160 }))
161 }
162
163 pub async fn analyze_heap(
164 &self,
165 request: Request<AnalyzeHeapRequest>,
166 ) -> Result<Response<AnalyzeHeapResponse>, Status> {
167 let dumped_path_str = request.into_inner().get_path().clone();
168 let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
169 let collapsed_path = Path::new(&collapsed_path_str);
170
171 if !collapsed_path.exists() {
173 crate::jeprof::run(dumped_path_str, collapsed_path_str.clone())
174 .await
175 .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
176 }
177
178 let file = fs::read(Path::new(&collapsed_path_str))?;
179 Ok(Response::new(AnalyzeHeapResponse { result: file }))
180 }
181}