1use std::num::{NonZeroU32, NonZeroUsize};
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18use governor::Quota;
19
/// Shorthand for the concrete `governor` rate limiter used in this file:
/// not keyed, with in-memory state, measured against the monotonic clock.
type RateLimiter = governor::RateLimiter<
    governor::state::NotKeyed,
    governor::state::InMemoryState,
    governor::clock::MonotonicClock,
>;
25
/// Rate-limits a log call site and keeps count of the calls that were
/// suppressed, so the next allowed call can report how many were dropped.
pub struct LogSuppresser {
    /// Number of `check` calls rejected since the last allowed one.
    suppressed_count: AtomicUsize,

    /// Limiter consulted by `check` to decide whether a call may proceed.
    rate_limiter: RateLimiter,
}
35
/// Marker error returned by `LogSuppresser::check` when the rate limiter
/// rejected the call and the log line should be skipped.
///
/// It is a zero-sized, copyable value that implements [`std::error::Error`],
/// so it can be boxed or propagated with `?` like any other error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LogSuppressed;

impl std::fmt::Display for LogSuppressed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("log message suppressed by rate limiter")
    }
}

impl std::error::Error for LogSuppressed {}
38
39impl LogSuppresser {
40 pub fn new(rate_limiter: RateLimiter) -> Self {
41 Self {
42 suppressed_count: AtomicUsize::new(0),
43 rate_limiter,
44 }
45 }
46
47 pub fn check(&self) -> core::result::Result<Option<NonZeroUsize>, LogSuppressed> {
52 match self.rate_limiter.check() {
53 Ok(()) => Ok(NonZeroUsize::new(
54 self.suppressed_count.swap(0, Ordering::Relaxed),
55 )),
56 Err(_) => {
57 self.suppressed_count.fetch_add(1, Ordering::Relaxed);
58 Err(LogSuppressed)
59 }
60 }
61 }
62}
63
64impl Default for LogSuppresser {
65 fn default() -> Self {
67 Self::new(RateLimiter::direct(Quota::per_second(
68 NonZeroU32::new(1).unwrap(),
69 )))
70 }
71}
72
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;
    use std::time::{Duration, Instant};

    use tracing_subscriber::util::SubscriberInitExt;

    use super::*;

    /// Demonstration rather than an assertion-based test: calls one shared
    /// suppresser 1000 times on a 10 ms interval and logs how many calls
    /// were allowed versus suppressed.
    #[tokio::test]
    async fn demo() {
        // Shared suppresser with a quota of five per second, built on first use.
        static RATE_LIMITER: LazyLock<LogSuppresser> = LazyLock::new(|| {
            let quota = Quota::per_second(NonZeroU32::new(5).unwrap());
            LogSuppresser::new(RateLimiter::direct(quota))
        });

        // Hold the guard so the subscriber stays the default for this test.
        let _logger = tracing_subscriber::fmt::Subscriber::builder()
            .with_max_level(tracing::Level::ERROR)
            .set_default();

        let mut ticker = tokio::time::interval(Duration::from_millis(10));
        let mut allowed = 0;
        let mut suppressed = 0;
        let started_at = Instant::now();

        for _ in 0..1000 {
            ticker.tick().await;

            // Suppressed calls produce no log line; move on to the next tick.
            let Ok(suppressed_count) = RATE_LIMITER.check() else {
                continue;
            };

            suppressed += suppressed_count.map_or(0, NonZeroUsize::get);
            allowed += 1;
            tracing::error!(suppressed_count, "failed to foo bar");
        }

        let duration = started_at.elapsed();

        tracing::error!(
            allowed,
            suppressed,
            ?duration,
            rate = allowed as f64 / duration.as_secs_f64()
        );
    }
}