risingwave_compute/memory/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17    CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18    MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation
/// overhead, network buffer, etc.) in megabytes.
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;

/// Cumulative upper bounds, in bytes, of the memory tiers walked by
/// `gradient_reserve_memory_bytes`: the first tier covers the first 16 GiB and the second tier
/// covers everything above it.
const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];

/// Fraction of each tier in `RESERVED_MEMORY_LEVELS` that is reserved (same index order).
const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];

/// Fraction of the non-reserved memory that `storage_memory_config` assigns to storage.
const STORAGE_MEMORY_PROPORTION: f64 = 0.3;

/// Fraction of the non-reserved memory assigned to the embedded compactor. When the embedded
/// compactor is disabled, `storage_memory_config` adds this share to the storage share instead.
const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;

/// Fraction of the storage memory used as the default block cache capacity.
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;

/// Upper bound, in megabytes, applied to the default (non-configured) shared buffer capacity.
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;
/// Fraction of the storage memory used as the default meta cache capacity.
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
/// Fraction of the storage memory used as the default shared buffer capacity.
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;

/// The proportion of compute memory used for batch processing on a non-serving node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;
/// The proportion of compute memory used for batch processing on a serving node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
47
48/// Each compute node reserves some memory for stack and code segment of processes, allocation
49/// overhead, network buffer, etc. based on gradient reserve memory proportion. The reserve memory
50/// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
51pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
52    if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
53        panic!(
54            "The total memory size ({}) is too small. It must be at least {} MB.",
55            convert(opts.total_memory_bytes as _),
56            MIN_COMPUTE_MEMORY_MB
57        );
58    }
59
60    // If `reserved_memory_bytes` is not set, calculate total_memory_bytes based on gradient reserve memory proportion.
61    let reserved = opts
62        .reserved_memory_bytes
63        .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
64
65    // Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
66    let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
67
68    if reserved >= opts.total_memory_bytes {
69        panic!(
70            "reserved memory ({}) >= total memory ({}).",
71            convert(reserved as _),
72            convert(opts.total_memory_bytes as _)
73        );
74    }
75
76    (reserved, opts.total_memory_bytes - reserved)
77}
78
79/// Calculate the reserved memory based on the total memory size.
80/// The reserved memory size is calculated based on the following gradient:
81/// - 30% of the first 16GB
82/// - 20% of the rest
83pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
84    let mut total_memory_bytes = total_memory_bytes;
85    let mut reserved = 0;
86    for i in 0..RESERVED_MEMORY_LEVELS.len() {
87        let level_diff = if i == 0 {
88            RESERVED_MEMORY_LEVELS[0]
89        } else {
90            RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
91        };
92        if total_memory_bytes <= level_diff {
93            reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
94            break;
95        } else {
96            reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
97            total_memory_bytes -= level_diff;
98        }
99    }
100
101    reserved
102}
103
104/// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
105/// limits are calculated based on the proportions to total `non_reserved_memory_bytes`.
106pub fn storage_memory_config(
107    non_reserved_memory_bytes: usize,
108    embedded_compactor_enabled: bool,
109    storage_config: &StorageConfig,
110    is_serving: bool,
111) -> StorageMemoryConfig {
112    let (storage_memory_proportion, compactor_memory_proportion) = if embedded_compactor_enabled {
113        (STORAGE_MEMORY_PROPORTION, COMPACTOR_MEMORY_PROPORTION)
114    } else {
115        (STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
116    };
117
118    let storage_memory_bytes = non_reserved_memory_bytes as f64 * storage_memory_proportion;
119
120    // Only if the all cache capacities are specified and their sum doesn't exceed the max allowed storage_memory_bytes, will we use them.
121    // Other invalid combination of the cache capacities config will be ignored.
122    let (
123        config_block_cache_capacity_mb,
124        config_meta_cache_capacity_mb,
125        config_shared_buffer_capacity_mb,
126    ) = match (
127        storage_config.cache.block_cache_capacity_mb,
128        storage_config.cache.meta_cache_capacity_mb,
129        storage_config.shared_buffer_capacity_mb,
130    ) {
131        (
132            Some(block_cache_capacity_mb),
133            Some(meta_cache_capacity_mb),
134            Some(shared_buffer_capacity_mb),
135        ) => {
136            let config_storage_memory_bytes =
137                (block_cache_capacity_mb + meta_cache_capacity_mb + shared_buffer_capacity_mb)
138                    << 20;
139            if config_storage_memory_bytes as f64 > storage_memory_bytes {
140                tracing::warn!(
141                    "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
142                    block_cache_capacity_mb,
143                    meta_cache_capacity_mb,
144                    shared_buffer_capacity_mb,
145                    convert(config_storage_memory_bytes as _),
146                    convert(storage_memory_bytes as _)
147                );
148                (None, None, None)
149            } else {
150                (
151                    Some(block_cache_capacity_mb),
152                    Some(meta_cache_capacity_mb),
153                    Some(shared_buffer_capacity_mb),
154                )
155            }
156        }
157        c => {
158            tracing::warn!(
159                "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
160                c
161            );
162            (None, None, None)
163        }
164    };
165
166    let mut default_block_cache_capacity_mb =
167        ((storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
168    let default_meta_cache_capacity_mb =
169        ((storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
170    let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
171        // adapt to old version
172        storage_config
173            .meta_cache_capacity_mb
174            .unwrap_or(default_meta_cache_capacity_mb),
175    );
176
177    let prefetch_buffer_capacity_mb = storage_config
178        .prefetch_buffer_capacity_mb
179        .unwrap_or(default_block_cache_capacity_mb);
180
181    if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
182        default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
183        default_block_cache_capacity_mb =
184            default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
185    }
186
187    let default_shared_buffer_capacity_mb =
188        ((storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize) >> 20;
189    let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
190        default_shared_buffer_capacity_mb,
191        STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
192    ));
193    if is_serving {
194        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
195        // set 1 to pass internal check
196        shared_buffer_capacity_mb = 1;
197    } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
198        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
199        default_block_cache_capacity_mb =
200            default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
201    }
202    let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
203        // adapt to old version
204        storage_config
205            .block_cache_capacity_mb
206            .unwrap_or(default_block_cache_capacity_mb),
207    );
208
209    let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
210        ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
211    );
212
213    // The file cache flush buffer threshold is used as a emergency limitation.
214    // On most cases the flush buffer is not supposed to be as large as the threshold.
215    // So, the file cache flush buffer threshold size is not calculated in the memory usage.
216    let block_file_cache_flush_buffer_threshold_mb = storage_config
217        .data_file_cache
218        .flush_buffer_threshold_mb
219        .unwrap_or(
220            risingwave_common::config::default::storage::block_file_cache_flush_buffer_threshold_mb(
221            ),
222        );
223    let meta_file_cache_flush_buffer_threshold_mb = storage_config
224        .meta_file_cache
225        .flush_buffer_threshold_mb
226        .unwrap_or(
227            risingwave_common::config::default::storage::meta_file_cache_flush_buffer_threshold_mb(
228            ),
229        );
230
231    let total_calculated_mb = block_cache_capacity_mb
232        + meta_cache_capacity_mb
233        + shared_buffer_capacity_mb
234        + compactor_memory_limit_mb;
235    let soft_limit_mb = (non_reserved_memory_bytes as f64
236        * (storage_memory_proportion + compactor_memory_proportion).ceil())
237        as usize
238        >> 20;
239    // + 5 because ceil is used when calculating `total_bytes`.
240    if total_calculated_mb > soft_limit_mb + 5 {
241        tracing::warn!(
242            "The storage memory ({}) exceeds soft limit ({}).",
243            convert((total_calculated_mb << 20) as _),
244            convert((soft_limit_mb << 20) as _)
245        );
246    }
247
248    let meta_cache_shard_num = storage_config
249        .cache
250        .meta_cache_shard_num
251        .unwrap_or_else(|| {
252            let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
253            while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
254                && shard_bits > 0
255            {
256                shard_bits -= 1;
257            }
258            1 << shard_bits
259        });
260    let block_cache_shard_num = storage_config
261        .cache
262        .block_cache_shard_num
263        .unwrap_or_else(|| {
264            let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
265            while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
266                && shard_bits > 0
267            {
268                shard_bits -= 1;
269            }
270            1 << shard_bits
271        });
272
273    let get_eviction_config = |c: &CacheEvictionConfig| match c {
274        CacheEvictionConfig::Lru {
275            high_priority_ratio_in_percent,
276        } => EvictionConfig::Lru(LruConfig {
277            high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
278                // adapt to old version
279                storage_config.high_priority_ratio_in_percent.unwrap_or(
280                    risingwave_common::config::default::storage::high_priority_ratio_in_percent(),
281                ),
282            ) as f64
283                / 100.0,
284        }),
285        CacheEvictionConfig::Lfu {
286            window_capacity_ratio_in_percent,
287            protected_capacity_ratio_in_percent,
288            cmsketch_eps,
289            cmsketch_confidence,
290        } => EvictionConfig::Lfu(LfuConfig {
291            window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
292                risingwave_common::config::default::storage::window_capacity_ratio_in_percent(),
293            ) as f64
294                / 100.0,
295            protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
296                risingwave_common::config::default::storage::protected_capacity_ratio_in_percent(),
297            ) as f64
298                / 100.0,
299            cmsketch_eps: cmsketch_eps
300                .unwrap_or(risingwave_common::config::default::storage::cmsketch_eps()),
301            cmsketch_confidence: cmsketch_confidence
302                .unwrap_or(risingwave_common::config::default::storage::cmsketch_confidence()),
303        }),
304        CacheEvictionConfig::S3Fifo {
305            small_queue_capacity_ratio_in_percent,
306            ghost_queue_capacity_ratio_in_percent,
307            small_to_main_freq_threshold,
308        } => EvictionConfig::S3Fifo(S3FifoConfig {
309            small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
310                risingwave_common::config::default::storage::small_queue_capacity_ratio_in_percent(
311                ),
312            ) as f64
313                / 100.0,
314            ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
315                risingwave_common::config::default::storage::ghost_queue_capacity_ratio_in_percent(
316                ),
317            ) as f64
318                / 100.0,
319            small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
320                risingwave_common::config::default::storage::small_to_main_freq_threshold(),
321            ),
322        }),
323    };
324
325    let block_cache_eviction_config =
326        get_eviction_config(&storage_config.cache.block_cache_eviction);
327    let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
328    let vector_block_cache_eviction_config =
329        get_eviction_config(&storage_config.cache.vector_block_cache_eviction_config);
330    let vector_meta_cache_eviction_config =
331        get_eviction_config(&storage_config.cache.vector_meta_cache_eviction_config);
332
333    StorageMemoryConfig {
334        block_cache_capacity_mb,
335        block_cache_shard_num,
336        meta_cache_capacity_mb,
337        meta_cache_shard_num,
338        vector_block_cache_capacity_mb: storage_config.cache.vector_block_cache_capacity_mb,
339        vector_block_cache_shard_num: storage_config.cache.vector_block_cache_shard_num,
340        vector_meta_cache_capacity_mb: storage_config.cache.vector_meta_cache_capacity_mb,
341        vector_meta_cache_shard_num: storage_config.cache.vector_meta_cache_shard_num,
342        shared_buffer_capacity_mb,
343        compactor_memory_limit_mb,
344        prefetch_buffer_capacity_mb,
345        block_cache_eviction_config,
346        meta_cache_eviction_config,
347        vector_block_cache_eviction_config,
348        vector_meta_cache_eviction_config,
349        block_file_cache_flush_buffer_threshold_mb,
350        meta_file_cache_flush_buffer_threshold_mb,
351    }
352}
353
354pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
355    if is_serving_node {
356        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
357    } else {
358        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
359    }
360}
361
#[cfg(test)]
mod tests {
    use clap::Parser;
    use risingwave_common::config::StorageConfig;

