risingwave_compute/memory/
config.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17    CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18    MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
// ---- Node-level minimums ----------------------------------------------------------------------

/// Smallest total memory size, in megabytes, a compute node accepts for computing tasks.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// Floor, in megabytes, of the memory set aside for system usage (stack and code segment of
/// processes, allocation overhead, network buffer, etc.).
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;

// ---- Gradient reservation ---------------------------------------------------------------------

/// Upper bounds, in bytes, of the tiers used by the gradient reservation: the first tier ends
/// at 16 GiB and the second one covers everything above it.
const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];
/// Fraction of each tier in `RESERVED_MEMORY_LEVELS` that is reserved (same order).
const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];

// ---- Split of the non-reserved memory ---------------------------------------------------------

/// Default fraction of the non-reserved memory given to storage.
const STORAGE_MEMORY_DEFAULT_PROPORTION: f64 = 0.3;
/// Largest fraction of the non-reserved memory that explicitly configured storage capacities
/// may add up to.
const STORAGE_MEMORY_MAX_PROPORTION: f64 = 0.9;
/// Default fraction of the non-reserved memory given to the embedded compactor.
const COMPACTOR_MEMORY_DEFAULT_PROPORTION: f64 = 0.1;

// ---- Split of the default storage memory ------------------------------------------------------

/// Fraction of the default storage memory used for the block cache.
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;
/// Fraction of the default storage memory used for the meta cache.
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.4;
/// Fraction of the default storage memory used for the shared buffer.
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
/// Cap, in megabytes, applied to the derived (not explicitly configured) shared buffer size.
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;

// ---- Batch share of the compute memory --------------------------------------------------------

