risingwave_compute/memory/
manager.rs1use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use risingwave_common::sequence::AtomicSequence;
20use risingwave_stream::executor::monitor::StreamingMetrics;
21
22use super::controller::LruWatermarkController;
23
24pub struct MemoryManagerConfig {
25 pub target_memory: usize,
28
29 pub threshold_aggressive: f64,
30 pub threshold_graceful: f64,
31 pub threshold_stable: f64,
32
33 pub eviction_factor_stable: f64,
34 pub eviction_factor_graceful: f64,
35 pub eviction_factor_aggressive: f64,
36
37 pub metrics: Arc<StreamingMetrics>,
38}
39
40pub struct MemoryManager {
42 watermark_sequence: Arc<AtomicSequence>,
44
45 metrics: Arc<StreamingMetrics>,
46
47 controller: Mutex<LruWatermarkController>,
48}
49
50impl MemoryManager {
51 const MIN_INTERVAL: Duration = Duration::from_millis(10);
54
55 pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
56 let controller = Mutex::new(LruWatermarkController::new(&config));
57 tracing::info!("LRU watermark controller: {:?}", &controller);
58
59 Arc::new(Self {
60 watermark_sequence: Arc::new(0.into()),
61 metrics: config.metrics,
62 controller,
63 })
64 }
65
66 pub fn get_watermark_sequence(&self) -> Arc<AtomicU64> {
67 self.watermark_sequence.clone()
68 }
69
70 pub async fn run(self: Arc<Self>, interval: Duration) {
71 let interval = std::cmp::max(interval, Self::MIN_INTERVAL);
73 tracing::info!("start running MemoryManager with interval {interval:?}",);
74
75 let mut tick_interval = tokio::time::interval(interval);
77
78 loop {
79 tick_interval.tick().await;
80
81 let new_watermark_sequence = self.controller.lock().unwrap().tick();
82
83 self.watermark_sequence
84 .store(new_watermark_sequence, Ordering::Relaxed);
85
86 self.metrics.lru_runtime_loop_count.inc();
87 }
88 }
89}