risingwave_compute/memory/
config.rs1use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17 CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18 MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
/// Minimum total memory, in MiB, that `reserve_memory_bytes` accepts for a compute node.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// Lower bound, in MiB, of the memory that `reserve_memory_bytes` sets aside as reserved.
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;

/// Upper bounds (in bytes) of the memory levels used by `gradient_reserve_memory_bytes`.
/// The last level is open-ended.
const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];

/// Fraction of each level in `RESERVED_MEMORY_LEVELS` that is reserved. The two arrays are
/// indexed in parallel.
const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];

/// Fraction of the non-reserved memory that is given to storage.
const STORAGE_MEMORY_PROPORTION: f64 = 0.3;

/// Fraction of the non-reserved memory that is given to the embedded compactor. When the
/// embedded compactor is disabled this share is added to the storage share instead.
const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;

/// Fraction of the storage memory used as the default block cache capacity.
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;

/// Cap, in MiB, applied to the default shared buffer capacity.
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;
/// Fraction of the storage memory used as the default meta cache capacity.
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
/// Fraction of the storage memory used as the default shared buffer capacity.
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;

/// Fraction of the compute memory that batch queries may use on a streaming node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;
/// Fraction of the compute memory that batch queries may use on a serving node.
const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
47
48pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
52 if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
53 panic!(
54 "The total memory size ({}) is too small. It must be at least {} MB.",
55 convert(opts.total_memory_bytes as _),
56 MIN_COMPUTE_MEMORY_MB
57 );
58 }
59
60 let reserved = opts
62 .reserved_memory_bytes
63 .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
64
65 let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
67
68 if reserved >= opts.total_memory_bytes {
69 panic!(
70 "reserved memory ({}) >= total memory ({}).",
71 convert(reserved as _),
72 convert(opts.total_memory_bytes as _)
73 );
74 }
75
76 (reserved, opts.total_memory_bytes - reserved)
77}
78
79pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
84 let mut total_memory_bytes = total_memory_bytes;
85 let mut reserved = 0;
86 for i in 0..RESERVED_MEMORY_LEVELS.len() {
87 let level_diff = if i == 0 {
88 RESERVED_MEMORY_LEVELS[0]
89 } else {
90 RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
91 };
92 if total_memory_bytes <= level_diff {
93 reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
94 break;
95 } else {
96 reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
97 total_memory_bytes -= level_diff;
98 }
99 }
100
101 reserved
102}
103
104pub fn storage_memory_config(
107 non_reserved_memory_bytes: usize,
108 embedded_compactor_enabled: bool,
109 storage_config: &StorageConfig,
110 is_serving: bool,
111) -> StorageMemoryConfig {
112 let (storage_memory_proportion, compactor_memory_proportion) = if embedded_compactor_enabled {
113 (STORAGE_MEMORY_PROPORTION, COMPACTOR_MEMORY_PROPORTION)
114 } else {
115 (STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
116 };
117
118 let storage_memory_bytes = non_reserved_memory_bytes as f64 * storage_memory_proportion;
119
120 let (
123 config_block_cache_capacity_mb,
124 config_meta_cache_capacity_mb,
125 config_shared_buffer_capacity_mb,
126 ) = match (
127 storage_config.cache.block_cache_capacity_mb,
128 storage_config.cache.meta_cache_capacity_mb,
129 storage_config.shared_buffer_capacity_mb,
130 ) {
131 (
132 Some(block_cache_capacity_mb),
133 Some(meta_cache_capacity_mb),
134 Some(shared_buffer_capacity_mb),
135 ) => {
136 let config_storage_memory_bytes =
137 (block_cache_capacity_mb + meta_cache_capacity_mb + shared_buffer_capacity_mb)
138 << 20;
139 if config_storage_memory_bytes as f64 > storage_memory_bytes {
140 tracing::warn!(
141 "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
142 block_cache_capacity_mb,
143 meta_cache_capacity_mb,
144 shared_buffer_capacity_mb,
145 convert(config_storage_memory_bytes as _),
146 convert(storage_memory_bytes as _)
147 );
148 (None, None, None)
149 } else {
150 (
151 Some(block_cache_capacity_mb),
152 Some(meta_cache_capacity_mb),
153 Some(shared_buffer_capacity_mb),
154 )
155 }
156 }
157 c => {
158 tracing::warn!(
159 "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
160 c
161 );
162 (None, None, None)
163 }
164 };
165
166 let mut default_block_cache_capacity_mb =
167 ((storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
168 let default_meta_cache_capacity_mb =
169 ((storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
170 let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
171 storage_config
173 .meta_cache_capacity_mb
174 .unwrap_or(default_meta_cache_capacity_mb),
175 );
176
177 let prefetch_buffer_capacity_mb = storage_config
178 .prefetch_buffer_capacity_mb
179 .unwrap_or(default_block_cache_capacity_mb);
180
181 if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
182 default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
183 default_block_cache_capacity_mb =
184 default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
185 }
186
187 let default_shared_buffer_capacity_mb =
188 ((storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize) >> 20;
189 let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
190 default_shared_buffer_capacity_mb,
191 STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
192 ));
193 if is_serving {
194 default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
195 shared_buffer_capacity_mb = 1;
197 } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
198 default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
199 default_block_cache_capacity_mb =
200 default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
201 }
202 let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
203 storage_config
205 .block_cache_capacity_mb
206 .unwrap_or(default_block_cache_capacity_mb),
207 );
208
209 let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
210 ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
211 );
212
213 let block_file_cache_flush_buffer_threshold_mb = storage_config
217 .data_file_cache
218 .flush_buffer_threshold_mb
219 .unwrap_or(
220 risingwave_common::config::default::storage::block_file_cache_flush_buffer_threshold_mb(
221 ),
222 );
223 let meta_file_cache_flush_buffer_threshold_mb = storage_config
224 .meta_file_cache
225 .flush_buffer_threshold_mb
226 .unwrap_or(
227 risingwave_common::config::default::storage::meta_file_cache_flush_buffer_threshold_mb(
228 ),
229 );
230
231 let total_calculated_mb = block_cache_capacity_mb
232 + meta_cache_capacity_mb
233 + shared_buffer_capacity_mb
234 + compactor_memory_limit_mb;
235 let soft_limit_mb = (non_reserved_memory_bytes as f64
236 * (storage_memory_proportion + compactor_memory_proportion).ceil())
237 as usize
238 >> 20;
239 if total_calculated_mb > soft_limit_mb + 5 {
241 tracing::warn!(
242 "The storage memory ({}) exceeds soft limit ({}).",
243 convert((total_calculated_mb << 20) as _),
244 convert((soft_limit_mb << 20) as _)
245 );
246 }
247
248 let meta_cache_shard_num = storage_config
249 .cache
250 .meta_cache_shard_num
251 .unwrap_or_else(|| {
252 let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
253 while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
254 && shard_bits > 0
255 {
256 shard_bits -= 1;
257 }
258 1 << shard_bits
259 });
260 let block_cache_shard_num = storage_config
261 .cache
262 .block_cache_shard_num
263 .unwrap_or_else(|| {
264 let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
265 while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
266 && shard_bits > 0
267 {
268 shard_bits -= 1;
269 }
270 1 << shard_bits
271 });
272
273 let get_eviction_config = |c: &CacheEvictionConfig| match c {
274 CacheEvictionConfig::Lru {
275 high_priority_ratio_in_percent,
276 } => EvictionConfig::Lru(LruConfig {
277 high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
278 storage_config.high_priority_ratio_in_percent.unwrap_or(
280 risingwave_common::config::default::storage::high_priority_ratio_in_percent(),
281 ),
282 ) as f64
283 / 100.0,
284 }),
285 CacheEvictionConfig::Lfu {
286 window_capacity_ratio_in_percent,
287 protected_capacity_ratio_in_percent,
288 cmsketch_eps,
289 cmsketch_confidence,
290 } => EvictionConfig::Lfu(LfuConfig {
291 window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
292 risingwave_common::config::default::storage::window_capacity_ratio_in_percent(),
293 ) as f64
294 / 100.0,
295 protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
296 risingwave_common::config::default::storage::protected_capacity_ratio_in_percent(),
297 ) as f64
298 / 100.0,
299 cmsketch_eps: cmsketch_eps
300 .unwrap_or(risingwave_common::config::default::storage::cmsketch_eps()),
301 cmsketch_confidence: cmsketch_confidence
302 .unwrap_or(risingwave_common::config::default::storage::cmsketch_confidence()),
303 }),
304 CacheEvictionConfig::S3Fifo {
305 small_queue_capacity_ratio_in_percent,
306 ghost_queue_capacity_ratio_in_percent,
307 small_to_main_freq_threshold,
308 } => EvictionConfig::S3Fifo(S3FifoConfig {
309 small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
310 risingwave_common::config::default::storage::small_queue_capacity_ratio_in_percent(
311 ),
312 ) as f64
313 / 100.0,
314 ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
315 risingwave_common::config::default::storage::ghost_queue_capacity_ratio_in_percent(
316 ),
317 ) as f64
318 / 100.0,
319 small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
320 risingwave_common::config::default::storage::small_to_main_freq_threshold(),
321 ),
322 }),
323 };
324
325 let block_cache_eviction_config =
326 get_eviction_config(&storage_config.cache.block_cache_eviction);
327 let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
328
329 StorageMemoryConfig {
330 block_cache_capacity_mb,
331 block_cache_shard_num,
332 meta_cache_capacity_mb,
333 meta_cache_shard_num,
334 shared_buffer_capacity_mb,
335 compactor_memory_limit_mb,
336 prefetch_buffer_capacity_mb,
337 block_cache_eviction_config,
338 meta_cache_eviction_config,
339 block_file_cache_flush_buffer_threshold_mb,
340 meta_file_cache_flush_buffer_threshold_mb,
341 }
342}
343
344pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
345 if is_serving_node {
346 (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
347 } else {
348 (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
349 }
350}
351
#[cfg(test)]
mod tests {
    use clap::Parser;
    use risingwave_common::config::StorageConfig;

    use super::{gradient_reserve_memory_bytes, reserve_memory_bytes, storage_memory_config};
    use crate::ComputeNodeOpts;

    /// Builds default options with only the total memory overridden.
    fn opts_with_total_memory(total_memory_bytes: usize) -> ComputeNodeOpts {
        let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
        opts.total_memory_bytes = total_memory_bytes;
        opts
    }

    #[test]
    fn test_reserve_memory_bytes() {
        // Small node: the gradient value (30%) is below the 512 MiB floor, so the floor wins.
        {
            let opts = opts_with_total_memory(1536 << 20);
            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
            assert_eq!(reserved, 512 << 20);
            assert_eq!(non_reserved, 1024 << 20);
        }

        // No explicit reservation: the result of the gradient computation is used.
        {
            let opts = opts_with_total_memory(10 << 30);
            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
            assert_eq!(reserved, 3 << 30);
            assert_eq!(non_reserved, 7 << 30);
        }

        // An explicit reservation overrides the gradient computation.
        {
            let mut opts = opts_with_total_memory(10 << 30);
            opts.reserved_memory_bytes = Some(2 << 30);
            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
            assert_eq!(reserved, 2 << 30);
            assert_eq!(non_reserved, 8 << 30);
        }
    }

    #[test]
    fn test_storage_memory_config() {
        let mut storage_config = StorageConfig::default();

        let total_non_reserved_memory_bytes = 8 << 30;

        // Streaming node with embedded compactor, default config.
        let memory_config = storage_memory_config(
            total_non_reserved_memory_bytes,
            true,
            &storage_config,
            false,
        );
        assert_eq!(memory_config.block_cache_capacity_mb, 737);
        assert_eq!(memory_config.meta_cache_capacity_mb, 860);
        assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
        assert_eq!(memory_config.compactor_memory_limit_mb, 819);

        // Serving node without embedded compactor.
        let memory_config = storage_memory_config(
            total_non_reserved_memory_bytes,
            false,
            &storage_config,
            true,
        );
        assert_eq!(memory_config.block_cache_capacity_mb, 1966);
        assert_eq!(memory_config.meta_cache_capacity_mb, 1146);
        assert_eq!(memory_config.shared_buffer_capacity_mb, 1);
        assert_eq!(memory_config.compactor_memory_limit_mb, 0);

        // Setting the file cache directories does not change the memory split.
        storage_config.data_file_cache.dir = "data".to_owned();
        storage_config.meta_file_cache.dir = "meta".to_owned();
        let memory_config = storage_memory_config(
            total_non_reserved_memory_bytes,
            true,
            &storage_config,
            false,
        );
        assert_eq!(memory_config.block_cache_capacity_mb, 737);
        assert_eq!(memory_config.meta_cache_capacity_mb, 860);
        assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
        assert_eq!(memory_config.compactor_memory_limit_mb, 819);

        // Explicitly configured capacities are used as-is.
        storage_config.cache.block_cache_capacity_mb = Some(512);
        storage_config.cache.meta_cache_capacity_mb = Some(128);
        storage_config.shared_buffer_capacity_mb = Some(1024);
        storage_config.compactor_memory_limit_mb = Some(512);
        let memory_config = storage_memory_config(
            total_non_reserved_memory_bytes,
            true,
            &storage_config,
            false,
        );
        assert_eq!(memory_config.block_cache_capacity_mb, 512);
        assert_eq!(memory_config.meta_cache_capacity_mb, 128);
        assert_eq!(memory_config.shared_buffer_capacity_mb, 1024);
        assert_eq!(memory_config.compactor_memory_limit_mb, 512);
    }

    #[test]
    fn test_gradient_reserve_memory_bytes() {
        // (total memory in GiB, expected reserved bytes)
        let cases: [(usize, usize); 9] = [
            (4, 1288490188),
            (8, 2576980377),
            (16, 5153960755),
            (24, 6871947673),
            (32, 8589934591),
            (54, 13314398617),
            (64, 15461882265),
            (100, 23192823398),
            (128, 29205777612),
        ];
        for (total_gib, expected) in cases {
            assert_eq!(
                gradient_reserve_memory_bytes(total_gib << 30),
                expected,
                "total memory: {} GiB",
                total_gib
            );
        }
    }
}