risingwave_meta/rpc/
await_tree.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::StackTraceResponseExt as _;
16use risingwave_pb::common::{WorkerNode, WorkerType};
17use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
18use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
19use risingwave_rpc_client::MonitorClientPool;
20use thiserror_ext::AsReport as _;
21
22use crate::MetaResult;
23use crate::manager::MetadataManager;
24
25/// Dump the await tree of all nodes in the cluster, including compute nodes, compactor nodes,
26/// and the current meta node.
27pub async fn dump_cluster_await_tree(
28    metadata_manager: &MetadataManager,
29    meta_node_registry: &await_tree::Registry,
30    actor_traces_format: ActorTracesFormat,
31) -> MetaResult<StackTraceResponse> {
32    let mut all = StackTraceResponse::default();
33
34    let compute_nodes = metadata_manager
35        .list_worker_node(Some(WorkerType::ComputeNode), None)
36        .await?;
37    let compute_traces = dump_worker_node_await_tree(&compute_nodes, actor_traces_format).await?;
38    all.merge_other(compute_traces);
39
40    let compactor_nodes = metadata_manager
41        .list_worker_node(Some(WorkerType::Compactor), None)
42        .await?;
43    let compactor_traces =
44        dump_worker_node_await_tree(&compactor_nodes, actor_traces_format).await?;
45    all.merge_other(compactor_traces);
46
47    let meta_traces = dump_meta_node_await_tree(meta_node_registry)?;
48    all.merge_other(meta_traces);
49
50    Ok(all)
51}
52
53/// Dump the await tree of the current meta node.
54pub fn dump_meta_node_await_tree(
55    meta_node_registry: &await_tree::Registry,
56) -> MetaResult<StackTraceResponse> {
57    let meta_traces = meta_node_registry
58        .collect_all()
59        .into_iter()
60        .map(|(k, v)| (k.to_string(), v.to_string()))
61        .collect();
62
63    Ok(StackTraceResponse {
64        meta_traces,
65        ..Default::default()
66    })
67}
68
69/// Dump the await tree of the given worker nodes (compute nodes or compactor nodes).
70pub async fn dump_worker_node_await_tree(
71    worker_nodes: impl IntoIterator<Item = &WorkerNode>,
72    actor_traces_format: ActorTracesFormat,
73) -> MetaResult<StackTraceResponse> {
74    let mut all = StackTraceResponse::default();
75
76    let req = StackTraceRequest {
77        actor_traces_format: actor_traces_format as i32,
78    };
79    let clients = MonitorClientPool::adhoc();
80
81    for worker_node in worker_nodes {
82        let worker_id = worker_node.id;
83
84        let client = match clients.get(worker_node).await {
85            Ok(client) => client,
86            Err(e) => {
87                all.node_errors
88                    .insert(worker_id, format!("failed to connect: {}", e.as_report()));
89                continue;
90            }
91        };
92
93        match client.await_tree(req).await {
94            Ok(result) => {
95                all.merge_other(result);
96            }
97            Err(e) => {
98                all.node_errors.insert(
99                    worker_id,
100                    format!("failed to collect stack trace: {}", e.as_report()),
101                );
102            }
103        }
104    }
105
106    Ok(all)
107}