risingwave_stream/executor/monitor/
profiling_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::Ordering;
16
17use risingwave_common::monitor::in_mem::GuardedCount;
18
19use crate::executor::monitor::StreamingMetrics;
20
/// Profiling metrics handle for an executor: either a no-op placeholder or a
/// set of live counters.
///
/// The variant is chosen once in [`ProfileMetricsImpl::new`] from its
/// `enable_profiling` flag.
pub enum ProfileMetricsImpl {
    /// Profiling is disabled; increments made through [`ProfileMetricsExt`] do nothing.
    NoopProfileMetrics,
    /// Profiling is enabled; increments are forwarded to the wrapped [`ProfileMetrics`].
    ProfileMetrics(ProfileMetrics),
}
25
26impl ProfileMetricsImpl {
27    pub fn new(
28        executor_id: u64,
29        stats: &StreamingMetrics,
30        enable_profiling: bool,
31    ) -> ProfileMetricsImpl {
32        if enable_profiling {
33            ProfileMetricsImpl::ProfileMetrics(ProfileMetrics {
34                stream_node_output_row_count: stats
35                    .mem_stream_node_output_row_count
36                    .new_count(executor_id),
37                stream_node_output_blocking_duration_ns: stats
38                    .mem_stream_node_output_blocking_duration_ns
39                    .new_count(executor_id),
40            })
41        } else {
42            ProfileMetricsImpl::NoopProfileMetrics
43        }
44    }
45}
46
/// Counters updated while profiling is enabled.
///
/// Both fields are [`GuardedCount`]s; [`ProfileMetricsImpl::new`] obtains them
/// from `StreamingMetrics` via `new_count(executor_id)`.
pub struct ProfileMetrics {
    /// Row counter, incremented by `inc_row_count`.
    pub stream_node_output_row_count: GuardedCount,
    /// Blocking-duration counter, incremented by `inc_blocking_duration_ns`
    /// (nanoseconds, going by the `_ns` suffix).
    pub stream_node_output_blocking_duration_ns: GuardedCount,
}
51
/// Increment operations shared by [`ProfileMetrics`] and [`ProfileMetricsImpl`].
///
/// Both methods take `&self`, so implementors need interior mutability (the
/// implementations in this file use `fetch_add` on the counters).
pub trait ProfileMetricsExt {
    /// Adds `count` to the output row counter.
    fn inc_row_count(&self, count: u64);

    /// Adds `duration_ns` to the output blocking-duration counter.
    ///
    /// The value is expected in nanoseconds, matching the `_ns` suffix of the
    /// method and of the counter it feeds.
    fn inc_blocking_duration_ns(&self, duration_ns: u64);
}
56
57impl ProfileMetricsExt for ProfileMetrics {
58    fn inc_row_count(&self, count: u64) {
59        self.stream_node_output_row_count
60            .count
61            .fetch_add(count, Ordering::Relaxed);
62    }
63
64    fn inc_blocking_duration_ns(&self, duration_ms: u64) {
65        self.stream_node_output_blocking_duration_ns
66            .count
67            .fetch_add(duration_ms, Ordering::Relaxed);
68    }
69}
70
71impl ProfileMetricsExt for ProfileMetricsImpl {
72    fn inc_row_count(&self, count: u64) {
73        match self {
74            ProfileMetricsImpl::NoopProfileMetrics => {}
75            ProfileMetricsImpl::ProfileMetrics(metrics) => metrics.inc_row_count(count),
76        }
77    }
78
79    fn inc_blocking_duration_ns(&self, duration: u64) {
80        match self {
81            ProfileMetricsImpl::NoopProfileMetrics => {}
82            ProfileMetricsImpl::ProfileMetrics(metrics) => {
83                metrics.inc_blocking_duration_ns(duration)
84            }
85        }
86    }
87}