risingwave_common/log.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::atomic::{AtomicUsize, Ordering};
use governor::Quota;
type RateLimiter = governor::RateLimiter<
governor::state::NotKeyed,
governor::state::InMemoryState,
governor::clock::MonotonicClock,
>;
/// `LogSuppresser` is a helper to suppress log spamming.
pub struct LogSuppresser {
/// The number of times the log has been suppressed. Will be returned and cleared when the
/// rate limiter allows next log to be printed.
suppressed_count: AtomicUsize,
/// Inner rate limiter.
rate_limiter: RateLimiter,
}
#[derive(Debug)]
pub struct LogSuppressed;
impl LogSuppresser {
pub fn new(rate_limiter: RateLimiter) -> Self {
Self {
suppressed_count: AtomicUsize::new(0),
rate_limiter,
}
}
/// Check if the log should be suppressed.
/// If the log should be suppressed, return `Err(LogSuppressed)`.
/// Otherwise, return `Ok(Some(..))` with count of suppressed messages since last check,
/// or `Ok(None)` if there's none.
pub fn check(&self) -> core::result::Result<Option<NonZeroUsize>, LogSuppressed> {
match self.rate_limiter.check() {
Ok(()) => Ok(NonZeroUsize::new(
self.suppressed_count.swap(0, Ordering::Relaxed),
)),
Err(_) => {
self.suppressed_count.fetch_add(1, Ordering::Relaxed);
Err(LogSuppressed)
}
}
}
}
impl Default for LogSuppresser {
/// Default rate limiter allows 1 log per second.
fn default() -> Self {
Self::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(1).unwrap(),
)))
}
}
#[cfg(test)]
mod tests {
use std::sync::LazyLock;
use std::time::Duration;
use tracing_subscriber::util::SubscriberInitExt;
use super::*;
#[tokio::test]
async fn demo() {
let _logger = tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(tracing::Level::ERROR)
.set_default();
let mut interval = tokio::time::interval(Duration::from_millis(100));
for _ in 0..100 {
interval.tick().await;
static RATE_LIMITER: LazyLock<LogSuppresser> = LazyLock::new(|| {
LogSuppresser::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(5).unwrap(),
)))
});
if let Ok(suppressed_count) = RATE_LIMITER.check() {
tracing::error!(suppressed_count, "failed to foo bar");
}
}
}
}