risingwave_common/util/
prost.rs1use std::collections::btree_map::Entry;
16use std::fmt::{Display, Formatter};
17use std::ops::Deref;
18
19use risingwave_pb::batch_plan;
20use risingwave_pb::monitor_service::StackTraceResponse;
21use tracing::warn;
22
23pub trait TypeUrl {
24 fn type_url() -> &'static str;
25}
26
27impl TypeUrl for batch_plan::ExchangeNode {
28 fn type_url() -> &'static str {
29 "type.googleapis.com/plan.ExchangeNode"
30 }
31}
32
33pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);
34
35impl Deref for StackTraceResponseOutput<'_> {
36 type Target = StackTraceResponse;
37
38 fn deref(&self) -> &Self::Target {
39 self.0
40 }
41}
42
43impl Display for StackTraceResponseOutput<'_> {
44 fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
45 if !self.node_errors.is_empty() {
46 writeln!(s, "--- Stack Trace Errors ---")?;
47 for (worker_id, err) in &self.node_errors {
48 writeln!(s, ">> Worker {worker_id}")?;
49 writeln!(s, "{err}\n")?;
50 }
51 }
52 if !self.actor_traces.is_empty() {
53 writeln!(s, "--- Actor Traces ---")?;
54 for (actor_id, trace) in &self.actor_traces {
55 writeln!(s, ">> Actor {}", *actor_id)?;
56 writeln!(s, "{trace}")?;
57 }
58 }
59 if !self.rpc_traces.is_empty() {
60 let _ = writeln!(s, "--- RPC Traces ---");
61 for (name, trace) in &self.rpc_traces {
62 writeln!(s, ">> RPC {name}")?;
63 writeln!(s, "{trace}")?;
64 }
65 }
66 if !self.compaction_task_traces.is_empty() {
67 writeln!(s, "--- Compactor Traces ---")?;
68 for (name, trace) in &self.compaction_task_traces {
69 writeln!(s, ">> Compaction Task {name}")?;
70 writeln!(s, "{trace}")?;
71 }
72 }
73
74 if !self.inflight_barrier_traces.is_empty() {
75 writeln!(s, "--- Inflight Barrier Traces ---")?;
76 for (name, trace) in &self.inflight_barrier_traces {
77 writeln!(s, ">> Barrier {name}")?;
78 writeln!(s, "{trace}")?;
79 }
80 }
81
82 writeln!(s, "\n\n--- Barrier Worker States ---")?;
83 for (worker_id, state) in &self.barrier_worker_state {
84 writeln!(s, ">> Worker {worker_id}")?;
85 writeln!(s, "{state}\n")?;
86 }
87
88 if !self.jvm_stack_traces.is_empty() {
89 writeln!(s, "\n\n--- JVM Stack Traces ---")?;
90 for (worker_id, state) in &self.jvm_stack_traces {
91 writeln!(s, ">> Worker {worker_id}")?;
92 writeln!(s, "{state}\n")?;
93 }
94 }
95
96 if !self.meta_traces.is_empty() {
97 writeln!(s, "\n\n--- Meta Traces ---")?;
98 for (key, value) in &self.meta_traces {
99 writeln!(s, ">> {key}")?;
100 writeln!(s, "{value}\n")?;
101 }
102 }
103
104 Ok(())
105 }
106}
107
108#[easy_ext::ext(StackTraceResponseExt)]
109impl StackTraceResponse {
110 pub fn merge_other(&mut self, b: StackTraceResponse) {
111 self.actor_traces.extend(b.actor_traces);
112 self.rpc_traces.extend(b.rpc_traces);
113 self.compaction_task_traces.extend(b.compaction_task_traces);
114 self.inflight_barrier_traces
115 .extend(b.inflight_barrier_traces);
116 for (worker_id, err) in b.node_errors {
117 if self.node_errors.contains_key(&worker_id) {
118 warn!(
119 worker_id = %worker_id,
120 error = %err,
121 "duplicate node error. skipped"
122 );
123 continue;
124 }
125 self.node_errors.insert(worker_id, err);
126 }
127 for (worker_id, worker_state) in b.barrier_worker_state {
128 match self.barrier_worker_state.entry(worker_id) {
129 Entry::Occupied(_entry) => {
130 warn!(
131 %worker_id,
132 worker_state, "duplicate barrier worker state. skipped"
133 );
134 }
135 Entry::Vacant(entry) => {
136 entry.insert(worker_state);
137 }
138 }
139 }
140 for (worker_id, worker_state) in b.jvm_stack_traces {
141 match self.jvm_stack_traces.entry(worker_id) {
142 Entry::Occupied(_entry) => {
143 warn!(
144 %worker_id,
145 worker_state, "duplicate jvm stack trace. skipped"
146 );
147 }
148 Entry::Vacant(entry) => {
149 entry.insert(worker_state);
150 }
151 }
152 }
153 self.meta_traces.extend(b.meta_traces);
154 }
155
156 pub fn output(&self) -> StackTraceResponseOutput<'_> {
157 StackTraceResponseOutput(self)
158 }
159}