risingwave_common_service/
profile_service.rs1use std::ffi::CString;
16use std::fs;
17use std::path::Path;
18
19use itertools::Itertools;
20use risingwave_common::config::ServerConfig;
21use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
22use risingwave_pb::monitor_service::{
23 AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
24 ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
25};
26use risingwave_rpc_client::error::ToTonicStatus as _;
27use thiserror_ext::AsReport;
28use tokio::time::Duration;
29use tonic::{Code, Request, Response, Status};
30
31#[derive(Clone)]
34pub struct ProfileServiceImpl {
35 server_config: ServerConfig,
36}
37
38#[allow(clippy::unused_async)]
39impl ProfileServiceImpl {
40 pub fn new(server_config: ServerConfig) -> Self {
41 Self { server_config }
42 }
43
44 pub async fn profiling(
45 &self,
46 request: Request<ProfilingRequest>,
47 ) -> Result<Response<ProfilingResponse>, Status> {
48 if std::env::var("RW_PROFILE_PATH").is_ok() {
49 return Err(Status::internal(
50 "Profiling is already running by setting RW_PROFILE_PATH",
51 ));
52 }
53 let time = request.into_inner().get_sleep_s();
54 let guard = pprof::ProfilerGuardBuilder::default()
55 .blocklist(&["libc", "libgcc", "pthread", "vdso"])
56 .build()
57 .unwrap();
58 tokio::time::sleep(Duration::from_secs(time)).await;
59 let mut buf = vec![];
60 match guard.report().build() {
61 Ok(report) => {
62 report.flamegraph(&mut buf).unwrap();
63 tracing::info!("succeed to generate flamegraph");
64 Ok(Response::new(ProfilingResponse { result: buf }))
65 }
66 Err(err) => {
67 tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
68 Err(err.to_status(Code::Internal, "monitor"))
69 }
70 }
71 }
72
73 pub async fn heap_profiling(
74 &self,
75 request: Request<HeapProfilingRequest>,
76 ) -> Result<Response<HeapProfilingResponse>, Status> {
77 use std::fs::create_dir_all;
78 use std::path::PathBuf;
79
80 use tikv_jemalloc_ctl;
81
82 if !cfg!(target_os = "linux") {
83 return Err(Status::unimplemented(
84 "heap profiling is only implemented on Linux",
85 ));
86 }
87
88 if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
89 return Err(Status::failed_precondition(
90 "Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
91 ));
92 }
93
94 let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
95 let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
96 let arg_dir = request.into_inner().dir;
97 let dir = PathBuf::from(if arg_dir.is_empty() {
98 &self.server_config.heap_profiling.dir
99 } else {
100 &arg_dir
101 });
102 create_dir_all(&dir)?;
103
104 let file_path_buf = dir.join(file_name);
105 let file_path = file_path_buf
106 .to_str()
107 .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
108 let file_path_c =
109 CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;
110
111 if let Err(e) =
113 tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
114 {
115 tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
116 Err(Status::internal(e.to_string()))
117 } else {
118 tracing::info!("Manually Jemalloc dump heap file created: {}", file_path);
119 Ok(Response::new(HeapProfilingResponse {}))
120 }
121 }
122
123 pub async fn list_heap_profiling(
124 &self,
125 _request: Request<ListHeapProfilingRequest>,
126 ) -> Result<Response<ListHeapProfilingResponse>, Status> {
127 let dump_dir = self.server_config.heap_profiling.dir.clone();
128 let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
129 .map(|entry| {
130 let entry = entry?;
131 Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
132 })
133 .filter(|name| {
134 if let Ok(name) = name {
135 name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
136 } else {
137 true
138 }
139 })
140 .try_collect()?;
141 let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
142 .map(|entry| {
143 let entry = entry?;
144 Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
145 })
146 .filter(|name| {
147 if let Ok(name) = name {
148 name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
149 } else {
150 true
151 }
152 })
153 .try_collect()?;
154
155 Ok(Response::new(ListHeapProfilingResponse {
156 dir: dump_dir,
157 name_auto: auto_dump_files_name,
158 name_manually: manually_dump_files_name,
159 }))
160 }
161
162 pub async fn analyze_heap(
163 &self,
164 request: Request<AnalyzeHeapRequest>,
165 ) -> Result<Response<AnalyzeHeapResponse>, Status> {
166 let dumped_path_str = request.into_inner().get_path().clone();
167 let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
168 let collapsed_path = Path::new(&collapsed_path_str);
169
170 if !collapsed_path.exists() {
172 risingwave_common_heap_profiling::jeprof::run(
173 dumped_path_str,
174 collapsed_path_str.clone(),
175 )
176 .await
177 .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
178 }
179
180 let file = fs::read(Path::new(&collapsed_path_str))?;
181 Ok(Response::new(AnalyzeHeapResponse { result: file }))
182 }
183}