risingwave_ctl/cmd_impl/
await_tree.rs1use risingwave_common::util::StackTraceResponseExt;
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::monitor_service::StackTraceRequest;
18use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
19use risingwave_rpc_client::ComputeClientPool;
20use rw_diagnose_tools::await_tree::AnalyzeSummary;
21
22use crate::CtlContext;
23
24pub async fn dump(context: &CtlContext, actor_traces_format: Option<String>) -> anyhow::Result<()> {
25 let actor_traces_format = match actor_traces_format.as_deref() {
26 Some("text") => ActorTracesFormat::Text,
27 Some("json") | None => ActorTracesFormat::Json,
28 _ => return Err(anyhow::anyhow!("Invalid actor traces format")),
29 };
30
31 let meta_client = context.meta_client().await?;
33 let all = meta_client
34 .get_cluster_stack_trace(actor_traces_format)
35 .await?;
36
37 if all.actor_traces.is_empty()
38 && all.rpc_traces.is_empty()
39 && all.compaction_task_traces.is_empty()
40 && all.inflight_barrier_traces.is_empty()
41 {
42 eprintln!("No actors are running, or `--async-stack-trace` not set?");
43 }
44 println!("{}", all.output());
45
46 Ok(())
47}
48
49pub async fn bottleneck_detect(context: &CtlContext, path: Option<String>) -> anyhow::Result<()> {
50 let summary = if let Some(path) = path {
51 rw_diagnose_tools::await_tree::bottleneck_detect_from_file(&path)?
52 } else {
53 bottleneck_detect_real_time(context).await?
54 };
55 println!("{}", summary);
56 Ok(())
57}
58
59async fn bottleneck_detect_real_time(context: &CtlContext) -> anyhow::Result<AnalyzeSummary> {
60 let meta_client = context.meta_client().await?;
61
62 let compute_nodes = meta_client
63 .list_worker_nodes(Some(WorkerType::ComputeNode))
64 .await?;
65 let clients = ComputeClientPool::adhoc();
66
67 let req = StackTraceRequest::default();
69
70 let mut summary = AnalyzeSummary::new();
71 for cn in compute_nodes {
72 let client = clients.get(&cn).await?;
73 let response = client.stack_trace(req).await?;
74 let partial_summary = AnalyzeSummary::from_traces(&response.actor_traces)?;
75 summary.merge_other(&partial_summary);
76 }
77 Ok(summary)
78}