risingwave_compute/memory/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17    CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18    MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
// ---- Node-level minimums -------------------------------------------------

/// The minimal memory requirement of computing tasks in megabytes.
///
/// [`reserve_memory_bytes`] panics when the node's total memory is below this value.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;

/// The memory reserved for system usage (stack and code segment of processes, allocation
/// overhead, network buffer, etc.) in megabytes.
///
/// [`reserve_memory_bytes`] never reserves less than this amount.
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;

// ---- Gradient used to derive the reserved memory -------------------------

/// Upper bounds, in bytes, of the total-memory bands used by
/// [`gradient_reserve_memory_bytes`]: the first band ends at 16 GiB, the last is open-ended.
const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];

/// Fraction of each band in `RESERVED_MEMORY_LEVELS` that is reserved (same index order).
const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];

// ---- Split of the non-reserved memory ------------------------------------

/// Fraction of the non-reserved memory given to storage.
const STORAGE_MEMORY_PROPORTION: f64 = 0.3;

/// Fraction of the non-reserved memory given to the embedded compactor. When the embedded
/// compactor is disabled this share is added to the storage share instead.
const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;

// ---- Split of the storage memory -----------------------------------------

/// Fraction of the storage memory used as the default block cache capacity.
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;

/// Fraction of the storage memory used as the default meta cache capacity.
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;

/// Fraction of the storage memory used as the default shared buffer capacity.
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;

/// Upper bound, in megabytes, applied to the default (not explicitly configured) shared buffer
/// capacity.
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;

// ---- Split of the compute memory -----------------------------------------

/// The proportion of compute memory used for batch processing on a non-serving (streaming) node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;

