risingwave_meta/manager/
idle.rs1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::{Duration, Instant};
18
19use risingwave_common::util::tokio_util::sync::CancellationToken;
20use tokio::task::JoinHandle;
21
22pub struct IdleManager {
25 config_max_idle_ms: u64, instant_base: Instant, last_active_offset_ms: AtomicU64,
28}
29
30pub type IdleManagerRef = Arc<IdleManager>;
31
32impl IdleManager {
33 pub fn disabled() -> Self {
34 Self::new(0)
35 }
36
37 pub fn new(config_max_idle_ms: u64) -> Self {
38 IdleManager {
39 config_max_idle_ms,
40 instant_base: Instant::now(),
41 last_active_offset_ms: AtomicU64::new(0),
42 }
43 }
44
45 pub fn get_config_max_idle(&self) -> Duration {
46 Duration::from_millis(self.config_max_idle_ms)
47 }
48
49 fn offset_ms_now(&self) -> u64 {
50 let now = Instant::now();
51 if now <= self.instant_base {
52 return 0;
53 }
54 ((now - self.instant_base).as_secs_f64() * 1000.0) as u64
55 }
56
57 pub fn record_activity(&self) {
58 self.last_active_offset_ms
59 .store(self.offset_ms_now(), Ordering::Release);
60 }
61
62 pub fn is_exceeding_max_idle(&self) -> bool {
63 if self.config_max_idle_ms == 0 {
64 return false;
65 }
66 let new_offset_ms = self.offset_ms_now();
67 let last_offset_ms = self.last_active_offset_ms.load(Ordering::Acquire);
68 if new_offset_ms < last_offset_ms {
69 return false;
72 }
73 (new_offset_ms - last_offset_ms) > self.config_max_idle_ms
74 }
75
76 pub fn start_idle_checker(
78 idle_manager: IdleManagerRef,
79 check_interval: Duration,
80 shutdown: CancellationToken,
81 ) -> JoinHandle<()> {
82 let dur = idle_manager.get_config_max_idle();
83 if !dur.is_zero() {
84 tracing::warn!(
85 "--dangerous-max-idle-secs is set. The meta server will be automatically stopped after idle for {:?}.",
86 dur
87 )
88 }
89
90 tokio::spawn(async move {
91 let mut min_interval = tokio::time::interval(check_interval);
92 loop {
93 min_interval.tick().await;
94 if idle_manager.is_exceeding_max_idle() {
95 break;
96 }
97 }
98 tracing::warn!(
99 "Idle checker found the server is already idle for {:?}",
100 idle_manager.get_config_max_idle()
101 );
102 tracing::warn!("Idle checker is shutting down the server");
103
104 shutdown.cancel();
105 })
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use super::*;
112
113 #[tokio::test]
114 async fn test_idle_manager() {
115 let im = IdleManager::new(400);
116 assert!(!im.is_exceeding_max_idle());
117 im.record_activity();
118 assert!(!im.is_exceeding_max_idle());
119
120 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
121 assert!(im.is_exceeding_max_idle());
122 im.record_activity();
123 assert!(!im.is_exceeding_max_idle());
124
125 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
126 assert!(im.is_exceeding_max_idle());
127 im.record_activity();
128 assert!(!im.is_exceeding_max_idle());
129
130 let im = IdleManager::disabled();
131 assert!(!im.is_exceeding_max_idle());
132 im.record_activity();
133 assert!(!im.is_exceeding_max_idle());
134 }
135}