risingwave_ctl/cmd_impl/
profile.rs1use std::collections::HashSet;
16use std::path::PathBuf;
17
18use chrono::prelude::Local;
19use clap::ValueEnum;
20use futures::future::try_join_all;
21use risingwave_pb::common::WorkerType;
22use risingwave_pb::monitor_service::ProfilingResponse;
23use risingwave_rpc_client::MonitorClientPool;
24use thiserror_ext::AsReport;
25use tokio::fs::{File, create_dir_all};
26use tokio::io::AsyncWriteExt;
27
28use crate::CtlContext;
29
/// Worker categories that can be selected as targets for the profiling commands in this file.
///
/// Each variant is mapped one-to-one onto the protobuf `WorkerType` by `to_worker_type`.
// NOTE: variants are annotated with `//` rather than `///` on purpose: clap's `ValueEnum`
// derive turns variant doc comments into CLI help text, which would change program output.
#[derive(Clone, Copy, Debug, ValueEnum)]
// With `kebab-case`, the accepted command-line spellings should be `frontend`,
// `compute-node`, `compactor` and `meta`.
#[clap(rename_all = "kebab-case")]
pub enum ProfileWorkerType {
    // Maps to `WorkerType::Frontend`.
    Frontend,
    // Maps to `WorkerType::ComputeNode`.
    ComputeNode,
    // Maps to `WorkerType::Compactor`.
    Compactor,
    // Maps to `WorkerType::Meta`.
    Meta,
}
38
39pub async fn cpu_profile(
40 context: &CtlContext,
41 sleep_s: u64,
42 worker_types: Vec<ProfileWorkerType>,
43) -> anyhow::Result<()> {
44 let meta_client = context.meta_client().await?;
45
46 let workers = meta_client.list_worker_nodes(None).await?;
47 let target_types = selected_worker_types(&worker_types);
48 let target_nodes = workers
49 .into_iter()
50 .filter(|w| target_types.contains(&w.r#type()));
51
52 let clients = MonitorClientPool::adhoc();
53
54 let profile_root_path = std::env::var("PREFIX_PROFILING").unwrap_or_else(|_| {
55 tracing::info!("PREFIX_PROFILING is not set, using current directory");
56 "./".to_owned()
57 });
58 let profile_root_path = PathBuf::from(&profile_root_path);
59 let dir_name = Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
60 let dir_path = profile_root_path.join(dir_name);
61 create_dir_all(&dir_path).await?;
62
63 let mut profile_futs = vec![];
64
65 for cn in target_nodes {
68 let client = clients.get(&cn).await?;
69
70 let dir_path_ref = &dir_path;
71
72 let fut = async move {
73 let response = client.profile(sleep_s).await;
74 let host_addr = cn.get_host().expect("Should have host address");
75 let node_name = format!(
76 "{}-{}-{}",
77 worker_type_label(cn.r#type()),
78 host_addr.get_host().replace('.', "-"),
79 host_addr.get_port()
80 );
81 let svg_file_name = format!("{}.svg", node_name);
82 match response {
83 Ok(ProfilingResponse { result }) => {
84 let mut file = File::create(dir_path_ref.join(svg_file_name)).await?;
85 file.write_all(&result).await?;
86 }
87 Err(err) => {
88 tracing::error!(
89 error = %err.as_report(),
90 node_name,
91 "Failed to get profiling result",
92 );
93 }
94 }
95 Ok::<_, anyhow::Error>(())
96 };
97 profile_futs.push(fut);
98 }
99
100 try_join_all(profile_futs).await?;
101
102 println!("Profiling results are saved at {}", dir_path.display());
103
104 Ok(())
105}
106
107pub async fn heap_profile(
108 context: &CtlContext,
109 dir: Option<String>,
110 worker_types: Vec<ProfileWorkerType>,
111) -> anyhow::Result<()> {
112 let dir = dir.unwrap_or_default();
113 let meta_client = context.meta_client().await?;
114
115 let workers = meta_client.list_worker_nodes(None).await?;
116 let target_types = selected_worker_types(&worker_types);
117 let target_nodes = workers
118 .into_iter()
119 .filter(|w| target_types.contains(&w.r#type()));
120
121 let clients = MonitorClientPool::adhoc();
122
123 let mut profile_futs = vec![];
124
125 for cn in target_nodes {
128 let client = clients.get(&cn).await?;
129 let dir = &dir;
130
131 let fut = async move {
132 let response = client.heap_profile(dir.clone()).await;
133 let host_addr = cn.get_host().expect("Should have host address");
134
135 let node_name = format!(
136 "{}-{}-{}",
137 worker_type_label(cn.r#type()),
138 host_addr.get_host().replace('.', "-"),
139 host_addr.get_port()
140 );
141
142 if let Err(err) = response {
143 tracing::error!(
144 error = %err.as_report(),
145 node_name,
146 "Failed to dump profile",
147 );
148 }
149 Ok::<_, anyhow::Error>(())
150 };
151 profile_futs.push(fut);
152 }
153
154 try_join_all(profile_futs).await?;
155
156 println!(
157 "Profiling results are saved at {} on each target node",
158 PathBuf::from(dir).display()
159 );
160
161 Ok(())
162}
163
164fn selected_worker_types(values: &[ProfileWorkerType]) -> HashSet<WorkerType> {
165 if values.is_empty() {
167 return HashSet::from([
168 WorkerType::Frontend,
169 WorkerType::ComputeNode,
170 WorkerType::Compactor,
171 WorkerType::Meta,
172 ]);
173 }
174 values.iter().map(|value| to_worker_type(*value)).collect()
175}
176
177fn to_worker_type(value: ProfileWorkerType) -> WorkerType {
178 match value {
179 ProfileWorkerType::Frontend => WorkerType::Frontend,
180 ProfileWorkerType::ComputeNode => WorkerType::ComputeNode,
181 ProfileWorkerType::Compactor => WorkerType::Compactor,
182 ProfileWorkerType::Meta => WorkerType::Meta,
183 }
184}
185
186fn worker_type_label(worker_type: WorkerType) -> &'static str {
187 match worker_type {
188 WorkerType::Frontend => "frontend",
189 WorkerType::ComputeNode => "compute-node",
190 WorkerType::Compactor => "compactor",
191 WorkerType::Meta => "meta",
192 _ => "unknown",
193 }
194}