risingwave_common/
log.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::{NonZeroU32, NonZeroUsize};
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18use governor::Quota;
19
20type RateLimiter = governor::RateLimiter<
21    governor::state::NotKeyed,
22    governor::state::InMemoryState,
23    governor::clock::MonotonicClock,
24>;
25
26/// `LogSuppresser` is a helper to suppress log spamming.
27pub struct LogSuppresser {
28    /// The number of times the log has been suppressed. Will be returned and cleared when the
29    /// rate limiter allows next log to be printed.
30    suppressed_count: AtomicUsize,
31
32    /// Inner rate limiter.
33    rate_limiter: RateLimiter,
34}
35
/// Marker error returned by [`LogSuppresser::check`] when the current log attempt should be
/// dropped instead of printed.
///
/// It carries no data. It implements [`std::error::Error`] so that it composes with `?` and
/// with generic error-handling code, and it is `Copy`/`Eq` so that it is trivial to pass
/// around and compare in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LogSuppressed;

impl std::fmt::Display for LogSuppressed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("log suppressed by rate limiter")
    }
}

impl std::error::Error for LogSuppressed {}
38
39impl LogSuppresser {
40    pub fn new(rate_limiter: RateLimiter) -> Self {
41        Self {
42            suppressed_count: AtomicUsize::new(0),
43            rate_limiter,
44        }
45    }
46
47    /// Check if the log should be suppressed.
48    /// If the log should be suppressed, return `Err(LogSuppressed)`.
49    /// Otherwise, return `Ok(Some(..))` with count of suppressed messages since last check,
50    /// or `Ok(None)` if there's none.
51    pub fn check(&self) -> core::result::Result<Option<NonZeroUsize>, LogSuppressed> {
52        match self.rate_limiter.check() {
53            Ok(()) => Ok(NonZeroUsize::new(
54                self.suppressed_count.swap(0, Ordering::Relaxed),
55            )),
56            Err(_) => {
57                self.suppressed_count.fetch_add(1, Ordering::Relaxed);
58                Err(LogSuppressed)
59            }
60        }
61    }
62}
63
64impl Default for LogSuppresser {
65    /// Default rate limiter allows 1 log per second.
66    fn default() -> Self {
67        Self::new(RateLimiter::direct(Quota::per_second(
68            NonZeroU32::new(1).unwrap(),
69        )))
70    }
71}
72
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;
    use std::time::{Duration, Instant};

    use tracing_subscriber::util::SubscriberInitExt;

    use super::*;

    /// Demonstrates the suppresser: attempts to log 1000 times at 10ms intervals against a
    /// quota of 5 per second, then reports how many attempts were allowed and suppressed.
    /// There are no assertions; the output is for manual inspection.
    #[tokio::test]
    async fn demo() {
        // Shared by every iteration below; lazily created on first use.
        static RATE_LIMITER: LazyLock<LogSuppresser> = LazyLock::new(|| {
            let five_per_second = Quota::per_second(NonZeroU32::new(5).unwrap());
            LogSuppresser::new(RateLimiter::direct(five_per_second))
        });

        let _logger = tracing_subscriber::fmt::Subscriber::builder()
            .with_max_level(tracing::Level::ERROR)
            .set_default();

        let mut ticker = tokio::time::interval(Duration::from_millis(10));

        let mut allowed = 0;
        let mut suppressed = 0;

        let start = Instant::now();

        for _ in 0..1000 {
            ticker.tick().await;

            // Suppressed attempts are simply skipped; the suppresser counts them itself.
            let Ok(suppressed_count) = RATE_LIMITER.check() else {
                continue;
            };

            suppressed += suppressed_count.map_or(0, |count| count.get());
            allowed += 1;
            tracing::error!(suppressed_count, "failed to foo bar");
        }

        let duration = start.elapsed();
        let rate = allowed as f64 / duration.as_secs_f64();

        tracing::error!(allowed, suppressed, ?duration, rate = rate);
    }
}