risingwave_compute/memory/
controller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::Ordering;
17
18use risingwave_common::sequence::{SEQUENCE_GLOBAL, Sequence};
19use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
20use risingwave_stream::executor::monitor::StreamingMetrics;
21
22use super::manager::MemoryManagerConfig;
23
/// Eviction pressure level chosen by [`LruWatermarkController::tick`] from the current memory
/// usage.
///
/// The level is reported through the `lru_eviction_policy` metric using the numeric encoding
/// provided by `u8::from`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LruEvictionPolicy {
    /// Memory usage is at or below the stable threshold: no eviction.
    None,
    /// Memory usage is above the stable threshold but not above the graceful one.
    Stable,
    /// Memory usage is above the graceful threshold but not above the aggressive one.
    Graceful,
    /// Memory usage is above the aggressive threshold.
    Aggressive,
}
30
31impl From<LruEvictionPolicy> for u8 {
32    fn from(value: LruEvictionPolicy) -> Self {
33        match value {
34            LruEvictionPolicy::None => 0,
35            LruEvictionPolicy::Stable => 1,
36            LruEvictionPolicy::Graceful => 2,
37            LruEvictionPolicy::Aggressive => 3,
38        }
39    }
40}
41
/// A snapshot of allocator memory statistics, in bytes, using jemalloc's definitions.
///
/// Reference: <https://jemalloc.net/jemalloc.3.html>
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemoryStats {
    /// Total number of bytes allocated by the application.
    pub allocated: usize,
    /// Total number of bytes in active pages allocated by the application. This is a multiple of
    /// the page size, and greater than or equal to `stats.allocated`. This does not include
    /// `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to
    /// allocator metadata.
    pub active: usize,
    /// Total number of bytes in physically resident data pages mapped by the allocator.
    pub resident: usize,
    /// Total number of bytes dedicated to jemalloc metadata.
    pub metadata: usize,
}
54
/// `LruWatermarkController` controls the LRU watermark (a sequence number) according to actual
/// memory usage statistics collected from Jemalloc and JVM.
///
/// Basically, it works as a negative feedback loop: collect memory statistics, and then set the
/// LRU watermark accordingly to maintain the memory usage at a proper level.
///
/// ```text
///     ┌───────────────────┐        ┌───────────────────┐
///     │       INPUT       │        │       OUTPUT      │
/// ┌───►     (observe)     ├───────►│     (control)     ├───┐
/// │   │ Memory Statistics │        │ New LRU Watermark │   │
/// │   └───────────────────┘        └───────────────────┘   │
/// │                                                        │
/// └────────────────────────────────────────────────────────┘
/// ```
///
/// Check the function [`Self::tick()`] to see the control policy.
pub struct LruWatermarkController {
    /// Metrics sink; `tick()` publishes the sequences, the eviction policy and the raw memory
    /// statistics here.
    metrics: Arc<StreamingMetrics>,

    /// Usage in bytes above which the `Stable` policy applies
    /// (`target_memory * threshold_stable` from the config).
    threshold_stable: usize,
    /// Usage in bytes above which the `Graceful` policy applies
    /// (`target_memory * threshold_graceful` from the config).
    threshold_graceful: usize,
    /// Usage in bytes above which the `Aggressive` policy applies
    /// (`target_memory * threshold_aggressive` from the config).
    threshold_aggressive: usize,

    /// Weight applied to the bytes used between the stable and graceful thresholds.
    eviction_factor_stable: f64,
    /// Weight applied to the bytes used between the graceful and aggressive thresholds.
    eviction_factor_graceful: f64,
    /// Weight applied to the bytes used above the aggressive threshold.
    eviction_factor_aggressive: f64,