/// The proportion of compute memory used for batch processing on a streaming node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;
/// The proportion of compute memory used for batch processing on a serving node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
48
49/// Each compute node reserves some memory for stack and code segment of processes, allocation
50/// overhead, network buffer, etc. based on gradient reserve memory proportion. The reserve memory
51/// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
52pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
53    if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
54        panic!(
55            "The total memory size ({}) is too small. It must be at least {} MB.",
56            convert(opts.total_memory_bytes as _),
57            MIN_COMPUTE_MEMORY_MB
58        );
59    }
60
61    // If `reserved_memory_bytes` is not set, calculate total_memory_bytes based on gradient reserve memory proportion.
62    let reserved = opts
63        .reserved_memory_bytes
64        .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
65
66    // Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
67    let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
68
69    if reserved >= opts.total_memory_bytes {
70        panic!(
71            "reserved memory ({}) >= total memory ({}).",
72            convert(reserved as _),
73            convert(opts.total_memory_bytes as _)
74        );
75    }
76
77    (reserved, opts.total_memory_bytes - reserved)
78}
79
80/// Calculate the reserved memory based on the total memory size.
81/// The reserved memory size is calculated based on the following gradient:
82/// - 30% of the first 16GB
83/// - 20% of the rest
84pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
85    let mut total_memory_bytes = total_memory_bytes;
86    let mut reserved = 0;
87    for i in 0..RESERVED_MEMORY_LEVELS.len() {
88        let level_diff = if i == 0 {
89            RESERVED_MEMORY_LEVELS[0]
90        } else {
91            RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
92        };
93        if total_memory_bytes <= level_diff {
94            reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
95            break;
96        } else {
97            reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
98            total_memory_bytes -= level_diff;
99        }
100    }
101
102    reserved
103}
104
105/// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
106/// limits are calculated based on the proportions to total `non_reserved_memory_bytes`.
107pub fn storage_memory_config(
108    non_reserved_memory_bytes: usize,
109    embedded_compactor_enabled: bool,
110    storage_config: &StorageConfig,
111    is_serving: bool,
112) -> StorageMemoryConfig {
113    let (default_storage_memory_proportion, default_compactor_memory_proportion) =
114        if embedded_compactor_enabled {
115            (
116                STORAGE_MEMORY_DEFAULT_PROPORTION,
117                COMPACTOR_MEMORY_DEFAULT_PROPORTION,
118            )
119        } else {
120            (
121                STORAGE_MEMORY_DEFAULT_PROPORTION + COMPACTOR_MEMORY_DEFAULT_PROPORTION,
122                0.0,
123            )
124        };
125
126    let default_storage_memory_bytes =
127        (non_reserved_memory_bytes as f64 * default_storage_memory_proportion).ceil();
128    let max_storage_memory_bytes =
129        (non_reserved_memory_bytes as f64 * STORAGE_MEMORY_MAX_PROPORTION).ceil();
130
131    let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
132        ((non_reserved_memory_bytes as f64 * default_compactor_memory_proportion).ceil() as usize)
133            >> 20,
134    );
135    tracing::debug!(
136        "default_storage_memory_bytes={} max_storage_memory_bytes={} compactor_memory_limit_mb={}",
137        default_storage_memory_bytes,
138        max_storage_memory_bytes,
139        compactor_memory_limit_mb
140    );
141
142    // Only if all cache capacities and compactor memory are specified and their sum doesn't exceed the max allowed storage_memory_bytes, will we use them.
143    // Other invalid combinations of the cache capacities and compactor memory config will be ignored.
144    let (
145        config_block_cache_capacity_mb,
146        config_meta_cache_capacity_mb,
147        config_shared_buffer_capacity_mb,
148    ) = match (
149        storage_config.cache.block_cache_capacity_mb,
150        storage_config.cache.meta_cache_capacity_mb,
151        storage_config.shared_buffer_capacity_mb,
152    ) {
153        (
154            Some(block_cache_capacity_mb),
155            Some(meta_cache_capacity_mb),
156            Some(shared_buffer_capacity_mb),
157        ) => {
158            let config_storage_memory_bytes = (block_cache_capacity_mb
159                + meta_cache_capacity_mb
160                + shared_buffer_capacity_mb
161                + compactor_memory_limit_mb)
162                << 20;
163            if config_storage_memory_bytes as f64 > max_storage_memory_bytes {
164                tracing::warn!(
165                    "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} + compactor_memory_limit_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
166                    block_cache_capacity_mb,
167                    meta_cache_capacity_mb,
168                    shared_buffer_capacity_mb,
169                    compactor_memory_limit_mb,
170                    convert(config_storage_memory_bytes as _),
171                    convert(max_storage_memory_bytes as _)
172                );
173                (None, None, None)
174            } else {
175                (
176                    Some(block_cache_capacity_mb),
177                    Some(meta_cache_capacity_mb),
178                    Some(shared_buffer_capacity_mb),
179                )
180            }
181        }
182        c => {
183            tracing::warn!(
184                "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
185                c
186            );
187            (None, None, None)
188        }
189    };
190
191    let mut default_block_cache_capacity_mb =
192        ((default_storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize)
193            >> 20;
194    let default_meta_cache_capacity_mb =
195        ((default_storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize)
196            >> 20;
197    let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
198        // adapt to old version
199        storage_config
200            .meta_cache_capacity_mb
201            .unwrap_or(default_meta_cache_capacity_mb),
202    );
203
204    let prefetch_buffer_capacity_mb = storage_config
205        .prefetch_buffer_capacity_mb
206        .unwrap_or(default_block_cache_capacity_mb);
207
208    if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
209        default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
210        default_block_cache_capacity_mb =
211            default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
212    }
213
214    let default_shared_buffer_capacity_mb =
215        ((default_storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize)
216            >> 20;
217    let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
218        default_shared_buffer_capacity_mb,
219        STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
220    ));
221    if is_serving {
222        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
223        // set 1 to pass internal check
224        shared_buffer_capacity_mb = 1;
225    } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
226        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
227        default_block_cache_capacity_mb =
228            default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
229    }
230    let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
231        // adapt to old version
232        storage_config
233            .block_cache_capacity_mb
234            .unwrap_or(default_block_cache_capacity_mb),
235    );
236
237    // The file cache flush buffer threshold is used as a emergency limitation.
238    // On most cases the flush buffer is not supposed to be as large as the threshold.
239    // So, the file cache flush buffer threshold size is not calculated in the memory usage.
240    let block_file_cache_flush_buffer_threshold_mb = storage_config
241        .data_file_cache
242        .flush_buffer_threshold_mb
243        .unwrap_or(
244            risingwave_common::config::storage::default::storage::block_file_cache_flush_buffer_threshold_mb(
245            ),
246        );
247    let meta_file_cache_flush_buffer_threshold_mb = storage_config
248        .meta_file_cache
249        .flush_buffer_threshold_mb
250        .unwrap_or(
251            risingwave_common::config::storage::default::storage::meta_file_cache_flush_buffer_threshold_mb(
252            ),
253        );
254
255    let total_calculated_mb = block_cache_capacity_mb
256        + meta_cache_capacity_mb
257        + shared_buffer_capacity_mb
258        + compactor_memory_limit_mb;
259    let soft_limit_mb = max_storage_memory_bytes as usize >> 20;
260    // + 5 because ceil is used when calculating `total_bytes`.
261    if total_calculated_mb > soft_limit_mb + 5 {
262        tracing::warn!(
263            "The storage memory ({}) exceeds soft limit ({}).",
264            convert((total_calculated_mb << 20) as _),
265            convert((soft_limit_mb << 20) as _)
266        );
267    }
268
269    let meta_cache_shard_num = storage_config
270        .cache
271        .meta_cache_shard_num
272        .unwrap_or_else(|| {
273            let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
274            while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
275                && shard_bits > 0
276            {
277                shard_bits -= 1;
278            }
279            1 << shard_bits
280        });
281    let block_cache_shard_num = storage_config
282        .cache
283        .block_cache_shard_num
284        .unwrap_or_else(|| {
285            let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
286            while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
287                && shard_bits > 0
288            {
289                shard_bits -= 1;
290            }
291            1 << shard_bits
292        });
293
294    let get_eviction_config = |c: &CacheEvictionConfig| {
295        match c {
296        CacheEvictionConfig::Lru {
297            high_priority_ratio_in_percent,
298        } => EvictionConfig::Lru(LruConfig {
299            high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
300                // adapt to old version
301                storage_config.high_priority_ratio_in_percent.unwrap_or(
302                    risingwave_common::config::storage::default::storage::high_priority_ratio_in_percent(),
303                ),
304            ) as f64
305                / 100.0,
306        }),
307        CacheEvictionConfig::Lfu {
308            window_capacity_ratio_in_percent,
309            protected_capacity_ratio_in_percent,
310            cmsketch_eps,
311            cmsketch_confidence,
312        } => EvictionConfig::Lfu(LfuConfig {
313            window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
314                risingwave_common::config::storage::default::storage::window_capacity_ratio_in_percent(),
315            ) as f64
316                / 100.0,
317            protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
318                risingwave_common::config::storage::default::storage::protected_capacity_ratio_in_percent(),
319            ) as f64
320                / 100.0,
321            cmsketch_eps: cmsketch_eps
322                .unwrap_or(risingwave_common::config::storage::default::storage::cmsketch_eps()),
323            cmsketch_confidence: cmsketch_confidence
324                .unwrap_or(risingwave_common::config::storage::default::storage::cmsketch_confidence()),
325        }),
326        CacheEvictionConfig::S3Fifo {
327            small_queue_capacity_ratio_in_percent,
328            ghost_queue_capacity_ratio_in_percent,
329            small_to_main_freq_threshold,
330        } => EvictionConfig::S3Fifo(S3FifoConfig {
331            small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
332                risingwave_common::config::storage::default::storage::small_queue_capacity_ratio_in_percent(
333                ),
334            ) as f64
335                / 100.0,
336            ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
337                risingwave_common::config::storage::default::storage::ghost_queue_capacity_ratio_in_percent(
338                ),
339            ) as f64
340                / 100.0,
341            small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
342                risingwave_common::config::storage::default::storage::small_to_main_freq_threshold(),
343            ),
344        }),
345    }
346    };
347
348    let block_cache_eviction_config =
349        get_eviction_config(&storage_config.cache.block_cache_eviction);
350    let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
351    let vector_block_cache_eviction_config =
352        get_eviction_config(&storage_config.cache.vector_block_cache_eviction_config);
353    let vector_meta_cache_eviction_config =
354        get_eviction_config(&storage_config.cache.vector_meta_cache_eviction_config);
355
356    StorageMemoryConfig {
357        block_cache_capacity_mb,
358        block_cache_shard_num,
359        meta_cache_capacity_mb,
360        meta_cache_shard_num,
361        vector_block_cache_capacity_mb: storage_config.cache.vector_block_cache_capacity_mb,
362        vector_block_cache_shard_num: storage_config.cache.vector_block_cache_shard_num,
363        vector_meta_cache_capacity_mb: storage_config.cache.vector_meta_cache_capacity_mb,
364        vector_meta_cache_shard_num: storage_config.cache.vector_meta_cache_shard_num,
365        shared_buffer_capacity_mb,
366        compactor_memory_limit_mb,
367        prefetch_buffer_capacity_mb,
368        block_cache_eviction_config,
369        meta_cache_eviction_config,
370        vector_block_cache_eviction_config,
371        vector_meta_cache_eviction_config,
372        block_file_cache_flush_buffer_threshold_mb,
373        meta_file_cache_flush_buffer_threshold_mb,
374    }
375}
376
377pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
378    if is_serving_node {
379        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
380    } else {
381        (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
382    }
383}
384
#[cfg(test)]
mod tests {
    use clap::Parser;
    use risingwave_common::config::StorageConfig;

