risingwave_ctl/cmd_impl/
profile.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16
17use chrono::prelude::Local;
18use futures::future::try_join_all;
19use risingwave_pb::common::WorkerType;
20use risingwave_pb::monitor_service::ProfilingResponse;
21use risingwave_rpc_client::ComputeClientPool;
22use thiserror_ext::AsReport;
23use tokio::fs::{File, create_dir_all};
24use tokio::io::AsyncWriteExt;
25
26use crate::CtlContext;
27
28pub async fn cpu_profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<()> {
29    let meta_client = context.meta_client().await?;
30
31    let workers = meta_client.get_cluster_info().await?.worker_nodes;
32    let compute_nodes = workers
33        .into_iter()
34        .filter(|w| w.r#type() == WorkerType::ComputeNode);
35
36    let clients = ComputeClientPool::adhoc();
37
38    let profile_root_path = std::env::var("PREFIX_PROFILING").unwrap_or_else(|_| {
39        tracing::info!("PREFIX_PROFILING is not set, using current directory");
40        "./".to_owned()
41    });
42    let profile_root_path = PathBuf::from(&profile_root_path);
43    let dir_name = Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
44    let dir_path = profile_root_path.join(dir_name);
45    create_dir_all(&dir_path).await?;
46
47    let mut profile_futs = vec![];
48
49    // FIXME: the compute node may not be accessible directly from risectl, we may let the meta
50    // service collect the reports from all compute nodes in the future.
51    for cn in compute_nodes {
52        let client = clients.get(&cn).await?;
53
54        let dir_path_ref = &dir_path;
55
56        let fut = async move {
57            let response = client.profile(sleep_s).await;
58            let host_addr = cn.get_host().expect("Should have host address");
59            let node_name = format!(
60                "compute-node-{}-{}",
61                host_addr.get_host().replace('.', "-"),
62                host_addr.get_port()
63            );
64            let svg_file_name = format!("{}.svg", node_name);
65            match response {
66                Ok(ProfilingResponse { result }) => {
67                    let mut file = File::create(dir_path_ref.join(svg_file_name)).await?;
68                    file.write_all(&result).await?;
69                }
70                Err(err) => {
71                    tracing::error!(
72                        error = %err.as_report(),
73                        %node_name,
74                        "Failed to get profiling result",
75                    );
76                }
77            }
78            Ok::<_, anyhow::Error>(())
79        };
80        profile_futs.push(fut);
81    }
82
83    try_join_all(profile_futs).await?;
84
85    println!("Profiling results are saved at {}", dir_path.display());
86
87    Ok(())
88}
89
90pub async fn heap_profile(context: &CtlContext, dir: Option<String>) -> anyhow::Result<()> {
91    let dir = dir.unwrap_or_default();
92    let meta_client = context.meta_client().await?;
93
94    let workers = meta_client.get_cluster_info().await?.worker_nodes;
95    let compute_nodes = workers
96        .into_iter()
97        .filter(|w| w.r#type() == WorkerType::ComputeNode);
98
99    let clients = ComputeClientPool::adhoc();
100
101    let mut profile_futs = vec![];
102
103    // FIXME: the compute node may not be accessible directly from risectl, we may let the meta
104    // service collect the reports from all compute nodes in the future.
105    for cn in compute_nodes {
106        let client = clients.get(&cn).await?;
107        let dir = &dir;
108
109        let fut = async move {
110            let response = client.heap_profile(dir.clone()).await;
111            let host_addr = cn.get_host().expect("Should have host address");
112
113            let node_name = format!(
114                "compute-node-{}-{}",
115                host_addr.get_host().replace('.', "-"),
116                host_addr.get_port()
117            );
118
119            if let Err(err) = response {
120                tracing::error!(
121                    error = %err.as_report(),
122                    %node_name,
123                    "Failed to dump profile",
124                );
125            }
126            Ok::<_, anyhow::Error>(())
127        };
128        profile_futs.push(fut);
129    }
130
131    try_join_all(profile_futs).await?;
132
133    println!(
134        "Profiling results are saved at {} on each compute nodes",
135        PathBuf::from(dir).display()
136    );
137
138    Ok(())
139}