risingwave_ctl/cmd_impl/profile.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashSet;
16use std::path::PathBuf;
17
18use chrono::prelude::Local;
19use clap::ValueEnum;
20use futures::future::try_join_all;
21use risingwave_pb::common::WorkerType;
22use risingwave_pb::monitor_service::ProfilingResponse;
23use risingwave_rpc_client::MonitorClientPool;
24use thiserror_ext::AsReport;
25use tokio::fs::{File, create_dir_all};
26use tokio::io::AsyncWriteExt;
27
28use crate::CtlContext;
29
30#[derive(Clone, Copy, Debug, ValueEnum)]
31#[clap(rename_all = "kebab-case")]
32pub enum ProfileWorkerType {
33    Frontend,
34    ComputeNode,
35    Compactor,
36    Meta,
37}
38
39pub async fn cpu_profile(
40    context: &CtlContext,
41    sleep_s: u64,
42    worker_types: Vec<ProfileWorkerType>,
43) -> anyhow::Result<()> {
44    let meta_client = context.meta_client().await?;
45
46    let workers = meta_client.list_worker_nodes(None).await?;
47    let target_types = selected_worker_types(&worker_types);
48    let target_nodes = workers
49        .into_iter()
50        .filter(|w| target_types.contains(&w.r#type()));
51
52    let clients = MonitorClientPool::adhoc();
53
54    let profile_root_path = std::env::var("PREFIX_PROFILING").unwrap_or_else(|_| {
55        tracing::info!("PREFIX_PROFILING is not set, using current directory");
56        "./".to_owned()
57    });
58    let profile_root_path = PathBuf::from(&profile_root_path);
59    let dir_name = Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
60    let dir_path = profile_root_path.join(dir_name);
61    create_dir_all(&dir_path).await?;
62
63    let mut profile_futs = vec![];
64
65    // FIXME: the node may not be accessible directly from risectl, we may let the meta
66    // service collect the reports from all nodes in the future.
67    for cn in target_nodes {
68        let client = clients.get(&cn).await?;
69
70        let dir_path_ref = &dir_path;
71
72        let fut = async move {
73            let response = client.profile(sleep_s).await;
74            let host_addr = cn.get_host().expect("Should have host address");
75            let node_name = format!(
76                "{}-{}-{}",
77                worker_type_label(cn.r#type()),
78                host_addr.get_host().replace('.', "-"),
79                host_addr.get_port()
80            );
81            let svg_file_name = format!("{}.svg", node_name);
82            match response {
83                Ok(ProfilingResponse { result }) => {
84                    let mut file = File::create(dir_path_ref.join(svg_file_name)).await?;
85                    file.write_all(&result).await?;
86                }
87                Err(err) => {
88                    tracing::error!(
89                        error = %err.as_report(),
90                        node_name,
91                        "Failed to get profiling result",
92                    );
93                }
94            }
95            Ok::<_, anyhow::Error>(())
96        };
97        profile_futs.push(fut);
98    }
99
100    try_join_all(profile_futs).await?;
101
102    println!("Profiling results are saved at {}", dir_path.display());
103
104    Ok(())
105}
106
107pub async fn heap_profile(
108    context: &CtlContext,
109    dir: Option<String>,
110    worker_types: Vec<ProfileWorkerType>,
111) -> anyhow::Result<()> {
112    let dir = dir.unwrap_or_default();
113    let meta_client = context.meta_client().await?;
114
115    let workers = meta_client.list_worker_nodes(None).await?;
116    let target_types = selected_worker_types(&worker_types);
117    let target_nodes = workers
118        .into_iter()
119        .filter(|w| target_types.contains(&w.r#type()));
120
121    let clients = MonitorClientPool::adhoc();
122
123    let mut profile_futs = vec![];
124
125    // FIXME: the node may not be accessible directly from risectl, we may let the meta
126    // service collect the reports from all nodes in the future.
127    for cn in target_nodes {
128        let client = clients.get(&cn).await?;
129        let dir = &dir;
130
131        let fut = async move {
132            let response = client.heap_profile(dir.clone()).await;
133            let host_addr = cn.get_host().expect("Should have host address");
134
135            let node_name = format!(
136                "{}-{}-{}",
137                worker_type_label(cn.r#type()),
138                host_addr.get_host().replace('.', "-"),
139                host_addr.get_port()
140            );
141
142            if let Err(err) = response {
143                tracing::error!(
144                    error = %err.as_report(),
145                    node_name,
146                    "Failed to dump profile",
147                );
148            }
149            Ok::<_, anyhow::Error>(())
150        };
151        profile_futs.push(fut);
152    }
153
154    try_join_all(profile_futs).await?;
155
156    println!(
157        "Profiling results are saved at {} on each target node",
158        PathBuf::from(dir).display()
159    );
160
161    Ok(())
162}
163
164fn selected_worker_types(values: &[ProfileWorkerType]) -> HashSet<WorkerType> {
165    // Use default set when no filter is provided.
166    if values.is_empty() {
167        return HashSet::from([
168            WorkerType::Frontend,
169            WorkerType::ComputeNode,
170            WorkerType::Compactor,
171            WorkerType::Meta,
172        ]);
173    }
174    values.iter().map(|value| to_worker_type(*value)).collect()
175}
176
177fn to_worker_type(value: ProfileWorkerType) -> WorkerType {
178    match value {
179        ProfileWorkerType::Frontend => WorkerType::Frontend,
180        ProfileWorkerType::ComputeNode => WorkerType::ComputeNode,
181        ProfileWorkerType::Compactor => WorkerType::Compactor,
182        ProfileWorkerType::Meta => WorkerType::Meta,
183    }
184}
185
186fn worker_type_label(worker_type: WorkerType) -> &'static str {
187    match worker_type {
188        WorkerType::Frontend => "frontend",
189        WorkerType::ComputeNode => "compute-node",
190        WorkerType::Compactor => "compactor",
191        WorkerType::Meta => "meta",
192        _ => "unknown",
193    }
194}