risingwave_common/util/
prost.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::fmt::{Display, Formatter};
17use std::ops::Deref;
18
19use risingwave_pb::batch_plan;
20use risingwave_pb::monitor_service::StackTraceResponse;
21use tracing::warn;
22
/// Associates a type with a static type URL string.
///
/// The URL is obtained through an associated function, so no instance of the
/// implementing type is needed to query it.
pub trait TypeUrl {
    /// Returns the type URL of the implementing type.
    fn type_url() -> &'static str;
}
26
27impl TypeUrl for batch_plan::ExchangeNode {
28    fn type_url() -> &'static str {
29        "type.googleapis.com/plan.ExchangeNode"
30    }
31}
32
33pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);
34
35impl Deref for StackTraceResponseOutput<'_> {
36    type Target = StackTraceResponse;
37
38    fn deref(&self) -> &Self::Target {
39        self.0
40    }
41}
42
43impl Display for StackTraceResponseOutput<'_> {
44    fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
45        if !self.actor_traces.is_empty() {
46            writeln!(s, "--- Actor Traces ---")?;
47            for (actor_id, trace) in &self.actor_traces {
48                writeln!(s, ">> Actor {}", *actor_id)?;
49                writeln!(s, "{trace}")?;
50            }
51        }
52        if !self.rpc_traces.is_empty() {
53            let _ = writeln!(s, "--- RPC Traces ---");
54            for (name, trace) in &self.rpc_traces {
55                writeln!(s, ">> RPC {name}")?;
56                writeln!(s, "{trace}")?;
57            }
58        }
59        if !self.compaction_task_traces.is_empty() {
60            writeln!(s, "--- Compactor Traces ---")?;
61            for (name, trace) in &self.compaction_task_traces {
62                writeln!(s, ">> Compaction Task {name}")?;
63                writeln!(s, "{trace}")?;
64            }
65        }
66
67        if !self.inflight_barrier_traces.is_empty() {
68            writeln!(s, "--- Inflight Barrier Traces ---")?;
69            for (name, trace) in &self.inflight_barrier_traces {
70                writeln!(s, ">> Barrier {name}")?;
71                writeln!(s, "{trace}")?;
72            }
73        }
74
75        writeln!(s, "\n\n--- Barrier Worker States ---")?;
76        for (worker_id, state) in &self.barrier_worker_state {
77            writeln!(s, ">> Worker {worker_id}")?;
78            writeln!(s, "{state}\n")?;
79        }
80
81        if !self.jvm_stack_traces.is_empty() {
82            writeln!(s, "\n\n--- JVM Stack Traces ---")?;
83            for (worker_id, state) in &self.jvm_stack_traces {
84                writeln!(s, ">> Worker {worker_id}")?;
85                writeln!(s, "{state}\n")?;
86            }
87        }
88
89        if !self.meta_traces.is_empty() {
90            writeln!(s, "\n\n--- Meta Traces ---")?;
91            for (key, value) in &self.meta_traces {
92                writeln!(s, ">> {key}")?;
93                writeln!(s, "{value}\n")?;
94            }
95        }
96
97        Ok(())
98    }
99}
100
101#[easy_ext::ext(StackTraceResponseExt)]
102impl StackTraceResponse {
103    pub fn merge_other(&mut self, b: StackTraceResponse) {
104        self.actor_traces.extend(b.actor_traces);
105        self.rpc_traces.extend(b.rpc_traces);
106        self.compaction_task_traces.extend(b.compaction_task_traces);
107        self.inflight_barrier_traces
108            .extend(b.inflight_barrier_traces);
109        for (worker_id, worker_state) in b.barrier_worker_state {
110            match self.barrier_worker_state.entry(worker_id) {
111                Entry::Occupied(_entry) => {
112                    warn!(
113                        worker_id,
114                        worker_state, "duplicate barrier worker state. skipped"
115                    );
116                }
117                Entry::Vacant(entry) => {
118                    entry.insert(worker_state);
119                }
120            }
121        }
122        for (worker_id, worker_state) in b.jvm_stack_traces {
123            match self.jvm_stack_traces.entry(worker_id) {
124                Entry::Occupied(_entry) => {
125                    warn!(
126                        worker_id,
127                        worker_state, "duplicate jvm stack trace. skipped"
128                    );
129                }
130                Entry::Vacant(entry) => {
131                    entry.insert(worker_state);
132                }
133            }
134        }
135        self.meta_traces.extend(b.meta_traces);
136    }
137
138    pub fn output(&self) -> StackTraceResponseOutput<'_> {
139        StackTraceResponseOutput(self)
140    }
141}