risingwave_ctl/cmd_impl/
profile.rs1use std::path::PathBuf;
16
17use chrono::prelude::Local;
18use futures::future::try_join_all;
19use risingwave_pb::common::WorkerType;
20use risingwave_pb::monitor_service::ProfilingResponse;
21use risingwave_rpc_client::ComputeClientPool;
22use thiserror_ext::AsReport;
23use tokio::fs::{File, create_dir_all};
24use tokio::io::AsyncWriteExt;
25
26use crate::CtlContext;
27
28pub async fn cpu_profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<()> {
29 let meta_client = context.meta_client().await?;
30
31 let workers = meta_client.get_cluster_info().await?.worker_nodes;
32 let compute_nodes = workers
33 .into_iter()
34 .filter(|w| w.r#type() == WorkerType::ComputeNode);
35
36 let clients = ComputeClientPool::adhoc();
37
38 let profile_root_path = std::env::var("PREFIX_PROFILING").unwrap_or_else(|_| {
39 tracing::info!("PREFIX_PROFILING is not set, using current directory");
40 "./".to_owned()
41 });
42 let profile_root_path = PathBuf::from(&profile_root_path);
43 let dir_name = Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
44 let dir_path = profile_root_path.join(dir_name);
45 create_dir_all(&dir_path).await?;
46
47 let mut profile_futs = vec![];
48
49 for cn in compute_nodes {
52 let client = clients.get(&cn).await?;
53
54 let dir_path_ref = &dir_path;
55
56 let fut = async move {
57 let response = client.profile(sleep_s).await;
58 let host_addr = cn.get_host().expect("Should have host address");
59 let node_name = format!(
60 "compute-node-{}-{}",
61 host_addr.get_host().replace('.', "-"),
62 host_addr.get_port()
63 );
64 let svg_file_name = format!("{}.svg", node_name);
65 match response {
66 Ok(ProfilingResponse { result }) => {
67 let mut file = File::create(dir_path_ref.join(svg_file_name)).await?;
68 file.write_all(&result).await?;
69 }
70 Err(err) => {
71 tracing::error!(
72 error = %err.as_report(),
73 %node_name,
74 "Failed to get profiling result",
75 );
76 }
77 }
78 Ok::<_, anyhow::Error>(())
79 };
80 profile_futs.push(fut);
81 }
82
83 try_join_all(profile_futs).await?;
84
85 println!("Profiling results are saved at {}", dir_path.display());
86
87 Ok(())
88}
89
90pub async fn heap_profile(context: &CtlContext, dir: Option<String>) -> anyhow::Result<()> {
91 let dir = dir.unwrap_or_default();
92 let meta_client = context.meta_client().await?;
93
94 let workers = meta_client.get_cluster_info().await?.worker_nodes;
95 let compute_nodes = workers
96 .into_iter()
97 .filter(|w| w.r#type() == WorkerType::ComputeNode);
98
99 let clients = ComputeClientPool::adhoc();
100
101 let mut profile_futs = vec![];
102
103 for cn in compute_nodes {
106 let client = clients.get(&cn).await?;
107 let dir = &dir;
108
109 let fut = async move {
110 let response = client.heap_profile(dir.clone()).await;
111 let host_addr = cn.get_host().expect("Should have host address");
112
113 let node_name = format!(
114 "compute-node-{}-{}",
115 host_addr.get_host().replace('.', "-"),
116 host_addr.get_port()
117 );
118
119 if let Err(err) = response {
120 tracing::error!(
121 error = %err.as_report(),
122 %node_name,
123 "Failed to dump profile",
124 );
125 }
126 Ok::<_, anyhow::Error>(())
127 };
128 profile_futs.push(fut);
129 }
130
131 try_join_all(profile_futs).await?;
132
133 println!(
134 "Profiling results are saved at {} on each compute nodes",
135 PathBuf::from(dir).display()
136 );
137
138 Ok(())
139}