risingwave_ctl/cmd_impl/
await_tree.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::StackTraceResponseExt;
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::monitor_service::StackTraceRequest;
18use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
19use risingwave_rpc_client::ComputeClientPool;
20use rw_diagnose_tools::await_tree::AnalyzeSummary;
21
22use crate::CtlContext;
23
24pub async fn dump(context: &CtlContext, actor_traces_format: Option<String>) -> anyhow::Result<()> {
25    let actor_traces_format = match actor_traces_format.as_deref() {
26        Some("text") => ActorTracesFormat::Text,
27        Some("json") | None => ActorTracesFormat::Json,
28        _ => return Err(anyhow::anyhow!("Invalid actor traces format")),
29    };
30
31    // Query the meta node for the await tree of all nodes in the cluster.
32    let meta_client = context.meta_client().await?;
33    let all = meta_client
34        .get_cluster_stack_trace(actor_traces_format)
35        .await?;
36
37    if all.actor_traces.is_empty()
38        && all.rpc_traces.is_empty()
39        && all.compaction_task_traces.is_empty()
40        && all.inflight_barrier_traces.is_empty()
41    {
42        eprintln!("No actors are running, or `--async-stack-trace` not set?");
43    }
44    println!("{}", all.output());
45
46    Ok(())
47}
48
49pub async fn bottleneck_detect(context: &CtlContext, path: Option<String>) -> anyhow::Result<()> {
50    let summary = if let Some(path) = path {
51        rw_diagnose_tools::await_tree::bottleneck_detect_from_file(&path)?
52    } else {
53        bottleneck_detect_real_time(context).await?
54    };
55    println!("{}", summary);
56    Ok(())
57}
58
59async fn bottleneck_detect_real_time(context: &CtlContext) -> anyhow::Result<AnalyzeSummary> {
60    let meta_client = context.meta_client().await?;
61
62    let compute_nodes = meta_client
63        .list_worker_nodes(Some(WorkerType::ComputeNode))
64        .await?;
65    let clients = ComputeClientPool::adhoc();
66
67    // request for json actor traces
68    let req = StackTraceRequest::default();
69
70    let mut summary = AnalyzeSummary::new();
71    for cn in compute_nodes {
72        let client = clients.get(&cn).await?;
73        let response = client.stack_trace(req).await?;
74        let partial_summary = AnalyzeSummary::from_traces(&response.actor_traces)?;
75        summary.merge_other(&partial_summary);
76    }
77    Ok(summary)
78}