/// The proportion of compute memory used for batch processing on a serving node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
47
48/// Each compute node reserves some memory for stack and code segment of processes, allocation
49/// overhead, network buffer, etc. based on gradient reserve memory proportion. The reserve memory
50/// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
51pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
52    if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
53        panic!(
54            "The total memory size ({}) is too small. It must be at least {} MB.",
55            convert(opts.total_memory_bytes as _),
56            MIN_COMPUTE_MEMORY_MB
57        );
58    }
59
60    // If `reserved_memory_bytes` is not set, calculate total_memory_bytes based on gradient reserve memory proportion.
61    let reserved = opts
62        .reserved_memory_bytes
63        .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
64
65    // Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
66    let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
67
68    if reserved >= opts.total_memory_bytes {
69        panic!(
70            "reserved memory ({}) >= total memory ({}).",
71            convert(reserved as _),
72            convert(opts.total_memory_bytes as _)
73        );
74    }
75
76    (reserved, opts.total_memory_bytes - reserved)
77}
78
79/// Calculate the reserved memory based on the total memory size.
80/// The reserved memory size is calculated based on the following gradient:
81/// - 30% of the first 16GB
82/// - 20% of the rest
83pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
84    let mut total_memory_bytes = total_memory_bytes;
85    let mut reserved = 0;
86    for i in 0..RESERVED_MEMORY_LEVELS.len() {
87        let level_diff = if i == 0 {
88            RESERVED_MEMORY_LEVELS[0]
89        } else {
90            RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
91        };
92        if total_memory_bytes <= level_diff {
93            reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
94            break;
95        } else {
96            reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
97            total_memory_bytes -= level_diff;
98        }
99    }
100
101    reserved
102}
103
104/// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
105/// limits are calculated based on the proportions to total `non_reserved_memory_bytes`.
106pub fn storage_memory_config(
107    non_reserved_memory_bytes: usize,
108    embedded_compactor_enabled: bool,
109    storage_config: &StorageConfig,
110    is_serving: bool,
111) -> StorageMemoryConfig {
112    let (storage_memory_proportion, compactor_memory_proportion) = if embedded_compactor_enabled {
113        (STORAGE_MEMORY_PROPORTION, COMPACTOR_MEMORY_PROPORTION)
114    } else {
115        (STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
116    };
117
118    let storage_memory_bytes = non_reserved_memory_bytes as f64 * storage_memory_proportion;
119
120    // Only if the all cache capacities are specified and their sum doesn't exceed the max allowed storage_memory_bytes, will we use them.
121    // Other invalid combination of the cache capacities config will be ignored.
122    let (
123        config_block_cache_capacity_mb,
124        config_meta_cache_capacity_mb,
125        config_shared_buffer_capacity_mb,
126    ) = match (
127        storage_config.cache.block_cache_capacity_mb,
128        storage_config.cache.meta_cache_capacity_mb,
129        storage_config.shared_buffer_capacity_mb,
130    ) {
131        (
132            Some(block_cache_capacity_mb),
133            Some(meta_cache_capacity_mb),
134            Some(shared_buffer_capacity_mb),
135        ) => {
136            let config_storage_memory_bytes =
137                (block_cache_capacity_mb + meta_cache_capacity_mb + shared_buffer_capacity_mb)
138                    << 20;
139            if config_storage_memory_bytes as f64 > storage_memory_bytes {
140                tracing::warn!(
141                    "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
142                    block_cache_capacity_mb,
143                    meta_cache_capacity_mb,
144                    shared_buffer_capacity_mb,
145                    convert(config_storage_memory_bytes as _),
146                    convert(storage_memory_bytes as _)
147                );
148                (None, None, None)
149            } else {
150                (
151                    Some(block_cache_capacity_mb),
152                    Some(meta_cache_capacity_mb),
153                    Some(shared_buffer_capacity_mb),
154                )
155            }
156        }
157        c => {
158            tracing::warn!(
159                "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
160                c
161            );
162            (None, None, None)
163        }
164    };
165
166    let mut default_block_cache_capacity_mb =
167        ((storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
168    let default_meta_cache_capacity_mb =
169        ((storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
170    let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
171        // adapt to old version
172        storage_config
173            .meta_cache_capacity_mb
174            .unwrap_or(default_meta_cache_capacity_mb),
175    );
176
177    let prefetch_buffer_capacity_mb = storage_config
178        .prefetch_buffer_capacity_mb
179        .unwrap_or(default_block_cache_capacity_mb);
180
181    if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
182        default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
183        default_block_cache_capacity_mb =
184            default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
185    }
186
187    let default_shared_buffer_capacity_mb =
188        ((storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize) >> 20;
189    let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
190        default_shared_buffer_capacity_mb,
191        STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
192    ));
193    if is_serving {
194        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
195        // set 1 to pass internal check
196        shared_buffer_capacity_mb = 1;
197    } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
198        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
199        default_block_cache_capacity_mb =
200            default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
201    }
202    let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
203        // adapt to old version
204        storage_config
205            .block_cache_capacity_mb
206            .unwrap_or(default_block_cache_capacity_mb),
207    );
208
209    let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
210        ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
211    );
212
213    // The file cache flush buffer threshold is used as a emergency limitation.
214    // On most cases the flush buffer is not supposed to be as large as the threshold.
215    // So, the file cache flush buffer threshold size is not calculated in the memory usage.
216    let block_file_cache_flush_buffer_threshold_mb = storage_config
217        .data_file_cache
218        .flush_buffer_threshold_mb
219        .unwrap_or(
220            risingwave_common::config::storage::default::storage::block_file_cache_flush_buffer_threshold_mb(
221            ),
222        );
223    let meta_file_cache_flush_buffer_threshold_mb = storage_config
224        .meta_file_cache
225        .flush_buffer_threshold_mb
226        .unwrap_or(
227            risingwave_common::config::storage::default::storage::meta_file_cache_flush_buffer_threshold_mb(
228            ),
229        );
230
231    let total_calculated_mb = block_cache_capacity_mb
232        + meta_cache_capacity_mb
233        + shared_buffer_capacity_mb
234        + compactor_memory_limit_mb;
235    let soft_limit_mb = (non_reserved_memory_bytes as f64
236        * (storage_memory_proportion + compactor_memory_proportion).ceil())
237        as usize
238        >> 20;
239    // + 5 because ceil is used when calculating `total_bytes`.
240    if total_calculated_mb > soft_limit_mb + 5 {
241        tracing::warn!(
242            "The storage memory ({}) exceeds soft limit ({}).",
243            convert((total_calculated_mb << 20) as _),
244            convert((soft_limit_mb << 20) as _)
245        );
246    }
247
248    let meta_cache_shard_num = storage_config
249        .cache
250        .meta_cache_shard_num
251        .unwrap_or_else(|| {
252            let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
253            while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
254                && shard_bits > 0
255            {
256                shard_bits -= 1;
257            }
258            1 << shard_bits
259        });
260    let block_cache_shard_num = storage_config
261        .cache
262        .block_cache_shard_num
263        .unwrap_or_else(|| {
264            let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
265            while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
266                && shard_bits > 0
267            {
268                shard_bits -= 1;
269            }
270            1 << shard_bits
271        });
272
273    let get_eviction_config = |c: &CacheEvictionConfig| {
274        match c {
275        CacheEvictionConfig::Lru {
276            high_priority_ratio_in_percent,
277        } => EvictionConfig::Lru(LruConfig {
278            high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
279                // adapt to old version
280                storage_config.high_priority_ratio_in_percent.unwrap_or(
281                    risingwave_common::config::storage::default::storage::high_priority_ratio_in_percent(),
282                ),
283            ) as f64
284                / 100.0,
285        }),
286        CacheEvictionConfig::Lfu {
287            window_capacity_ratio_in_percent,
288            protected_capacity_ratio_in_percent,
289            cmsketch_eps,
290            cmsketch_confidence,
291        } => EvictionConfig::Lfu(LfuConfig {
292            window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
293                risingwave_common::config::storage::default::storage::window_capacity_ratio_in_percent(),
294            ) as f64
295                / 100.0,
296            protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
297                risingwave_common::config::storage::default::storage::protected_capacity_ratio_in_percent(),
298            ) as f64
299                / 100.0,
300            cmsketch_eps: cmsketch_eps
301                .unwrap_or(risingwave_common::config::storage::default::storage::cmsketch_eps()),
302            cmsketch_confidence: cmsketch_confidence
303                .unwrap_or(risingwave_common::config::storage::default::storage::cmsketch_confidence()),
304        }),
305        CacheEvictionConfig::S3Fifo {
306            small_queue_capacity_ratio_in_percent,
307            ghost_queue_capacity_ratio_in_percent,
308            small_to_main_freq_threshold,
309        } => EvictionConfig::S3Fifo(S3FifoConfig {
310            small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
311                risingwave_common::config::storage::default::storage::small_queue_capacity_ratio_in_percent(
312                ),
313            ) as f64
314                / 100.0,
315            ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
316                risingwave_common::config::storage::default::storage::ghost_queue_capacity_ratio_in_percent(
317                ),
318            ) as f64
319                / 100.0,
320            small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
321                risingwave_common::config::storage::default::storage::small_to_main_freq_threshold(),
322            ),
323        }),
324    }
325    };
326
327    let block_cache_eviction_config =
328        get_eviction_config(&storage_config.cache.block_cache_eviction);
329    let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
330    let vector_block_cache_eviction_config =
331        get_eviction_config(&storage_config.cache.vector_block_cache_eviction_config);
332    let vector_meta_cache_eviction_config =
333        get_eviction_config(&storage_config.cache.vector_meta_cache_eviction_config);
334
335    StorageMemoryConfig {
336        block_cache_capacity_mb,
337        block_cache_shard_num,
338        meta_cache_capacity_mb,
339        meta_cache_shard_num,
340        vector_block_cache_capacity_mb: storage_config.cache.vector_block_cache_capacity_mb,
341        vector_block_cache_shard_num: storage_config.cache.vector_block_cache_shard_num,
342        vector_meta_cache_capacity_mb: storage_config.cache.vector_meta_cache_capacity_mb,
343        vector_meta_cache_shard_num: storage_config.cache.vector_meta_cache_shard_num,
344        shared_buffer_capacity_mb,
345        compactor_memory_limit_mb,
346        prefetch_buffer_capacity_mb,
347        block_cache_eviction_config,
348        meta_cache_eviction_config,
349        vector_block_cache_eviction_config,
350        vector_meta_cache_eviction_config,
351        block_file_cache_flush_buffer_threshold_mb,
352        meta_file_cache_flush_buffer_threshold_mb,
353    }
354}
355
356pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
357    if is_serving_node {
358        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
359    } else {
360        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
361    }
362}
363
#[cfg(test)]
mod tests {
    use clap::Parser;
    use risingwave_common::config::StorageConfig;