    use super::{reserve_memory_bytes, storage_memory_config};
    use crate::ComputeNodeOpts;

    /// Builds default compute node options with only the total memory overridden.
    fn opts_with_total_memory(total_memory_bytes: usize) -> ComputeNodeOpts {
        let mut opts = ComputeNodeOpts::parse_from(Vec::<String>::new());
        opts.total_memory_bytes = total_memory_bytes;
        opts
    }

    #[test]
    fn test_reserve_memory_bytes() {
        // The reservation is never smaller than 512 MB.
        let opts = opts_with_total_memory(1536 << 20);
        let (reserved, non_reserved) = reserve_memory_bytes(&opts);
        assert_eq!(reserved, 512 << 20);
        assert_eq!(non_reserved, 1024 << 20);

        // Without an explicit setting the reservation follows the proportion.
        let opts = opts_with_total_memory(10 << 30);
        let (reserved, non_reserved) = reserve_memory_bytes(&opts);
        assert_eq!(reserved, 3 << 30);
        assert_eq!(non_reserved, 7 << 30);

        // An explicit setting in the options takes precedence.
        let mut opts = opts_with_total_memory(10 << 30);
        opts.reserved_memory_bytes = Some(2 << 30);
        let (reserved, non_reserved) = reserve_memory_bytes(&opts);
        assert_eq!(reserved, 2 << 30);
        assert_eq!(non_reserved, 8 << 30);
    }

