risingwave_meta/manager/
idle.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::{Duration, Instant};
18
19use risingwave_common::util::tokio_util::sync::CancellationToken;
20use tokio::task::JoinHandle;
21
/// `IdleManager` keeps track of the latest activity and reports whether the meta service has
/// been idle for a long time.
#[derive(Debug)]
pub struct IdleManager {
    /// Maximum tolerated idle time in milliseconds. The idle manager does not work if set to 0.
    config_max_idle_ms: u64,
    /// An arbitrary base instant, used to express later instants as `u64` millisecond offsets.
    instant_base: Instant,
    /// Offset of the most recently recorded activity, in milliseconds since `instant_base`.
    last_active_offset_ms: AtomicU64,
}

/// Shared, reference-counted handle to an [`IdleManager`].
pub type IdleManagerRef = Arc<IdleManager>;
31
32impl IdleManager {
33    pub fn disabled() -> Self {
34        Self::new(0)
35    }
36
37    pub fn new(config_max_idle_ms: u64) -> Self {
38        IdleManager {
39            config_max_idle_ms,
40            instant_base: Instant::now(),
41            last_active_offset_ms: AtomicU64::new(0),
42        }
43    }
44
45    pub fn get_config_max_idle(&self) -> Duration {
46        Duration::from_millis(self.config_max_idle_ms)
47    }
48
49    fn offset_ms_now(&self) -> u64 {
50        let now = Instant::now();
51        if now <= self.instant_base {
52            return 0;
53        }
54        ((now - self.instant_base).as_secs_f64() * 1000.0) as u64
55    }
56
57    pub fn record_activity(&self) {
58        self.last_active_offset_ms
59            .store(self.offset_ms_now(), Ordering::Release);
60    }
61
62    pub fn is_exceeding_max_idle(&self) -> bool {
63        if self.config_max_idle_ms == 0 {
64            return false;
65        }
66        let new_offset_ms = self.offset_ms_now();
67        let last_offset_ms = self.last_active_offset_ms.load(Ordering::Acquire);
68        if new_offset_ms < last_offset_ms {
69            // Should never happen normally, but in some arch it may happen.
70            // In this case, let's do nothing..
71            return false;
72        }
73        (new_offset_ms - last_offset_ms) > self.config_max_idle_ms
74    }
75
76    /// Idle checker send signal when the meta does not receive requests for long time.
77    pub fn start_idle_checker(
78        idle_manager: IdleManagerRef,
79        check_interval: Duration,
80        shutdown: CancellationToken,
81    ) -> JoinHandle<()> {
82        let dur = idle_manager.get_config_max_idle();
83        if !dur.is_zero() {
84            tracing::warn!(
85                "--dangerous-max-idle-secs is set. The meta server will be automatically stopped after idle for {:?}.",
86                dur
87            )
88        }
89
90        tokio::spawn(async move {
91            let mut min_interval = tokio::time::interval(check_interval);
92            loop {
93                min_interval.tick().await;
94                if idle_manager.is_exceeding_max_idle() {
95                    break;
96                }
97            }
98            tracing::warn!(
99                "Idle checker found the server is already idle for {:?}",
100                idle_manager.get_config_max_idle()
101            );
102            tracing::warn!("Idle checker is shutting down the server");
103
104            shutdown.cancel();
105        })
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Sleeps for `pause_ms` milliseconds, expects the manager to report idleness afterwards,
    /// then records activity and expects the idle state to be cleared again.
    async fn expect_idle_then_reset(manager: &IdleManager, pause_ms: u64) {
        tokio::time::sleep(std::time::Duration::from_millis(pause_ms)).await;
        assert!(manager.is_exceeding_max_idle());
        manager.record_activity();
        assert!(!manager.is_exceeding_max_idle());
    }

    #[tokio::test]
    async fn test_idle_manager() {
        // Enabled manager with a 400 ms limit: fresh and freshly-active states are not idle.
        let enabled = IdleManager::new(400);
        assert!(!enabled.is_exceeding_max_idle());
        enabled.record_activity();
        assert!(!enabled.is_exceeding_max_idle());

        // Sleeping for twice the limit must trip the check, and it must be repeatable.
        expect_idle_then_reset(&enabled, 800).await;
        expect_idle_then_reset(&enabled, 800).await;

        // A disabled manager never reports idleness.
        let disabled = IdleManager::disabled();
        assert!(!disabled.is_exceeding_max_idle());
        disabled.record_activity();
        assert!(!disabled.is_exceeding_max_idle());
    }
}