risingwave_meta/rpc/
await_tree.rs1use risingwave_common::util::StackTraceResponseExt as _;
16use risingwave_pb::common::{WorkerNode, WorkerType};
17use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
18use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
19use risingwave_rpc_client::ComputeClientPool;
20
21use crate::MetaResult;
22use crate::manager::MetadataManager;
23
24pub async fn dump_cluster_await_tree(
27 metadata_manager: &MetadataManager,
28 meta_node_registry: &await_tree::Registry,
29 actor_traces_format: ActorTracesFormat,
30) -> MetaResult<StackTraceResponse> {
31 let mut all = StackTraceResponse::default();
32
33 let compute_nodes = metadata_manager
34 .list_worker_node(Some(WorkerType::ComputeNode), None)
35 .await?;
36 let compute_traces = dump_worker_node_await_tree(&compute_nodes, actor_traces_format).await?;
37 all.merge_other(compute_traces);
38
39 let compactor_nodes = metadata_manager
40 .list_worker_node(Some(WorkerType::Compactor), None)
41 .await?;
42 let compactor_traces =
43 dump_worker_node_await_tree(&compactor_nodes, actor_traces_format).await?;
44 all.merge_other(compactor_traces);
45
46 let meta_traces = dump_meta_node_await_tree(meta_node_registry)?;
47 all.merge_other(meta_traces);
48
49 Ok(all)
50}
51
52pub fn dump_meta_node_await_tree(
54 meta_node_registry: &await_tree::Registry,
55) -> MetaResult<StackTraceResponse> {
56 let meta_traces = meta_node_registry
57 .collect_all()
58 .into_iter()
59 .map(|(k, v)| (k.to_string(), v.to_string()))
60 .collect();
61
62 Ok(StackTraceResponse {
63 meta_traces,
64 ..Default::default()
65 })
66}
67
68pub async fn dump_worker_node_await_tree(
70 worker_nodes: impl IntoIterator<Item = &WorkerNode>,
71 actor_traces_format: ActorTracesFormat,
72) -> MetaResult<StackTraceResponse> {
73 let mut all = StackTraceResponse::default();
74
75 let req = StackTraceRequest {
76 actor_traces_format: actor_traces_format as i32,
77 };
78 let compute_clients = ComputeClientPool::adhoc();
80
81 for worker_node in worker_nodes {
82 let client = compute_clients.get(worker_node).await?;
83 let result = client.stack_trace(req).await?;
84
85 all.merge_other(result);
86 }
87
88 Ok(all)
89}