risingwave_common/util/
prost.rs1use std::collections::btree_map::Entry;
16use std::fmt::{Display, Formatter};
17use std::ops::Deref;
18
19use risingwave_pb::batch_plan;
20use risingwave_pb::monitor_service::StackTraceResponse;
21use tracing::warn;
22
23pub trait TypeUrl {
24 fn type_url() -> &'static str;
25}
26
27impl TypeUrl for batch_plan::ExchangeNode {
28 fn type_url() -> &'static str {
29 "type.googleapis.com/plan.ExchangeNode"
30 }
31}
32
33pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);
34
35impl Deref for StackTraceResponseOutput<'_> {
36 type Target = StackTraceResponse;
37
38 fn deref(&self) -> &Self::Target {
39 self.0
40 }
41}
42
43impl Display for StackTraceResponseOutput<'_> {
44 fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
45 if !self.actor_traces.is_empty() {
46 writeln!(s, "--- Actor Traces ---")?;
47 for (actor_id, trace) in &self.actor_traces {
48 writeln!(s, ">> Actor {}", *actor_id)?;
49 writeln!(s, "{trace}")?;
50 }
51 }
52 if !self.rpc_traces.is_empty() {
53 let _ = writeln!(s, "--- RPC Traces ---");
54 for (name, trace) in &self.rpc_traces {
55 writeln!(s, ">> RPC {name}")?;
56 writeln!(s, "{trace}")?;
57 }
58 }
59 if !self.compaction_task_traces.is_empty() {
60 writeln!(s, "--- Compactor Traces ---")?;
61 for (name, trace) in &self.compaction_task_traces {
62 writeln!(s, ">> Compaction Task {name}")?;
63 writeln!(s, "{trace}")?;
64 }
65 }
66
67 if !self.inflight_barrier_traces.is_empty() {
68 writeln!(s, "--- Inflight Barrier Traces ---")?;
69 for (name, trace) in &self.inflight_barrier_traces {
70 writeln!(s, ">> Barrier {name}")?;
71 writeln!(s, "{trace}")?;
72 }
73 }
74
75 writeln!(s, "\n\n--- Barrier Worker States ---")?;
76 for (worker_id, state) in &self.barrier_worker_state {
77 writeln!(s, ">> Worker {worker_id}")?;
78 writeln!(s, "{state}\n")?;
79 }
80
81 if !self.jvm_stack_traces.is_empty() {
82 writeln!(s, "\n\n--- JVM Stack Traces ---")?;
83 for (worker_id, state) in &self.jvm_stack_traces {
84 writeln!(s, ">> Worker {worker_id}")?;
85 writeln!(s, "{state}\n")?;
86 }
87 }
88
89 Ok(())
90 }
91}
92
93#[easy_ext::ext(StackTraceResponseExt)]
94impl StackTraceResponse {
95 pub fn merge_other(&mut self, b: StackTraceResponse) {
96 self.actor_traces.extend(b.actor_traces);
97 self.rpc_traces.extend(b.rpc_traces);
98 self.compaction_task_traces.extend(b.compaction_task_traces);
99 self.inflight_barrier_traces
100 .extend(b.inflight_barrier_traces);
101 for (worker_id, worker_state) in b.barrier_worker_state {
102 match self.barrier_worker_state.entry(worker_id) {
103 Entry::Occupied(_entry) => {
104 warn!(
105 worker_id,
106 worker_state, "duplicate barrier worker state. skipped"
107 );
108 }
109 Entry::Vacant(entry) => {
110 entry.insert(worker_state);
111 }
112 }
113 }
114 for (worker_id, worker_state) in b.jvm_stack_traces {
115 match self.jvm_stack_traces.entry(worker_id) {
116 Entry::Occupied(_entry) => {
117 warn!(
118 worker_id,
119 worker_state, "duplicate jvm stack trace. skipped"
120 );
121 }
122 Entry::Vacant(entry) => {
123 entry.insert(worker_state);
124 }
125 }
126 }
127 }
128
129 pub fn output(&self) -> StackTraceResponseOutput<'_> {
130 StackTraceResponseOutput(self)
131 }
132}