risingwave_ctl/cmd_impl/
await_tree.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::StackTraceResponseExt;
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::id::WorkerId;
18use risingwave_pb::monitor_service::StackTraceRequest;
19use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
20use risingwave_rpc_client::MonitorClientPool;
21use rw_diagnose_tools::await_tree::AnalyzeSummary;
22use thiserror_ext::AsReport as _;
23
24use crate::CtlContext;
25
26pub async fn dump(context: &CtlContext, actor_traces_format: Option<String>) -> anyhow::Result<()> {
27    let actor_traces_format = match actor_traces_format.as_deref() {
28        Some("text") => ActorTracesFormat::Text,
29        Some("json") | None => ActorTracesFormat::Json,
30        _ => return Err(anyhow::anyhow!("Invalid actor traces format")),
31    };
32
33    // Query the meta node for the await tree of all nodes in the cluster.
34    let meta_client = context.meta_client().await?;
35    let all = meta_client
36        .get_cluster_stack_trace(actor_traces_format)
37        .await?;
38
39    if all.actor_traces.is_empty()
40        && all.rpc_traces.is_empty()
41        && all.compaction_task_traces.is_empty()
42        && all.inflight_barrier_traces.is_empty()
43    {
44        eprintln!("No actors are running, or `--async-stack-trace` not set?");
45    }
46    println!("{}", all.output());
47
48    Ok(())
49}
50
51pub async fn bottleneck_detect(context: &CtlContext, path: Option<String>) -> anyhow::Result<()> {
52    let summary = if let Some(path) = path {
53        rw_diagnose_tools::await_tree::bottleneck_detect_from_file(&path)?
54    } else {
55        bottleneck_detect_real_time(context).await?
56    };
57    println!("{}", summary);
58    Ok(())
59}
60
61async fn bottleneck_detect_real_time(context: &CtlContext) -> anyhow::Result<AnalyzeSummary> {
62    let meta_client = context.meta_client().await?;
63
64    let compute_nodes = meta_client
65        .list_worker_nodes(Some(WorkerType::ComputeNode))
66        .await?;
67    let clients = MonitorClientPool::adhoc();
68
69    // request for json actor traces
70    let req = StackTraceRequest::default();
71
72    let mut summary = AnalyzeSummary::new();
73    let mut errors: Vec<(WorkerId, String)> = Vec::new();
74    for cn in compute_nodes {
75        let worker_id = cn.id;
76
77        let client = match clients.get(&cn).await {
78            Ok(client) => client,
79            Err(e) => {
80                errors.push((worker_id, format!("failed to connect: {}", e.as_report())));
81                continue;
82            }
83        };
84
85        let response = match client.await_tree(req).await {
86            Ok(resp) => resp,
87            Err(e) => {
88                errors.push((
89                    worker_id,
90                    format!("failed to collect stack trace: {}", e.as_report()),
91                ));
92                continue;
93            }
94        };
95
96        let partial_summary = AnalyzeSummary::from_traces(&response.actor_traces)?;
97        summary.merge_other(&partial_summary);
98    }
99    if !errors.is_empty() {
100        eprintln!("Some compute nodes failed to dump await tree:");
101        for (worker_id, err) in errors {
102            eprintln!("  - worker {worker_id}: {err}");
103        }
104    }
105    Ok(summary)
106}