risingwave_compute/memory/
controller.rs1use std::sync::Arc;
16use std::sync::atomic::Ordering;
17
18use risingwave_common::sequence::{SEQUENCE_GLOBAL, Sequence};
19use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
20use risingwave_stream::executor::monitor::StreamingMetrics;
21
22use super::manager::MemoryManagerConfig;
23
/// Eviction pressure level chosen by [`LruWatermarkController::tick`] on each tick.
///
/// The explicit discriminants mirror the codes produced by the
/// `From<LruEvictionPolicy> for u8` conversion, which is the value exported through the
/// `lru_eviction_policy` metric.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LruEvictionPolicy {
    /// Used memory is at or below the stable threshold.
    None = 0,
    /// Used memory is above the stable threshold but not above the graceful one.
    Stable = 1,
    /// Used memory is above the graceful threshold but not above the aggressive one.
    Graceful = 2,
    /// Used memory is above the aggressive threshold.
    Aggressive = 3,
}
30
31impl From<LruEvictionPolicy> for u8 {
32 fn from(value: LruEvictionPolicy) -> Self {
33 match value {
34 LruEvictionPolicy::None => 0,
35 LruEvictionPolicy::Stable => 1,
36 LruEvictionPolicy::Graceful => 2,
37 LruEvictionPolicy::Aggressive => 3,
38 }
39 }
40}
41
/// A snapshot of allocator counters, in bytes.
///
/// Within this file it is produced by `jemalloc_memory_stats`, which fills each field from
/// the `tikv_jemalloc_ctl::stats` reading of the same name.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MemoryStats {
    /// Reading of `stats::allocated`.
    pub allocated: usize,
    /// Reading of `stats::active`.
    pub active: usize,
    /// Reading of `stats::resident`.
    pub resident: usize,
    /// Reading of `stats::metadata`.
    pub metadata: usize,
}
54
55pub struct LruWatermarkController {
73 metrics: Arc<StreamingMetrics>,
74
75 threshold_stable: usize,
76 threshold_graceful: usize,
77 threshold_aggressive: usize,
78
79 eviction_factor_stable: f64,
80 eviction_factor_graceful: f64,
81 eviction_factor_aggressive: f64,
82
83 watermark_sequence: Sequence,
84}
85
86impl LruWatermarkController {
87 pub fn new(config: &MemoryManagerConfig) -> Self {
88 let threshold_stable = (config.target_memory as f64 * config.threshold_stable) as usize;
89 let threshold_graceful = (config.target_memory as f64 * config.threshold_graceful) as usize;
90 let threshold_aggressive =
91 (config.target_memory as f64 * config.threshold_aggressive) as usize;
92
93 Self {
94 metrics: config.metrics.clone(),
95 threshold_stable,
96 threshold_graceful,
97 threshold_aggressive,
98 eviction_factor_stable: config.eviction_factor_stable,
99 eviction_factor_graceful: config.eviction_factor_graceful,
100 eviction_factor_aggressive: config.eviction_factor_aggressive,
101 watermark_sequence: 0,
102 }
103 }
104}
105
106impl std::fmt::Debug for LruWatermarkController {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("LruWatermarkController")
109 .field("threshold_stable", &self.threshold_stable)
110 .field("threshold_graceful", &self.threshold_graceful)
111 .field("threshold_aggressive", &self.threshold_aggressive)
112 .finish()
113 }
114}
115
116fn jemalloc_memory_stats() -> MemoryStats {
125 if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
126 tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
127 }
128 let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
129 let active = tikv_jemalloc_ctl::stats::active::read().unwrap();
130 let resident = tikv_jemalloc_ctl::stats::resident::read().unwrap();
131 let metadata = tikv_jemalloc_ctl::stats::metadata::read().unwrap();
132 MemoryStats {
133 allocated,
134 active,
135 resident,
136 metadata,
137 }
138}
139
140impl LruWatermarkController {
141 pub fn tick(&mut self) -> Sequence {
142 let MemoryStats {
144 allocated: jemalloc_allocated_bytes,
145 active: jemalloc_active_bytes,
146 resident: jemalloc_resident_bytes,
147 metadata: jemalloc_metadata_bytes,
148 } = jemalloc_memory_stats();
149 let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();
150
151 let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes;
152
153 let to_evict_bytes = cur_used_memory_bytes.saturating_sub(self.threshold_aggressive) as f64
166 * self.eviction_factor_aggressive
167 + cur_used_memory_bytes
168 .saturating_sub(self.threshold_graceful)
169 .min(self.threshold_aggressive - self.threshold_graceful) as f64
170 * self.eviction_factor_graceful
171 + cur_used_memory_bytes
172 .saturating_sub(self.threshold_stable)
173 .min(self.threshold_graceful - self.threshold_stable) as f64
174 * self.eviction_factor_stable;
175 let ratio = to_evict_bytes / cur_used_memory_bytes as f64;
176 let latest_sequence = SEQUENCE_GLOBAL.load(Ordering::Relaxed);
177 let sequence_diff =
178 ((latest_sequence - self.watermark_sequence) as f64 * ratio) as Sequence;
179 self.watermark_sequence = latest_sequence.min(self.watermark_sequence + sequence_diff);
180
181 let policy = if cur_used_memory_bytes > self.threshold_aggressive {
182 LruEvictionPolicy::Aggressive
183 } else if cur_used_memory_bytes > self.threshold_graceful {
184 LruEvictionPolicy::Graceful
185 } else if cur_used_memory_bytes > self.threshold_stable {
186 LruEvictionPolicy::Stable
187 } else {
188 LruEvictionPolicy::None
189 };
190
191 self.metrics.lru_latest_sequence.set(latest_sequence as _);
192 self.metrics
193 .lru_watermark_sequence
194 .set(self.watermark_sequence as _);
195 self.metrics
196 .lru_eviction_policy
197 .set(Into::<u8>::into(policy) as _);
198
199 self.metrics
200 .jemalloc_allocated_bytes
201 .set(jemalloc_allocated_bytes as _);
202 self.metrics
203 .jemalloc_active_bytes
204 .set(jemalloc_active_bytes as _);
205 self.metrics
206 .jemalloc_resident_bytes
207 .set(jemalloc_resident_bytes as _);
208 self.metrics
209 .jemalloc_metadata_bytes
210 .set(jemalloc_metadata_bytes as _);
211 self.metrics
212 .jvm_allocated_bytes
213 .set(jvm_allocated_bytes as _);
214 self.metrics.jvm_active_bytes.set(jvm_active_bytes as _);
215
216 self.watermark_sequence
217 }
218}