risingwave_common/util/
prost.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::fmt::{Display, Formatter};
17use std::ops::Deref;
18
19use risingwave_pb::batch_plan;
20use risingwave_pb::monitor_service::StackTraceResponse;
21use tracing::warn;
22
/// Associates a protobuf message type with its type URL string.
///
/// The URL is exposed through an associated function (no `self` receiver), so
/// it can be queried from the type alone without constructing a value.
pub trait TypeUrl {
    /// Returns the type URL for the implementing type as a static string.
    fn type_url() -> &'static str;
}
26
27impl TypeUrl for batch_plan::ExchangeNode {
28    fn type_url() -> &'static str {
29        "type.googleapis.com/plan.ExchangeNode"
30    }
31}
32
33pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);
34
35impl Deref for StackTraceResponseOutput<'_> {
36    type Target = StackTraceResponse;
37
38    fn deref(&self) -> &Self::Target {
39        self.0
40    }
41}
42
43impl Display for StackTraceResponseOutput<'_> {
44    fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
45        if !self.actor_traces.is_empty() {
46            writeln!(s, "--- Actor Traces ---")?;
47            for (actor_id, trace) in &self.actor_traces {
48                writeln!(s, ">> Actor {}", *actor_id)?;
49                writeln!(s, "{trace}")?;
50            }
51        }
52        if !self.rpc_traces.is_empty() {
53            let _ = writeln!(s, "--- RPC Traces ---");
54            for (name, trace) in &self.rpc_traces {
55                writeln!(s, ">> RPC {name}")?;
56                writeln!(s, "{trace}")?;
57            }
58        }
59        if !self.compaction_task_traces.is_empty() {
60            writeln!(s, "--- Compactor Traces ---")?;
61            for (name, trace) in &self.compaction_task_traces {
62                writeln!(s, ">> Compaction Task {name}")?;
63                writeln!(s, "{trace}")?;
64            }
65        }
66
67        if !self.inflight_barrier_traces.is_empty() {
68            writeln!(s, "--- Inflight Barrier Traces ---")?;
69            for (name, trace) in &self.inflight_barrier_traces {
70                writeln!(s, ">> Barrier {name}")?;
71                writeln!(s, "{trace}")?;
72            }
73        }
74
75        writeln!(s, "\n\n--- Barrier Worker States ---")?;
76        for (worker_id, state) in &self.barrier_worker_state {
77            writeln!(s, ">> Worker {worker_id}")?;
78            writeln!(s, "{state}\n")?;
79        }
80
81        if !self.jvm_stack_traces.is_empty() {
82            writeln!(s, "\n\n--- JVM Stack Traces ---")?;
83            for (worker_id, state) in &self.jvm_stack_traces {
84                writeln!(s, ">> Worker {worker_id}")?;
85                writeln!(s, "{state}\n")?;
86            }
87        }
88
89        Ok(())
90    }
91}
92
93#[easy_ext::ext(StackTraceResponseExt)]
94impl StackTraceResponse {
95    pub fn merge_other(&mut self, b: StackTraceResponse) {
96        self.actor_traces.extend(b.actor_traces);
97        self.rpc_traces.extend(b.rpc_traces);
98        self.compaction_task_traces.extend(b.compaction_task_traces);
99        self.inflight_barrier_traces
100            .extend(b.inflight_barrier_traces);
101        for (worker_id, worker_state) in b.barrier_worker_state {
102            match self.barrier_worker_state.entry(worker_id) {
103                Entry::Occupied(_entry) => {
104                    warn!(
105                        worker_id,
106                        worker_state, "duplicate barrier worker state. skipped"
107                    );
108                }
109                Entry::Vacant(entry) => {
110                    entry.insert(worker_state);
111                }
112            }
113        }
114        for (worker_id, worker_state) in b.jvm_stack_traces {
115            match self.jvm_stack_traces.entry(worker_id) {
116                Entry::Occupied(_entry) => {
117                    warn!(
118                        worker_id,
119                        worker_state, "duplicate jvm stack trace. skipped"
120                    );
121                }
122                Entry::Vacant(entry) => {
123                    entry.insert(worker_state);
124                }
125            }
126        }
127    }
128
129    pub fn output(&self) -> StackTraceResponseOutput<'_> {
130        StackTraceResponseOutput(self)
131    }
132}