risingwave_meta/rpc/
await_tree.rs1use risingwave_common::util::StackTraceResponseExt as _;
16use risingwave_pb::common::{WorkerNode, WorkerType};
17use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
18use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
19use risingwave_rpc_client::MonitorClientPool;
20use thiserror_ext::AsReport as _;
21
22use crate::MetaResult;
23use crate::manager::MetadataManager;
24
25pub async fn dump_cluster_await_tree(
28 metadata_manager: &MetadataManager,
29 meta_node_registry: &await_tree::Registry,
30 actor_traces_format: ActorTracesFormat,
31) -> MetaResult<StackTraceResponse> {
32 let mut all = StackTraceResponse::default();
33
34 let compute_nodes = metadata_manager
35 .list_worker_node(Some(WorkerType::ComputeNode), None)
36 .await?;
37 let compute_traces = dump_worker_node_await_tree(&compute_nodes, actor_traces_format).await?;
38 all.merge_other(compute_traces);
39
40 let compactor_nodes = metadata_manager
41 .list_worker_node(Some(WorkerType::Compactor), None)
42 .await?;
43 let compactor_traces =
44 dump_worker_node_await_tree(&compactor_nodes, actor_traces_format).await?;
45 all.merge_other(compactor_traces);
46
47 let meta_traces = dump_meta_node_await_tree(meta_node_registry)?;
48 all.merge_other(meta_traces);
49
50 Ok(all)
51}
52
53pub fn dump_meta_node_await_tree(
55 meta_node_registry: &await_tree::Registry,
56) -> MetaResult<StackTraceResponse> {
57 let meta_traces = meta_node_registry
58 .collect_all()
59 .into_iter()
60 .map(|(k, v)| (k.to_string(), v.to_string()))
61 .collect();
62
63 Ok(StackTraceResponse {
64 meta_traces,
65 ..Default::default()
66 })
67}
68
69pub async fn dump_worker_node_await_tree(
71 worker_nodes: impl IntoIterator<Item = &WorkerNode>,
72 actor_traces_format: ActorTracesFormat,
73) -> MetaResult<StackTraceResponse> {
74 let mut all = StackTraceResponse::default();
75
76 let req = StackTraceRequest {
77 actor_traces_format: actor_traces_format as i32,
78 };
79 let clients = MonitorClientPool::adhoc();
80
81 for worker_node in worker_nodes {
82 let worker_id = worker_node.id;
83
84 let client = match clients.get(worker_node).await {
85 Ok(client) => client,
86 Err(e) => {
87 all.node_errors
88 .insert(worker_id, format!("failed to connect: {}", e.as_report()));
89 continue;
90 }
91 };
92
93 match client.await_tree(req).await {
94 Ok(result) => {
95 all.merge_other(result);
96 }
97 Err(e) => {
98 all.node_errors.insert(
99 worker_id,
100 format!("failed to collect stack trace: {}", e.as_report()),
101 );
102 }
103 }
104 }
105
106 Ok(all)
107}