1use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17 CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18 MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
24pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
26pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;
29
30const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];
31
32const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];
33
34const STORAGE_MEMORY_DEFAULT_PROPORTION: f64 = 0.3;
35const STORAGE_MEMORY_MAX_PROPORTION: f64 = 0.9;
36
37const COMPACTOR_MEMORY_DEFAULT_PROPORTION: f64 = 0.1;
38
39const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;
40
41const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;
42const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.4;
43const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
44
45const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;
47const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
48
49pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
53 if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
54 panic!(
55 "The total memory size ({}) is too small. It must be at least {} MB.",
56 convert(opts.total_memory_bytes as _),
57 MIN_COMPUTE_MEMORY_MB
58 );
59 }
60
61 let reserved = opts
63 .reserved_memory_bytes
64 .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
65
66 let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
68
69 if reserved >= opts.total_memory_bytes {
70 panic!(
71 "reserved memory ({}) >= total memory ({}).",
72 convert(reserved as _),
73 convert(opts.total_memory_bytes as _)
74 );
75 }
76
77 (reserved, opts.total_memory_bytes - reserved)
78}
79
80pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
85 let mut total_memory_bytes = total_memory_bytes;
86 let mut reserved = 0;
87 for i in 0..RESERVED_MEMORY_LEVELS.len() {
88 let level_diff = if i == 0 {
89 RESERVED_MEMORY_LEVELS[0]
90 } else {
91 RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
92 };
93 if total_memory_bytes <= level_diff {
94 reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
95 break;
96 } else {
97 reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
98 total_memory_bytes -= level_diff;
99 }
100 }
101
102 reserved
103}
104
105pub fn storage_memory_config(
108 non_reserved_memory_bytes: usize,
109 embedded_compactor_enabled: bool,
110 storage_config: &StorageConfig,
111 is_serving: bool,
112) -> StorageMemoryConfig {
113 let (default_storage_memory_proportion, default_compactor_memory_proportion) =
114 if embedded_compactor_enabled {
115 (
116 STORAGE_MEMORY_DEFAULT_PROPORTION,
117 COMPACTOR_MEMORY_DEFAULT_PROPORTION,
118 )
119 } else {
120 (
121 STORAGE_MEMORY_DEFAULT_PROPORTION + COMPACTOR_MEMORY_DEFAULT_PROPORTION,
122 0.0,
123 )
124 };
125
126 let default_storage_memory_bytes =
127 (non_reserved_memory_bytes as f64 * default_storage_memory_proportion).ceil();
128 let max_storage_memory_bytes =
129 (non_reserved_memory_bytes as f64 * STORAGE_MEMORY_MAX_PROPORTION).ceil();
130
131 let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
132 ((non_reserved_memory_bytes as f64 * default_compactor_memory_proportion).ceil() as usize)
133 >> 20,
134 );
135 tracing::debug!(
136 "default_storage_memory_bytes={} max_storage_memory_bytes={} compactor_memory_limit_mb={}",
137 default_storage_memory_bytes,
138 max_storage_memory_bytes,
139 compactor_memory_limit_mb
140 );
141
142 let (
145 config_block_cache_capacity_mb,
146 config_meta_cache_capacity_mb,
147 config_shared_buffer_capacity_mb,
148 ) = match (
149 storage_config.cache.block_cache_capacity_mb,
150 storage_config.cache.meta_cache_capacity_mb,
151 storage_config.shared_buffer_capacity_mb,
152 ) {
153 (
154 Some(block_cache_capacity_mb),
155 Some(meta_cache_capacity_mb),
156 Some(shared_buffer_capacity_mb),
157 ) => {
158 let config_storage_memory_bytes = (block_cache_capacity_mb
159 + meta_cache_capacity_mb
160 + shared_buffer_capacity_mb
161 + compactor_memory_limit_mb)
162 << 20;
163 if config_storage_memory_bytes as f64 > max_storage_memory_bytes {
164 tracing::warn!(
165 "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} + compactor_memory_limit_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
166 block_cache_capacity_mb,
167 meta_cache_capacity_mb,
168 shared_buffer_capacity_mb,
169 compactor_memory_limit_mb,
170 convert(config_storage_memory_bytes as _),
171 convert(max_storage_memory_bytes as _)
172 );
173 (None, None, None)
174 } else {
175 (
176 Some(block_cache_capacity_mb),
177 Some(meta_cache_capacity_mb),
178 Some(shared_buffer_capacity_mb),
179 )
180 }
181 }
182 c => {
183 tracing::warn!(
184 "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
185 c
186 );
187 (None, None, None)
188 }
189 };
190
191 let mut default_block_cache_capacity_mb =
192 ((default_storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize)
193 >> 20;
194 let default_meta_cache_capacity_mb =
195 ((default_storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize)
196 >> 20;
197 let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
198 storage_config
200 .meta_cache_capacity_mb
201 .unwrap_or(default_meta_cache_capacity_mb),
202 );
203
204 let prefetch_buffer_capacity_mb = storage_config
205 .prefetch_buffer_capacity_mb
206 .unwrap_or(default_block_cache_capacity_mb);
207
208 if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
209 default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
210 default_block_cache_capacity_mb =
211 default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
212 }
213
214 let default_shared_buffer_capacity_mb =
215 ((default_storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize)
216 >> 20;
217 let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
218 default_shared_buffer_capacity_mb,
219 STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
220 ));
221 if is_serving {
222 default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
223 shared_buffer_capacity_mb = 1;
225 } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
226 default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
227 default_block_cache_capacity_mb =
228 default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
229 }
230 let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
231 storage_config
233 .block_cache_capacity_mb
234 .unwrap_or(default_block_cache_capacity_mb),
235 );
236
237 let block_file_cache_flush_buffer_threshold_mb = storage_config
241 .data_file_cache
242 .flush_buffer_threshold_mb
243 .unwrap_or(
244 risingwave_common::config::storage::default::storage::block_file_cache_flush_buffer_threshold_mb(
245 ),
246 );
247 let meta_file_cache_flush_buffer_threshold_mb = storage_config
248 .meta_file_cache
249 .flush_buffer_threshold_mb
250 .unwrap_or(
251 risingwave_common::config::storage::default::storage::meta_file_cache_flush_buffer_threshold_mb(
252 ),
253 );
254
255 let total_calculated_mb = block_cache_capacity_mb
256 + meta_cache_capacity_mb
257 + shared_buffer_capacity_mb
258 + compactor_memory_limit_mb;
259 let soft_limit_mb = max_storage_memory_bytes as usize >> 20;
260 if total_calculated_mb > soft_limit_mb + 5 {
262 tracing::warn!(
263 "The storage memory ({}) exceeds soft limit ({}).",
264 convert((total_calculated_mb << 20) as _),
265 convert((soft_limit_mb << 20) as _)
266 );
267 }
268
269 let meta_cache_shard_num = storage_config
270 .cache
271 .meta_cache_shard_num
272 .unwrap_or_else(|| {
273 let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
274 while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
275 && shard_bits > 0
276 {
277 shard_bits -= 1;
278 }
279 1 << shard_bits
280 });
281 let block_cache_shard_num = storage_config
282 .cache
283 .block_cache_shard_num
284 .unwrap_or_else(|| {
285 let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
286 while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
287 && shard_bits > 0
288 {
289 shard_bits -= 1;
290 }
291 1 << shard_bits
292 });
293
294 let get_eviction_config = |c: &CacheEvictionConfig| {
295 match c {
296 CacheEvictionConfig::Lru {
297 high_priority_ratio_in_percent,
298 } => EvictionConfig::Lru(LruConfig {
299 high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
300 storage_config.high_priority_ratio_in_percent.unwrap_or(
302 risingwave_common::config::storage::default::storage::high_priority_ratio_in_percent(),
303 ),
304 ) as f64
305 / 100.0,
306 }),
307 CacheEvictionConfig::Lfu {
308 window_capacity_ratio_in_percent,
309 protected_capacity_ratio_in_percent,
310 cmsketch_eps,
311 cmsketch_confidence,
312 } => EvictionConfig::Lfu(LfuConfig {
313 window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
314 risingwave_common::config::storage::default::storage::window_capacity_ratio_in_percent(),
315 ) as f64
316 / 100.0,
317 protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
318 risingwave_common::config::storage::default::storage::protected_capacity_ratio_in_percent(),
319 ) as f64
320 / 100.0,
321 cmsketch_eps: cmsketch_eps
322 .unwrap_or(risingwave_common::config::storage::default::storage::cmsketch_eps()),
323 cmsketch_confidence: cmsketch_confidence
324 .unwrap_or(risingwave_common::config::storage::default::storage::cmsketch_confidence()),
325 }),
326 CacheEvictionConfig::S3Fifo {
327 small_queue_capacity_ratio_in_percent,
328 ghost_queue_capacity_ratio_in_percent,
329 small_to_main_freq_threshold,
330 } => EvictionConfig::S3Fifo(S3FifoConfig {
331 small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
332 risingwave_common::config::storage::default::storage::small_queue_capacity_ratio_in_percent(
333 ),
334 ) as f64
335 / 100.0,
336 ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
337 risingwave_common::config::storage::default::storage::ghost_queue_capacity_ratio_in_percent(
338 ),
339 ) as f64
340 / 100.0,
341 small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
342 risingwave_common::config::storage::default::storage::small_to_main_freq_threshold(),
343 ),
344 }),
345 }
346 };
347
348 let block_cache_eviction_config =
349 get_eviction_config(&storage_config.cache.block_cache_eviction);
350 let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
351 let vector_block_cache_eviction_config =
352 get_eviction_config(&storage_config.cache.vector_block_cache_eviction_config);
353 let vector_meta_cache_eviction_config =
354 get_eviction_config(&storage_config.cache.vector_meta_cache_eviction_config);
355
356 StorageMemoryConfig {
357 block_cache_capacity_mb,
358 block_cache_shard_num,
359 meta_cache_capacity_mb,
360 meta_cache_shard_num,
361 vector_block_cache_capacity_mb: storage_config.cache.vector_block_cache_capacity_mb,
362 vector_block_cache_shard_num: storage_config.cache.vector_block_cache_shard_num,
363 vector_meta_cache_capacity_mb: storage_config.cache.vector_meta_cache_capacity_mb,
364 vector_meta_cache_shard_num: storage_config.cache.vector_meta_cache_shard_num,
365 shared_buffer_capacity_mb,
366 compactor_memory_limit_mb,
367 prefetch_buffer_capacity_mb,
368 block_cache_eviction_config,
369 meta_cache_eviction_config,
370 vector_block_cache_eviction_config,
371 vector_meta_cache_eviction_config,
372 block_file_cache_flush_buffer_threshold_mb,
373 meta_file_cache_flush_buffer_threshold_mb,
374 }
375}
376
377pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
378 if is_serving_node {
379 (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
380 } else {
381 (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use clap::Parser;
388 use risingwave_common::config::StorageConfig;
389
390 use super::{reserve_memory_bytes, storage_memory_config};
391 use crate::ComputeNodeOpts;
392
393 #[test]
394 fn test_reserve_memory_bytes() {
395 {
397 let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
398 opts.total_memory_bytes = 1536 << 20;
399 let (reserved, non_reserved) = reserve_memory_bytes(&opts);
400 assert_eq!(reserved, 512 << 20);
401 assert_eq!(non_reserved, 1024 << 20);
402 }
403
404 {
406 let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
407 opts.total_memory_bytes = 10 << 30;
408 let (reserved, non_reserved) = reserve_memory_bytes(&opts);
409 assert_eq!(reserved, 3 << 30);
410 assert_eq!(non_reserved, 7 << 30);
411 }
412
413 {
415 let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
416 opts.total_memory_bytes = 10 << 30;
417 opts.reserved_memory_bytes = Some(2 << 30);
418 let (reserved, non_reserved) = reserve_memory_bytes(&opts);
419 assert_eq!(reserved, 2 << 30);
420 assert_eq!(non_reserved, 8 << 30);
421 }
422 }
423
424 #[test]
425 fn test_storage_memory_config() {
426 let mut storage_config = StorageConfig::default();
427
428 let total_non_reserved_memory_bytes = 8 << 30;
429
430 let memory_config = storage_memory_config(
432 total_non_reserved_memory_bytes,
433 true,
434 &storage_config,
435 false,
436 );
437 assert_eq!(memory_config.block_cache_capacity_mb, 737);
438 assert_eq!(memory_config.meta_cache_capacity_mb, 983);
439 assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
440 assert_eq!(memory_config.compactor_memory_limit_mb, 819);
441
442 let memory_config = storage_memory_config(
444 total_non_reserved_memory_bytes,
445 false,
446 &storage_config,
447 true,
448 );
449 assert_eq!(memory_config.block_cache_capacity_mb, 1966);
450 assert_eq!(memory_config.meta_cache_capacity_mb, 1310);
451 assert_eq!(memory_config.shared_buffer_capacity_mb, 1);
452 assert_eq!(memory_config.compactor_memory_limit_mb, 0);
453
454 storage_config.data_file_cache.dir = "data".to_owned();
456 storage_config.meta_file_cache.dir = "meta".to_owned();
457 let memory_config = storage_memory_config(
458 total_non_reserved_memory_bytes,
459 true,
460 &storage_config,
461 false,
462 );
463 assert_eq!(memory_config.block_cache_capacity_mb, 737);
464 assert_eq!(memory_config.meta_cache_capacity_mb, 983);
465 assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
466 assert_eq!(memory_config.compactor_memory_limit_mb, 819);
467
468 storage_config.cache.block_cache_capacity_mb = Some(512);
470 storage_config.cache.meta_cache_capacity_mb = Some(128);
471 storage_config.shared_buffer_capacity_mb = Some(1024);
472 storage_config.compactor_memory_limit_mb = Some(512);
473 let memory_config = storage_memory_config(
474 total_non_reserved_memory_bytes,
475 true,
476 &storage_config,
477 false,
478 );
479 assert_eq!(memory_config.block_cache_capacity_mb, 512);
480 assert_eq!(memory_config.meta_cache_capacity_mb, 128);
481 assert_eq!(memory_config.shared_buffer_capacity_mb, 1024);
482 assert_eq!(memory_config.compactor_memory_limit_mb, 512);
483
484 storage_config.cache.meta_cache_capacity_mb = Some(5120);
487 let memory_config = storage_memory_config(
488 total_non_reserved_memory_bytes,
489 true,
490 &storage_config,
491 false,
492 );
493 assert_eq!(memory_config.block_cache_capacity_mb, 512);
494 assert_eq!(memory_config.meta_cache_capacity_mb, 5120);
495 assert_eq!(memory_config.shared_buffer_capacity_mb, 1024);
496 assert_eq!(memory_config.compactor_memory_limit_mb, 512);
497
498 storage_config.cache.meta_cache_capacity_mb = Some(10240);
501 let memory_config = storage_memory_config(
502 total_non_reserved_memory_bytes,
503 true,
504 &storage_config,
505 false,
506 );
507 assert_eq!(memory_config.block_cache_capacity_mb, 737);
508 assert_eq!(memory_config.meta_cache_capacity_mb, 983);
509 assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
510 assert_eq!(memory_config.compactor_memory_limit_mb, 512);
511 }
512
513 #[test]
514 fn test_gradient_reserve_memory_bytes() {
515 assert_eq!(super::gradient_reserve_memory_bytes(4 << 30), 1288490188);
516 assert_eq!(super::gradient_reserve_memory_bytes(8 << 30), 2576980377);
517 assert_eq!(super::gradient_reserve_memory_bytes(16 << 30), 5153960755);
518 assert_eq!(super::gradient_reserve_memory_bytes(24 << 30), 6871947673);
519 assert_eq!(super::gradient_reserve_memory_bytes(32 << 30), 8589934591);
520 assert_eq!(super::gradient_reserve_memory_bytes(54 << 30), 13314398617);
521 assert_eq!(super::gradient_reserve_memory_bytes(64 << 30), 15461882265);
522 assert_eq!(super::gradient_reserve_memory_bytes(100 << 30), 23192823398);
523 assert_eq!(super::gradient_reserve_memory_bytes(128 << 30), 29205777612);
524 }
525}