risingwave_common/util/
prost.rs1use std::collections::btree_map::Entry;
16use std::fmt::{Display, Formatter};
17use std::ops::Deref;
18
19use risingwave_pb::batch_plan;
20use risingwave_pb::monitor_service::StackTraceResponse;
21use tracing::warn;
22
/// Associates a fixed, statically known type URL string with the implementing type.
pub trait TypeUrl {
    /// Returns the type URL of the implementing type.
    ///
    /// This is an associated function: it is called on the type itself
    /// (`T::type_url()`), not on a value.
    fn type_url() -> &'static str;
}
26
27impl TypeUrl for batch_plan::ExchangeNode {
28 fn type_url() -> &'static str {
29 "type.googleapis.com/plan.ExchangeNode"
30 }
31}
32
33pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);
34
35impl Deref for StackTraceResponseOutput<'_> {
36 type Target = StackTraceResponse;
37
38 fn deref(&self) -> &Self::Target {
39 self.0
40 }
41}
42
43impl Display for StackTraceResponseOutput<'_> {
44 fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
45 if !self.actor_traces.is_empty() {
46 writeln!(s, "--- Actor Traces ---")?;
47 for (actor_id, trace) in &self.actor_traces {
48 writeln!(s, ">> Actor {}", *actor_id)?;
49 writeln!(s, "{trace}")?;
50 }
51 }
52 if !self.rpc_traces.is_empty() {
53 let _ = writeln!(s, "--- RPC Traces ---");
54 for (name, trace) in &self.rpc_traces {
55 writeln!(s, ">> RPC {name}")?;
56 writeln!(s, "{trace}")?;
57 }
58 }
59 if !self.compaction_task_traces.is_empty() {
60 writeln!(s, "--- Compactor Traces ---")?;
61 for (name, trace) in &self.compaction_task_traces {
62 writeln!(s, ">> Compaction Task {name}")?;
63 writeln!(s, "{trace}")?;
64 }
65 }
66
67 if !self.inflight_barrier_traces.is_empty() {
68 writeln!(s, "--- Inflight Barrier Traces ---")?;
69 for (name, trace) in &self.inflight_barrier_traces {
70 writeln!(s, ">> Barrier {name}")?;
71 writeln!(s, "{trace}")?;
72 }
73 }
74
75 writeln!(s, "\n\n--- Barrier Worker States ---")?;
76 for (worker_id, state) in &self.barrier_worker_state {
77 writeln!(s, ">> Worker {worker_id}")?;
78 writeln!(s, "{state}\n")?;
79 }
80
81 if !self.jvm_stack_traces.is_empty() {
82 writeln!(s, "\n\n--- JVM Stack Traces ---")?;
83 for (worker_id, state) in &self.jvm_stack_traces {
84 writeln!(s, ">> Worker {worker_id}")?;
85 writeln!(s, "{state}\n")?;
86 }
87 }
88
89 if !self.meta_traces.is_empty() {
90 writeln!(s, "\n\n--- Meta Traces ---")?;
91 for (key, value) in &self.meta_traces {
92 writeln!(s, ">> {key}")?;
93 writeln!(s, "{value}\n")?;
94 }
95 }
96
97 Ok(())
98 }
99}
100
101#[easy_ext::ext(StackTraceResponseExt)]
102impl StackTraceResponse {
103 pub fn merge_other(&mut self, b: StackTraceResponse) {
104 self.actor_traces.extend(b.actor_traces);
105 self.rpc_traces.extend(b.rpc_traces);
106 self.compaction_task_traces.extend(b.compaction_task_traces);
107 self.inflight_barrier_traces
108 .extend(b.inflight_barrier_traces);
109 for (worker_id, worker_state) in b.barrier_worker_state {
110 match self.barrier_worker_state.entry(worker_id) {
111 Entry::Occupied(_entry) => {
112 warn!(
113 worker_id,
114 worker_state, "duplicate barrier worker state. skipped"
115 );
116 }
117 Entry::Vacant(entry) => {
118 entry.insert(worker_state);
119 }
120 }
121 }
122 for (worker_id, worker_state) in b.jvm_stack_traces {
123 match self.jvm_stack_traces.entry(worker_id) {
124 Entry::Occupied(_entry) => {
125 warn!(
126 worker_id,
127 worker_state, "duplicate jvm stack trace. skipped"
128 );
129 }
130 Entry::Vacant(entry) => {
131 entry.insert(worker_state);
132 }
133 }
134 }
135 self.meta_traces.extend(b.meta_traces);
136 }
137
138 pub fn output(&self) -> StackTraceResponseOutput<'_> {
139 StackTraceResponseOutput(self)
140 }
141}