    use super::{reserve_memory_bytes, storage_memory_config};
    use crate::ComputeNodeOpts;

    /// Runs `reserve_memory_bytes` on default options with the given total memory and, when
    /// provided, an explicit reserved memory size.
    fn split_memory(
        total_memory_bytes: usize,
        reserved_memory_bytes: Option<usize>,
    ) -> (usize, usize) {
        let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
        opts.total_memory_bytes = total_memory_bytes;
        // Leave the parsed default untouched unless the case overrides it.
        if reserved_memory_bytes.is_some() {
            opts.reserved_memory_bytes = reserved_memory_bytes;
        }
        reserve_memory_bytes(&opts)
    }

    #[test]
    fn test_reserve_memory_bytes() {
        // at least 512 MB
        assert_eq!(split_memory(1536 << 20, None), (512 << 20, 1024 << 20));

        // reserve based on proportion
        assert_eq!(split_memory(10 << 30, None), (3 << 30, 7 << 30));

        // reserve based on opts
        assert_eq!(split_memory(10 << 30, Some(2 << 30)), (2 << 30, 8 << 30));
    }

    #[test]
    fn test_storage_memory_config() {
        let total_non_reserved_memory_bytes: usize = 8 << 30;

        // Returns (block cache, meta cache, shared buffer, compactor limit), all in MB.
        let capacities_mb =
            |config: &StorageConfig, embedded_compactor_enabled: bool, is_serving: bool| {
                let memory_config = storage_memory_config(
                    total_non_reserved_memory_bytes,
                    embedded_compactor_enabled,
                    config,
                    is_serving,
                );
                (
                    memory_config.block_cache_capacity_mb,
                    memory_config.meta_cache_capacity_mb,
                    memory_config.shared_buffer_capacity_mb,
                    memory_config.compactor_memory_limit_mb,
                )
            };

        let mut storage_config = StorageConfig::default();

        // Embedded compactor enabled, streaming node
        assert_eq!(
            capacities_mb(&storage_config, true, false),
            (737, 860, 737, 819)
        );

        // Embedded compactor disabled, serving node
        assert_eq!(
            capacities_mb(&storage_config, false, true),
            (1966, 1146, 1, 0)
        );

        // Embedded compactor enabled, streaming node, file cache
        storage_config.data_file_cache.dir = "data".to_owned();
        storage_config.meta_file_cache.dir = "meta".to_owned();
        assert_eq!(
            capacities_mb(&storage_config, true, false),
            (737, 860, 737, 819)
        );

        // Embedded compactor enabled, streaming node, file cache, cache capacities specified
        storage_config.cache.block_cache_capacity_mb = Some(512);
        storage_config.cache.meta_cache_capacity_mb = Some(128);
        storage_config.shared_buffer_capacity_mb = Some(1024);
        storage_config.compactor_memory_limit_mb = Some(512);
        assert_eq!(
            capacities_mb(&storage_config, true, false),
            (512, 128, 1024, 512)
        );
    }

    #[test]
    fn test_gradient_reserve_memory_bytes() {
        // (total memory in GiB, expected reserved bytes)
        let cases: [(usize, usize); 9] = [
            (4, 1288490188),
            (8, 2576980377),
            (16, 5153960755),
            (24, 6871947673),
            (32, 8589934591),
            (54, 13314398617),
            (64, 15461882265),
            (100, 23192823398),
            (128, 29205777612),
        ];

        for (total_gib, expected_reserved) in cases {
            assert_eq!(
                super::gradient_reserve_memory_bytes(total_gib << 30),
                expected_reserved
            );
        }
    }
}