risingwave_stream/executor/monitor/
profiling_stats.rs1use std::sync::atomic::Ordering;
16
17use risingwave_common::monitor::in_mem::GuardedCount;
18use risingwave_pb::id::ExecutorId;
19
20use crate::executor::monitor::StreamingMetrics;
21
22pub enum ProfileMetricsImpl {
23 NoopProfileMetrics,
24 ProfileMetrics(ProfileMetrics),
25}
26
27impl ProfileMetricsImpl {
28 pub fn new(
29 executor_id: ExecutorId,
30 stats: &StreamingMetrics,
31 enable_profiling: bool,
32 ) -> ProfileMetricsImpl {
33 if enable_profiling {
34 ProfileMetricsImpl::ProfileMetrics(ProfileMetrics {
35 stream_node_output_row_count: stats
36 .mem_stream_node_output_row_count
37 .new_count(executor_id),
38 stream_node_output_blocking_duration_ns: stats
39 .mem_stream_node_output_blocking_duration_ns
40 .new_count(executor_id),
41 })
42 } else {
43 ProfileMetricsImpl::NoopProfileMetrics
44 }
45 }
46}
47
48pub struct ProfileMetrics {
49 pub stream_node_output_row_count: GuardedCount<ExecutorId>,
50 pub stream_node_output_blocking_duration_ns: GuardedCount<ExecutorId>,
51}
52
/// Operations for recording profiling statistics.
///
/// Implemented in this module for both [`ProfileMetrics`] and
/// [`ProfileMetricsImpl`], so callers can record through either type.
pub trait ProfileMetricsExt {
    /// Adds `count` to the output row counter.
    fn inc_row_count(&self, count: u64);

    /// Adds `duration` to the output blocking-duration counter.
    ///
    /// NOTE(review): the `_ns` suffix suggests `duration` is expected in
    /// nanoseconds; confirm against the call sites.
    fn inc_blocking_duration_ns(&self, duration: u64);
}
57
58impl ProfileMetricsExt for ProfileMetrics {
59 fn inc_row_count(&self, count: u64) {
60 self.stream_node_output_row_count
61 .count
62 .fetch_add(count, Ordering::Relaxed);
63 }
64
65 fn inc_blocking_duration_ns(&self, duration_ms: u64) {
66 self.stream_node_output_blocking_duration_ns
67 .count
68 .fetch_add(duration_ms, Ordering::Relaxed);
69 }
70}
71
72impl ProfileMetricsExt for ProfileMetricsImpl {
73 fn inc_row_count(&self, count: u64) {
74 match self {
75 ProfileMetricsImpl::NoopProfileMetrics => {}
76 ProfileMetricsImpl::ProfileMetrics(metrics) => metrics.inc_row_count(count),
77 }
78 }
79
80 fn inc_blocking_duration_ns(&self, duration: u64) {
81 match self {
82 ProfileMetricsImpl::NoopProfileMetrics => {}
83 ProfileMetricsImpl::ProfileMetrics(metrics) => {
84 metrics.inc_blocking_duration_ns(duration)
85 }
86 }
87 }
88}