risingwave_compute/memory/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17    CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18    MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
// ---- Node-level minimums -------------------------------------------------

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation
/// overhead, network buffer, etc.) in megabytes.
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;

// ---- Gradient used to derive the reserved memory -------------------------

/// Upper bounds (in bytes) of the memory tiers used by the gradient reservation: the first
/// tier ends at 16 GiB, the last tier is unbounded.
const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];
/// Fraction of each tier in `RESERVED_MEMORY_LEVELS` that is reserved (same index order).
const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];

// ---- Split of the non-reserved memory ------------------------------------

/// Fraction of the non-reserved memory that is given to storage.
const STORAGE_MEMORY_PROPORTION: f64 = 0.3;
/// Fraction of the non-reserved memory that is given to the embedded compactor. When the
/// embedded compactor is disabled this share is added to the storage share instead.
const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;

// ---- Split of the storage memory -----------------------------------------

/// Fraction of the storage memory used as the default block cache capacity.
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;
/// Fraction of the storage memory used as the default meta cache capacity.
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
/// Fraction of the storage memory used as the default shared buffer capacity.
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
/// Upper bound, in megabytes, applied to the default shared buffer capacity.
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;

// ---- Split of the compute memory -----------------------------------------

/// The proportion of compute memory used for batch processing on a streaming node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;
/// The proportion of compute memory used for batch processing on a serving node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
47
48/// Each compute node reserves some memory for stack and code segment of processes, allocation
49/// overhead, network buffer, etc. based on gradient reserve memory proportion. The reserve memory
50/// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
51pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
52    if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
53        panic!(
54            "The total memory size ({}) is too small. It must be at least {} MB.",
55            convert(opts.total_memory_bytes as _),
56            MIN_COMPUTE_MEMORY_MB
57        );
58    }
59
60    // If `reserved_memory_bytes` is not set, calculate total_memory_bytes based on gradient reserve memory proportion.
61    let reserved = opts
62        .reserved_memory_bytes
63        .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
64
65    // Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
66    let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
67
68    if reserved >= opts.total_memory_bytes {
69        panic!(
70            "reserved memory ({}) >= total memory ({}).",
71            convert(reserved as _),
72            convert(opts.total_memory_bytes as _)
73        );
74    }
75
76    (reserved, opts.total_memory_bytes - reserved)
77}
78
79/// Calculate the reserved memory based on the total memory size.
80/// The reserved memory size is calculated based on the following gradient:
81/// - 30% of the first 16GB
82/// - 20% of the rest
83pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
84    let mut total_memory_bytes = total_memory_bytes;
85    let mut reserved = 0;
86    for i in 0..RESERVED_MEMORY_LEVELS.len() {
87        let level_diff = if i == 0 {
88            RESERVED_MEMORY_LEVELS[0]
89        } else {
90            RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
91        };
92        if total_memory_bytes <= level_diff {
93            reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
94            break;
95        } else {
96            reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
97            total_memory_bytes -= level_diff;
98        }
99    }
100
101    reserved
102}
103
104/// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
105/// limits are calculated based on the proportions to total `non_reserved_memory_bytes`.
106pub fn storage_memory_config(
107    non_reserved_memory_bytes: usize,
108    embedded_compactor_enabled: bool,
109    storage_config: &StorageConfig,
110    is_serving: bool,
111) -> StorageMemoryConfig {
112    let (storage_memory_proportion, compactor_memory_proportion) = if embedded_compactor_enabled {
113        (STORAGE_MEMORY_PROPORTION, COMPACTOR_MEMORY_PROPORTION)
114    } else {
115        (STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
116    };
117
118    let storage_memory_bytes = non_reserved_memory_bytes as f64 * storage_memory_proportion;
119
120    // Only if the all cache capacities are specified and their sum doesn't exceed the max allowed storage_memory_bytes, will we use them.
121    // Other invalid combination of the cache capacities config will be ignored.
122    let (
123        config_block_cache_capacity_mb,
124        config_meta_cache_capacity_mb,
125        config_shared_buffer_capacity_mb,
126    ) = match (
127        storage_config.cache.block_cache_capacity_mb,
128        storage_config.cache.meta_cache_capacity_mb,
129        storage_config.shared_buffer_capacity_mb,
130    ) {
131        (
132            Some(block_cache_capacity_mb),
133            Some(meta_cache_capacity_mb),
134            Some(shared_buffer_capacity_mb),
135        ) => {
136            let config_storage_memory_bytes =
137                (block_cache_capacity_mb + meta_cache_capacity_mb + shared_buffer_capacity_mb)
138                    << 20;
139            if config_storage_memory_bytes as f64 > storage_memory_bytes {
140                tracing::warn!(
141                    "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
142                    block_cache_capacity_mb,
143                    meta_cache_capacity_mb,
144                    shared_buffer_capacity_mb,
145                    convert(config_storage_memory_bytes as _),
146                    convert(storage_memory_bytes as _)
147                );
148                (None, None, None)
149            } else {
150                (
151                    Some(block_cache_capacity_mb),
152                    Some(meta_cache_capacity_mb),
153                    Some(shared_buffer_capacity_mb),
154                )
155            }
156        }
157        c => {
158            tracing::warn!(
159                "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
160                c
161            );
162            (None, None, None)
163        }
164    };
165
166    let mut default_block_cache_capacity_mb =
167        ((storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
168    let default_meta_cache_capacity_mb =
169        ((storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
170    let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
171        // adapt to old version
172        storage_config
173            .meta_cache_capacity_mb
174            .unwrap_or(default_meta_cache_capacity_mb),
175    );
176
177    let prefetch_buffer_capacity_mb = storage_config
178        .prefetch_buffer_capacity_mb
179        .unwrap_or(default_block_cache_capacity_mb);
180
181    if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
182        default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
183        default_block_cache_capacity_mb =
184            default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
185    }
186
187    let default_shared_buffer_capacity_mb =
188        ((storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize) >> 20;
189    let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
190        default_shared_buffer_capacity_mb,
191        STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
192    ));
193    if is_serving {
194        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
195        // set 1 to pass internal check
196        shared_buffer_capacity_mb = 1;
197    } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
198        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
199        default_block_cache_capacity_mb =
200            default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
201    }
202    let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
203        // adapt to old version
204        storage_config
205            .block_cache_capacity_mb
206            .unwrap_or(default_block_cache_capacity_mb),
207    );
208
209    let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
210        ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
211    );
212
213    // The file cache flush buffer threshold is used as a emergency limitation.
214    // On most cases the flush buffer is not supposed to be as large as the threshold.
215    // So, the file cache flush buffer threshold size is not calculated in the memory usage.
216    let block_file_cache_flush_buffer_threshold_mb = storage_config
217        .data_file_cache
218        .flush_buffer_threshold_mb
219        .unwrap_or(
220            risingwave_common::config::default::storage::block_file_cache_flush_buffer_threshold_mb(
221            ),
222        );
223    let meta_file_cache_flush_buffer_threshold_mb = storage_config
224        .meta_file_cache
225        .flush_buffer_threshold_mb
226        .unwrap_or(
227            risingwave_common::config::default::storage::meta_file_cache_flush_buffer_threshold_mb(
228            ),
229        );
230
231    let total_calculated_mb = block_cache_capacity_mb
232        + meta_cache_capacity_mb
233        + shared_buffer_capacity_mb
234        + compactor_memory_limit_mb;
235    let soft_limit_mb = (non_reserved_memory_bytes as f64
236        * (storage_memory_proportion + compactor_memory_proportion).ceil())
237        as usize
238        >> 20;
239    // + 5 because ceil is used when calculating `total_bytes`.
240    if total_calculated_mb > soft_limit_mb + 5 {
241        tracing::warn!(
242            "The storage memory ({}) exceeds soft limit ({}).",
243            convert((total_calculated_mb << 20) as _),
244            convert((soft_limit_mb << 20) as _)
245        );
246    }
247
248    let meta_cache_shard_num = storage_config
249        .cache
250        .meta_cache_shard_num
251        .unwrap_or_else(|| {
252            let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
253            while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
254                && shard_bits > 0
255            {
256                shard_bits -= 1;
257            }
258            1 << shard_bits
259        });
260    let block_cache_shard_num = storage_config
261        .cache
262        .block_cache_shard_num
263        .unwrap_or_else(|| {
264            let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
265            while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
266                && shard_bits > 0
267            {
268                shard_bits -= 1;
269            }
270            1 << shard_bits
271        });
272
273    let get_eviction_config = |c: &CacheEvictionConfig| match c {
274        CacheEvictionConfig::Lru {
275            high_priority_ratio_in_percent,
276        } => EvictionConfig::Lru(LruConfig {
277            high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
278                // adapt to old version
279                storage_config.high_priority_ratio_in_percent.unwrap_or(
280                    risingwave_common::config::default::storage::high_priority_ratio_in_percent(),
281                ),
282            ) as f64
283                / 100.0,
284        }),
285        CacheEvictionConfig::Lfu {
286            window_capacity_ratio_in_percent,
287            protected_capacity_ratio_in_percent,
288            cmsketch_eps,
289            cmsketch_confidence,
290        } => EvictionConfig::Lfu(LfuConfig {
291            window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
292                risingwave_common::config::default::storage::window_capacity_ratio_in_percent(),
293            ) as f64
294                / 100.0,
295            protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
296                risingwave_common::config::default::storage::protected_capacity_ratio_in_percent(),
297            ) as f64
298                / 100.0,
299            cmsketch_eps: cmsketch_eps
300                .unwrap_or(risingwave_common::config::default::storage::cmsketch_eps()),
301            cmsketch_confidence: cmsketch_confidence
302                .unwrap_or(risingwave_common::config::default::storage::cmsketch_confidence()),
303        }),
304        CacheEvictionConfig::S3Fifo {
305            small_queue_capacity_ratio_in_percent,
306            ghost_queue_capacity_ratio_in_percent,
307            small_to_main_freq_threshold,
308        } => EvictionConfig::S3Fifo(S3FifoConfig {
309            small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
310                risingwave_common::config::default::storage::small_queue_capacity_ratio_in_percent(
311                ),
312            ) as f64
313                / 100.0,
314            ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
315                risingwave_common::config::default::storage::ghost_queue_capacity_ratio_in_percent(
316                ),
317            ) as f64
318                / 100.0,
319            small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
320                risingwave_common::config::default::storage::small_to_main_freq_threshold(),
321            ),
322        }),
323    };
324
325    let block_cache_eviction_config =
326        get_eviction_config(&storage_config.cache.block_cache_eviction);
327    let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
328
329    StorageMemoryConfig {
330        block_cache_capacity_mb,
331        block_cache_shard_num,
332        meta_cache_capacity_mb,
333        meta_cache_shard_num,
334        shared_buffer_capacity_mb,
335        compactor_memory_limit_mb,
336        prefetch_buffer_capacity_mb,
337        block_cache_eviction_config,
338        meta_cache_eviction_config,
339        block_file_cache_flush_buffer_threshold_mb,
340        meta_file_cache_flush_buffer_threshold_mb,
341    }
342}
343
344pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
345    if is_serving_node {
346        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
347    } else {
348        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
349    }
350}
351
#[cfg(test)]
mod tests {
    use clap::Parser;
    use risingwave_common::config::StorageConfig;