    use super::{gradient_reserve_memory_bytes, reserve_memory_bytes, storage_memory_config};
    use crate::ComputeNodeOpts;

    /// Parses default options, then overrides the total memory and, only when one is given, the
    /// explicit reservation.
    fn opts_with(
        total_memory_bytes: usize,
        reserved_memory_bytes: Option<usize>,
    ) -> ComputeNodeOpts {
        let mut opts = ComputeNodeOpts::parse_from(Vec::<String>::new());
        opts.total_memory_bytes = total_memory_bytes;
        if reserved_memory_bytes.is_some() {
            opts.reserved_memory_bytes = reserved_memory_bytes;
        }
        opts
    }

    #[test]
    fn test_reserve_memory_bytes() {
        // (total, explicit reservation, expected reserved, expected non-reserved)
        let cases = [
            // at least 512 MB
            (1536 << 20, None, 512 << 20, 1024 << 20),
            // reserve based on proportion
            (10 << 30, None, 3 << 30, 7 << 30),
            // reserve based on opts
            (10 << 30, Some(2 << 30), 2 << 30, 8 << 30),
        ];

        for (total, explicit, expected_reserved, expected_non_reserved) in cases {
            let opts = opts_with(total, explicit);
            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
            assert_eq!(reserved, expected_reserved);
            assert_eq!(non_reserved, expected_non_reserved);
        }
    }

