risingwave_ctl/cmd_impl/
await_tree.rs1use risingwave_common::util::StackTraceResponseExt;
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::id::WorkerId;
18use risingwave_pb::monitor_service::StackTraceRequest;
19use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
20use risingwave_rpc_client::MonitorClientPool;
21use rw_diagnose_tools::await_tree::AnalyzeSummary;
22use thiserror_ext::AsReport as _;
23
24use crate::CtlContext;
25
26pub async fn dump(context: &CtlContext, actor_traces_format: Option<String>) -> anyhow::Result<()> {
27 let actor_traces_format = match actor_traces_format.as_deref() {
28 Some("text") => ActorTracesFormat::Text,
29 Some("json") | None => ActorTracesFormat::Json,
30 _ => return Err(anyhow::anyhow!("Invalid actor traces format")),
31 };
32
33 let meta_client = context.meta_client().await?;
35 let all = meta_client
36 .get_cluster_stack_trace(actor_traces_format)
37 .await?;
38
39 if all.actor_traces.is_empty()
40 && all.rpc_traces.is_empty()
41 && all.compaction_task_traces.is_empty()
42 && all.inflight_barrier_traces.is_empty()
43 {
44 eprintln!("No actors are running, or `--async-stack-trace` not set?");
45 }
46 println!("{}", all.output());
47
48 Ok(())
49}
50
51pub async fn bottleneck_detect(context: &CtlContext, path: Option<String>) -> anyhow::Result<()> {
52 let summary = if let Some(path) = path {
53 rw_diagnose_tools::await_tree::bottleneck_detect_from_file(&path)?
54 } else {
55 bottleneck_detect_real_time(context).await?
56 };
57 println!("{}", summary);
58 Ok(())
59}
60
61async fn bottleneck_detect_real_time(context: &CtlContext) -> anyhow::Result<AnalyzeSummary> {
62 let meta_client = context.meta_client().await?;
63
64 let compute_nodes = meta_client
65 .list_worker_nodes(Some(WorkerType::ComputeNode))
66 .await?;
67 let clients = MonitorClientPool::adhoc();
68
69 let req = StackTraceRequest::default();
71
72 let mut summary = AnalyzeSummary::new();
73 let mut errors: Vec<(WorkerId, String)> = Vec::new();
74 for cn in compute_nodes {
75 let worker_id = cn.id;
76
77 let client = match clients.get(&cn).await {
78 Ok(client) => client,
79 Err(e) => {
80 errors.push((worker_id, format!("failed to connect: {}", e.as_report())));
81 continue;
82 }
83 };
84
85 let response = match client.await_tree(req).await {
86 Ok(resp) => resp,
87 Err(e) => {
88 errors.push((
89 worker_id,
90 format!("failed to collect stack trace: {}", e.as_report()),
91 ));
92 continue;
93 }
94 };
95
96 let partial_summary = AnalyzeSummary::from_traces(&response.actor_traces)?;
97 summary.merge_other(&partial_summary);
98 }
99 if !errors.is_empty() {
100 eprintln!("Some compute nodes failed to dump await tree:");
101 for (worker_id, err) in errors {
102 eprintln!(" - worker {worker_id}: {err}");
103 }
104 }
105 Ok(summary)
106}