    use super::{reserve_memory_bytes, storage_memory_config};
    use crate::ComputeNodeOpts;

    /// Builds compute node options from an empty command line, overriding only the total
    /// memory size.
    fn opts_with_total_memory(total_memory_bytes: usize) -> ComputeNodeOpts {
        let mut opts = ComputeNodeOpts::parse_from(Vec::<String>::new());
        opts.total_memory_bytes = total_memory_bytes;
        opts
    }

    #[test]
    fn test_reserve_memory_bytes() {
        // at least 512 MB
        let opts = opts_with_total_memory(1536 << 20);
        assert_eq!(reserve_memory_bytes(&opts), (512 << 20, 1024 << 20));

        // reserve based on proportion
        let opts = opts_with_total_memory(10 << 30);
        assert_eq!(reserve_memory_bytes(&opts), (3 << 30, 7 << 30));

        // reserve based on opts
        let mut opts = opts_with_total_memory(10 << 30);
        opts.reserved_memory_bytes = Some(2 << 30);
        assert_eq!(reserve_memory_bytes(&opts), (2 << 30, 8 << 30));
    }

    #[test]
    fn test_storage_memory_config() {
        const TOTAL_NON_RESERVED_MEMORY_BYTES: usize = 8 << 30;

        // Resolves the config and returns the capacities under test as
        // (block cache, meta cache, shared buffer, compactor limit), all in MB.
        let capacities_mb = |config: &StorageConfig, embedded_compactor: bool, serving: bool| {
            let resolved = storage_memory_config(
                TOTAL_NON_RESERVED_MEMORY_BYTES,
                embedded_compactor,
                config,
                serving,
            );
            (
                resolved.block_cache_capacity_mb,
                resolved.meta_cache_capacity_mb,
                resolved.shared_buffer_capacity_mb,
                resolved.compactor_memory_limit_mb,
            )
        };

        let mut storage_config = StorageConfig::default();

        // Embedded compactor enabled, streaming node
        assert_eq!(
            capacities_mb(&storage_config, true, false),
            (737, 860, 737, 819)
        );

        // Embedded compactor disabled, serving node
        assert_eq!(
            capacities_mb(&storage_config, false, true),
            (1966, 1146, 1, 0)
        );

        // Embedded compactor enabled, streaming node, file cache
        storage_config.data_file_cache.dir = "data".to_owned();
        storage_config.meta_file_cache.dir = "meta".to_owned();
        assert_eq!(
            capacities_mb(&storage_config, true, false),
            (737, 860, 737, 819)
        );

        // Embedded compactor enabled, streaming node, file cache, cache capacities specified
        storage_config.cache.block_cache_capacity_mb = Some(512);
        storage_config.cache.meta_cache_capacity_mb = Some(128);
        storage_config.shared_buffer_capacity_mb = Some(1024);
        storage_config.compactor_memory_limit_mb = Some(512);
        assert_eq!(
            capacities_mb(&storage_config, true, false),
            (512, 128, 1024, 512)
        );
    }

    #[test]
    fn test_gradient_reserve_memory_bytes() {
        const GIB: usize = 1 << 30;

        // (total memory in GiB, expected reserved bytes)
        let cases: [(usize, usize); 9] = [
            (4, 1288490188),
            (8, 2576980377),
            (16, 5153960755),
            (24, 6871947673),
            (32, 8589934591),
            (54, 13314398617),
            (64, 15461882265),
            (100, 23192823398),
            (128, 29205777612),
        ];

        for &(total_gib, expected_reserved) in &cases {
            assert_eq!(
                super::gradient_reserve_memory_bytes(total_gib * GIB),
                expected_reserved
            );
        }
    }
}