risingwave_stream/executor/monitor/
profiling_stats.rs
1use std::sync::atomic::Ordering;
16
17use risingwave_common::monitor::in_mem::GuardedCount;
18
19use crate::executor::monitor::StreamingMetrics;
20
21pub enum ProfileMetricsImpl {
22 NoopProfileMetrics,
23 ProfileMetrics(ProfileMetrics),
24}
25
26impl ProfileMetricsImpl {
27 pub fn new(
28 executor_id: u64,
29 stats: &StreamingMetrics,
30 enable_profiling: bool,
31 ) -> ProfileMetricsImpl {
32 if enable_profiling {
33 ProfileMetricsImpl::ProfileMetrics(ProfileMetrics {
34 stream_node_output_row_count: stats
35 .mem_stream_node_output_row_count
36 .new_count(executor_id),
37 stream_node_output_blocking_duration_ns: stats
38 .mem_stream_node_output_blocking_duration_ns
39 .new_count(executor_id),
40 })
41 } else {
42 ProfileMetricsImpl::NoopProfileMetrics
43 }
44 }
45}
46
47pub struct ProfileMetrics {
48 pub stream_node_output_row_count: GuardedCount,
49 pub stream_node_output_blocking_duration_ns: GuardedCount,
50}
51
/// Recording operations shared by every profiling-metrics handle.
///
/// Both methods take `&self`, so implementors are expected to use interior
/// mutability (or do nothing at all) when recording.
pub trait ProfileMetricsExt {
    /// Adds `count` to the output row counter.
    fn inc_row_count(&self, count: u64);

    /// Adds `duration` to the output blocking-duration counter.
    ///
    /// As the method name indicates, the value is expressed in nanoseconds.
    fn inc_blocking_duration_ns(&self, duration: u64);
}
56
57impl ProfileMetricsExt for ProfileMetrics {
58 fn inc_row_count(&self, count: u64) {
59 self.stream_node_output_row_count
60 .count
61 .fetch_add(count, Ordering::Relaxed);
62 }
63
64 fn inc_blocking_duration_ns(&self, duration_ms: u64) {
65 self.stream_node_output_blocking_duration_ns
66 .count
67 .fetch_add(duration_ms, Ordering::Relaxed);
68 }
69}
70
71impl ProfileMetricsExt for ProfileMetricsImpl {
72 fn inc_row_count(&self, count: u64) {
73 match self {
74 ProfileMetricsImpl::NoopProfileMetrics => {}
75 ProfileMetricsImpl::ProfileMetrics(metrics) => metrics.inc_row_count(count),
76 }
77 }
78
79 fn inc_blocking_duration_ns(&self, duration: u64) {
80 match self {
81 ProfileMetricsImpl::NoopProfileMetrics => {}
82 ProfileMetricsImpl::ProfileMetrics(metrics) => {
83 metrics.inc_blocking_duration_ns(duration)
84 }
85 }
86 }
87}