risingwave_common/util/
prost.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::fmt::{Display, Formatter};
17use std::ops::Deref;
18
19use risingwave_pb::batch_plan;
20use risingwave_pb::monitor_service::StackTraceResponse;
21use tracing::warn;
22
23pub trait TypeUrl {
24    fn type_url() -> &'static str;
25}
26
27impl TypeUrl for batch_plan::ExchangeNode {
28    fn type_url() -> &'static str {
29        "type.googleapis.com/plan.ExchangeNode"
30    }
31}
32
33pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);
34
35impl Deref for StackTraceResponseOutput<'_> {
36    type Target = StackTraceResponse;
37
38    fn deref(&self) -> &Self::Target {
39        self.0
40    }
41}
42
43impl Display for StackTraceResponseOutput<'_> {
44    fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
45        if !self.node_errors.is_empty() {
46            writeln!(s, "--- Stack Trace Errors ---")?;
47            for (worker_id, err) in &self.node_errors {
48                writeln!(s, ">> Worker {worker_id}")?;
49                writeln!(s, "{err}\n")?;
50            }
51        }
52        if !self.actor_traces.is_empty() {
53            writeln!(s, "--- Actor Traces ---")?;
54            for (actor_id, trace) in &self.actor_traces {
55                writeln!(s, ">> Actor {}", *actor_id)?;
56                writeln!(s, "{trace}")?;
57            }
58        }
59        if !self.rpc_traces.is_empty() {
60            let _ = writeln!(s, "--- RPC Traces ---");
61            for (name, trace) in &self.rpc_traces {
62                writeln!(s, ">> RPC {name}")?;
63                writeln!(s, "{trace}")?;
64            }
65        }
66        if !self.compaction_task_traces.is_empty() {
67            writeln!(s, "--- Compactor Traces ---")?;
68            for (name, trace) in &self.compaction_task_traces {
69                writeln!(s, ">> Compaction Task {name}")?;
70                writeln!(s, "{trace}")?;
71            }
72        }
73
74        if !self.inflight_barrier_traces.is_empty() {
75            writeln!(s, "--- Inflight Barrier Traces ---")?;
76            for (name, trace) in &self.inflight_barrier_traces {
77                writeln!(s, ">> Barrier {name}")?;
78                writeln!(s, "{trace}")?;
79            }
80        }
81
82        writeln!(s, "\n\n--- Barrier Worker States ---")?;
83        for (worker_id, state) in &self.barrier_worker_state {
84            writeln!(s, ">> Worker {worker_id}")?;
85            writeln!(s, "{state}\n")?;
86        }
87
88        if !self.jvm_stack_traces.is_empty() {
89            writeln!(s, "\n\n--- JVM Stack Traces ---")?;
90            for (worker_id, state) in &self.jvm_stack_traces {
91                writeln!(s, ">> Worker {worker_id}")?;
92                writeln!(s, "{state}\n")?;
93            }
94        }
95
96        if !self.meta_traces.is_empty() {
97            writeln!(s, "\n\n--- Meta Traces ---")?;
98            for (key, value) in &self.meta_traces {
99                writeln!(s, ">> {key}")?;
100                writeln!(s, "{value}\n")?;
101            }
102        }
103
104        Ok(())
105    }
106}
107
108#[easy_ext::ext(StackTraceResponseExt)]
109impl StackTraceResponse {
110    pub fn merge_other(&mut self, b: StackTraceResponse) {
111        self.actor_traces.extend(b.actor_traces);
112        self.rpc_traces.extend(b.rpc_traces);
113        self.compaction_task_traces.extend(b.compaction_task_traces);
114        self.inflight_barrier_traces
115            .extend(b.inflight_barrier_traces);
116        for (worker_id, err) in b.node_errors {
117            if self.node_errors.contains_key(&worker_id) {
118                warn!(
119                    worker_id = %worker_id,
120                    error = %err,
121                    "duplicate node error. skipped"
122                );
123                continue;
124            }
125            self.node_errors.insert(worker_id, err);
126        }
127        for (worker_id, worker_state) in b.barrier_worker_state {
128            match self.barrier_worker_state.entry(worker_id) {
129                Entry::Occupied(_entry) => {
130                    warn!(
131                        %worker_id,
132                        worker_state, "duplicate barrier worker state. skipped"
133                    );
134                }
135                Entry::Vacant(entry) => {
136                    entry.insert(worker_state);
137                }
138            }
139        }
140        for (worker_id, worker_state) in b.jvm_stack_traces {
141            match self.jvm_stack_traces.entry(worker_id) {
142                Entry::Occupied(_entry) => {
143                    warn!(
144                        %worker_id,
145                        worker_state, "duplicate jvm stack trace. skipped"
146                    );
147                }
148                Entry::Vacant(entry) => {
149                    entry.insert(worker_state);
150                }
151            }
152        }
153        self.meta_traces.extend(b.meta_traces);
154    }
155
156    pub fn output(&self) -> StackTraceResponseOutput<'_> {
157        StackTraceResponseOutput(self)
158    }
159}