risingwave_meta/rpc/
await_tree.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::StackTraceResponseExt as _;
16use risingwave_pb::common::{WorkerNode, WorkerType};
17use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
18use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
19use risingwave_rpc_client::ComputeClientPool;
20
21use crate::MetaResult;
22use crate::manager::MetadataManager;
23
24/// Dump the await tree of all nodes in the cluster, including compute nodes, compactor nodes,
25/// and the current meta node.
26pub async fn dump_cluster_await_tree(
27    metadata_manager: &MetadataManager,
28    meta_node_registry: &await_tree::Registry,
29    actor_traces_format: ActorTracesFormat,
30) -> MetaResult<StackTraceResponse> {
31    let mut all = StackTraceResponse::default();
32
33    let compute_nodes = metadata_manager
34        .list_worker_node(Some(WorkerType::ComputeNode), None)
35        .await?;
36    let compute_traces = dump_worker_node_await_tree(&compute_nodes, actor_traces_format).await?;
37    all.merge_other(compute_traces);
38
39    let compactor_nodes = metadata_manager
40        .list_worker_node(Some(WorkerType::Compactor), None)
41        .await?;
42    let compactor_traces =
43        dump_worker_node_await_tree(&compactor_nodes, actor_traces_format).await?;
44    all.merge_other(compactor_traces);
45
46    let meta_traces = dump_meta_node_await_tree(meta_node_registry)?;
47    all.merge_other(meta_traces);
48
49    Ok(all)
50}
51
52/// Dump the await tree of the current meta node.
53pub fn dump_meta_node_await_tree(
54    meta_node_registry: &await_tree::Registry,
55) -> MetaResult<StackTraceResponse> {
56    let meta_traces = meta_node_registry
57        .collect_all()
58        .into_iter()
59        .map(|(k, v)| (k.to_string(), v.to_string()))
60        .collect();
61
62    Ok(StackTraceResponse {
63        meta_traces,
64        ..Default::default()
65    })
66}
67
68/// Dump the await tree of the given worker nodes (compute nodes or compactor nodes).
69pub async fn dump_worker_node_await_tree(
70    worker_nodes: impl IntoIterator<Item = &WorkerNode>,
71    actor_traces_format: ActorTracesFormat,
72) -> MetaResult<StackTraceResponse> {
73    let mut all = StackTraceResponse::default();
74
75    let req = StackTraceRequest {
76        actor_traces_format: actor_traces_format as i32,
77    };
78    // This is also applicable to compactor.
79    let compute_clients = ComputeClientPool::adhoc();
80
81    for worker_node in worker_nodes {
82        let client = compute_clients.get(worker_node).await?;
83        let result = client.stack_trace(req).await?;
84
85        all.merge_other(result);
86    }
87
88    Ok(all)
89}