    /// Current LRU watermark sequence; starts at 0 and is advanced (and returned) by `tick()`.
    watermark_sequence: Sequence,
}
85
86impl LruWatermarkController {
87    pub fn new(config: &MemoryManagerConfig) -> Self {
88        let threshold_stable = (config.target_memory as f64 * config.threshold_stable) as usize;
89        let threshold_graceful = (config.target_memory as f64 * config.threshold_graceful) as usize;
90        let threshold_aggressive =
91            (config.target_memory as f64 * config.threshold_aggressive) as usize;
92
93        Self {
94            metrics: config.metrics.clone(),
95            threshold_stable,
96            threshold_graceful,
97            threshold_aggressive,
98            eviction_factor_stable: config.eviction_factor_stable,
99            eviction_factor_graceful: config.eviction_factor_graceful,
100            eviction_factor_aggressive: config.eviction_factor_aggressive,
101            watermark_sequence: 0,
102        }
103    }
104}
105
106impl std::fmt::Debug for LruWatermarkController {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("LruWatermarkController")
109            .field("threshold_stable", &self.threshold_stable)
110            .field("threshold_graceful", &self.threshold_graceful)
111            .field("threshold_aggressive", &self.threshold_aggressive)
112            .finish()
113    }
114}
115
116/// Get memory statistics from Jemalloc
117///
118/// - `stats.allocated`: Total number of bytes allocated by the application.
119/// - `stats.active`: Total number of bytes in active pages allocated by the application. This is a multiple of the page size, and greater than or equal to `stats.allocated`. This does not include `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator metadata.
120/// - `stats.resident`: Total number of bytes in physically resident data pages mapped by the allocator.
121/// - `stats.metadata`: Total number of bytes dedicated to jemalloc metadata.
122///
123/// Reference: <https://jemalloc.net/jemalloc.3.html>
124fn jemalloc_memory_stats() -> MemoryStats {
125    if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
126        tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
127    }
128    let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
129    let active = tikv_jemalloc_ctl::stats::active::read().unwrap();
130    let resident = tikv_jemalloc_ctl::stats::resident::read().unwrap();
131    let metadata = tikv_jemalloc_ctl::stats::metadata::read().unwrap();
132    MemoryStats {
133        allocated,
134        active,
135        resident,
136        metadata,
137    }
138}
139
140impl LruWatermarkController {
141    pub fn tick(&mut self) -> Sequence {
142        // NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM
143        let MemoryStats {
144            allocated: jemalloc_allocated_bytes,
145            active: jemalloc_active_bytes,
146            resident: jemalloc_resident_bytes,
147            metadata: jemalloc_metadata_bytes,
148        } = jemalloc_memory_stats();
149        let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();
150
151        let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes;
152
153        // To calculate the total amount of memory that needs to be evicted, we sequentially calculate and accumulate
154        // the memory amounts exceeding the thresholds for aggressive, graceful, and stable, multiplying each by
155        // different weights.
156        //
157        //   (range)                 : (weight)
158        // * aggressive ~ inf        : evict factor aggressive
159        // *   graceful ~ aggressive : evict factor graceful
160        // *     stable ~ graceful   : evict factor stable
161        // *          0 ~ stable     : no eviction
162        //
163        // Why different weights instead of 1.0? It acts like a penalty factor, used to penalize the system for not
164        // keeping the memory usage down at the lower thresholds.
165        let to_evict_bytes = cur_used_memory_bytes.saturating_sub(self.threshold_aggressive) as f64
166            * self.eviction_factor_aggressive
167            + cur_used_memory_bytes
168                .saturating_sub(self.threshold_graceful)
169                .min(self.threshold_aggressive - self.threshold_graceful) as f64
170                * self.eviction_factor_graceful
171            + cur_used_memory_bytes
172                .saturating_sub(self.threshold_stable)
173                .min(self.threshold_graceful - self.threshold_stable) as f64
174                * self.eviction_factor_stable;
175        let ratio = to_evict_bytes / cur_used_memory_bytes as f64;
176        let latest_sequence = SEQUENCE_GLOBAL.load(Ordering::Relaxed);
177        let sequence_diff =
178            ((latest_sequence - self.watermark_sequence) as f64 * ratio) as Sequence;
179        self.watermark_sequence = latest_sequence.min(self.watermark_sequence + sequence_diff);
180
181        let policy = if cur_used_memory_bytes > self.threshold_aggressive {
182            LruEvictionPolicy::Aggressive
183        } else if cur_used_memory_bytes > self.threshold_graceful {
184            LruEvictionPolicy::Graceful
185        } else if cur_used_memory_bytes > self.threshold_stable {
186            LruEvictionPolicy::Stable
187        } else {
188            LruEvictionPolicy::None
189        };
190
191        self.metrics.lru_latest_sequence.set(latest_sequence as _);
192        self.metrics
193            .lru_watermark_sequence
194            .set(self.watermark_sequence as _);
195        self.metrics
196            .lru_eviction_policy
197            .set(Into::<u8>::into(policy) as _);
198
199        self.metrics
200            .jemalloc_allocated_bytes
201            .set(jemalloc_allocated_bytes as _);
202        self.metrics
203            .jemalloc_active_bytes
204            .set(jemalloc_active_bytes as _);
205        self.metrics
206            .jemalloc_resident_bytes
207            .set(jemalloc_resident_bytes as _);
208        self.metrics
209            .jemalloc_metadata_bytes
210            .set(jemalloc_metadata_bytes as _);
211        self.metrics
212            .jvm_allocated_bytes
213            .set(jvm_allocated_bytes as _);
214        self.metrics.jvm_active_bytes.set(jvm_active_bytes as _);
215
216        self.watermark_sequence
217    }
218}