risingwave_stream/executor/monitor/
profiling_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::Ordering;
16
17use risingwave_common::monitor::in_mem::GuardedCount;
18use risingwave_pb::id::ExecutorId;
19
20use crate::executor::monitor::StreamingMetrics;
21
22pub enum ProfileMetricsImpl {
23    NoopProfileMetrics,
24    ProfileMetrics(ProfileMetrics),
25}
26
27impl ProfileMetricsImpl {
28    pub fn new(
29        executor_id: ExecutorId,
30        stats: &StreamingMetrics,
31        enable_profiling: bool,
32    ) -> ProfileMetricsImpl {
33        if enable_profiling {
34            ProfileMetricsImpl::ProfileMetrics(ProfileMetrics {
35                stream_node_output_row_count: stats
36                    .mem_stream_node_output_row_count
37                    .new_count(executor_id),
38                stream_node_output_blocking_duration_ns: stats
39                    .mem_stream_node_output_blocking_duration_ns
40                    .new_count(executor_id),
41            })
42        } else {
43            ProfileMetricsImpl::NoopProfileMetrics
44        }
45    }
46}
47
48pub struct ProfileMetrics {
49    pub stream_node_output_row_count: GuardedCount<ExecutorId>,
50    pub stream_node_output_blocking_duration_ns: GuardedCount<ExecutorId>,
51}
52
53pub trait ProfileMetricsExt {
54    fn inc_row_count(&self, count: u64);
55    fn inc_blocking_duration_ns(&self, duration: u64);
56}
57
58impl ProfileMetricsExt for ProfileMetrics {
59    fn inc_row_count(&self, count: u64) {
60        self.stream_node_output_row_count
61            .count
62            .fetch_add(count, Ordering::Relaxed);
63    }
64
65    fn inc_blocking_duration_ns(&self, duration_ms: u64) {
66        self.stream_node_output_blocking_duration_ns
67            .count
68            .fetch_add(duration_ms, Ordering::Relaxed);
69    }
70}
71
72impl ProfileMetricsExt for ProfileMetricsImpl {
73    fn inc_row_count(&self, count: u64) {
74        match self {
75            ProfileMetricsImpl::NoopProfileMetrics => {}
76            ProfileMetricsImpl::ProfileMetrics(metrics) => metrics.inc_row_count(count),
77        }
78    }
79
80    fn inc_blocking_duration_ns(&self, duration: u64) {
81        match self {
82            ProfileMetricsImpl::NoopProfileMetrics => {}
83            ProfileMetricsImpl::ProfileMetrics(metrics) => {
84                metrics.inc_blocking_duration_ns(duration)
85            }
86        }
87    }
88}