    #[test]
    fn test_storage_memory_config() {
        // Checks the four capacities (in MB) of a resolved memory config.
        macro_rules! assert_capacities {
            ($config:expr, $block:expr, $meta:expr, $shared:expr, $compactor:expr) => {{
                let resolved = $config;
                assert_eq!(resolved.block_cache_capacity_mb, $block);
                assert_eq!(resolved.meta_cache_capacity_mb, $meta);
                assert_eq!(resolved.shared_buffer_capacity_mb, $shared);
                assert_eq!(resolved.compactor_memory_limit_mb, $compactor);
            }};
        }

        let mut storage_config = StorageConfig::default();
        let total_non_reserved_memory_bytes = 8 << 30;

        // Embedded compactor enabled, streaming node
        assert_capacities!(
            storage_memory_config(total_non_reserved_memory_bytes, true, &storage_config, false),
            737,
            860,
            737,
            819
        );

        // Embedded compactor disabled, serving node
        assert_capacities!(
            storage_memory_config(total_non_reserved_memory_bytes, false, &storage_config, true),
            1966,
            1146,
            1,
            0
        );

        // Embedded compactor enabled, streaming node, file cache
        storage_config.data_file_cache.dir = "data".to_owned();
        storage_config.meta_file_cache.dir = "meta".to_owned();
        assert_capacities!(
            storage_memory_config(total_non_reserved_memory_bytes, true, &storage_config, false),
            737,
            860,
            737,
            819
        );

        // Embedded compactor enabled, streaming node, file cache, cache capacities specified
        storage_config.cache.block_cache_capacity_mb = Some(512);
        storage_config.cache.meta_cache_capacity_mb = Some(128);
        storage_config.shared_buffer_capacity_mb = Some(1024);
        storage_config.compactor_memory_limit_mb = Some(512);
        assert_capacities!(
            storage_memory_config(total_non_reserved_memory_bytes, true, &storage_config, false),
            512,
            128,
            1024,
            512
        );
    }

    #[test]
    fn test_gradient_reserve_memory_bytes() {
        // (total memory in GiB, expected reserved bytes)
        const CASES: [(usize, usize); 9] = [
            (4, 1288490188),
            (8, 2576980377),
            (16, 5153960755),
            (24, 6871947673),
            (32, 8589934591),
            (54, 13314398617),
            (64, 15461882265),
            (100, 23192823398),
            (128, 29205777612),
        ];

        for &(total_gib, expected_reserved) in CASES.iter() {
            assert_eq!(
                super::gradient_reserve_memory_bytes(total_gib << 30),
                expected_reserved
            );
        }
    }
}