    #[test]
    fn test_storage_memory_config() {
        let total_non_reserved_memory_bytes = 8 << 30;

        // Runs `storage_memory_config` and compares the four capacities, given in the order
        // [block cache, meta cache, shared buffer, compactor limit].
        let check = |storage_config: &StorageConfig,
                     embedded_compactor_enabled: bool,
                     is_serving: bool,
                     expected: [usize; 4]| {
            let [block_cache, meta_cache, shared_buffer, compactor] = expected;
            let memory_config = storage_memory_config(
                total_non_reserved_memory_bytes,
                embedded_compactor_enabled,
                storage_config,
                is_serving,
            );
            assert_eq!(memory_config.block_cache_capacity_mb, block_cache);
            assert_eq!(memory_config.meta_cache_capacity_mb, meta_cache);
            assert_eq!(memory_config.shared_buffer_capacity_mb, shared_buffer);
            assert_eq!(memory_config.compactor_memory_limit_mb, compactor);
        };

        let mut storage_config = StorageConfig::default();

        // Embedded compactor enabled, streaming node
        check(&storage_config, true, false, [737, 983, 737, 819]);

        // Embedded compactor disabled, serving node
        check(&storage_config, false, true, [1966, 1310, 1, 0]);

        // Embedded compactor enabled, streaming node, file cache
        storage_config.data_file_cache.dir = "data".to_owned();
        storage_config.meta_file_cache.dir = "meta".to_owned();
        check(&storage_config, true, false, [737, 983, 737, 819]);

        // Embedded compactor enabled, streaming node, file cache, cache capacities specified
        storage_config.cache.block_cache_capacity_mb = Some(512);
        storage_config.cache.meta_cache_capacity_mb = Some(128);
        storage_config.shared_buffer_capacity_mb = Some(1024);
        storage_config.compactor_memory_limit_mb = Some(512);
        check(&storage_config, true, false, [512, 128, 1024, 512]);

        // Total storage memory is high but below proportion limit
        // Check that specified config is correctly applied
        storage_config.cache.meta_cache_capacity_mb = Some(5120);
        check(&storage_config, true, false, [512, 5120, 1024, 512]);

        // Total storage memory exceed proportion limit
        // Check that specified config is ignored and fallback to defaults
        storage_config.cache.meta_cache_capacity_mb = Some(10240);
        check(&storage_config, true, false, [737, 983, 737, 512]);
    }

    #[test]
    fn test_gradient_reserve_memory_bytes() {
        // (total memory in GiB, expected reserved bytes)
        let expectations: [(usize, usize); 9] = [
            (4, 1288490188),
            (8, 2576980377),
            (16, 5153960755),
            (24, 6871947673),
            (32, 8589934591),
            (54, 13314398617),
            (64, 15461882265),
            (100, 23192823398),
            (128, 29205777612),
        ];

        for (total_gib, expected) in expectations {
            assert_eq!(gradient_reserve_memory_bytes(total_gib << 30), expected);
        }
    }
}