risingwave_compute/memory/
config.rs1use foyer::{LfuConfig, LruConfig, S3FifoConfig};
16use risingwave_common::config::{
17 CacheEvictionConfig, EvictionConfig, MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
18 MIN_BUFFER_SIZE_PER_SHARD, StorageConfig, StorageMemoryConfig,
19};
20use risingwave_common::util::pretty_bytes::convert;
21
22use crate::ComputeNodeOpts;
23
24pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
26pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;
29
30const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];
31
32const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];
33
34const STORAGE_MEMORY_PROPORTION: f64 = 0.3;
35
36const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;
37
38const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;
39
40const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;
41const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
42const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
43
44const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING: f64 = 0.3;
46const COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING: f64 = 0.6;
47
48pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
52 if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
53 panic!(
54 "The total memory size ({}) is too small. It must be at least {} MB.",
55 convert(opts.total_memory_bytes as _),
56 MIN_COMPUTE_MEMORY_MB
57 );
58 }
59
60 let reserved = opts
62 .reserved_memory_bytes
63 .unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));
64
65 let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
67
68 if reserved >= opts.total_memory_bytes {
69 panic!(
70 "reserved memory ({}) >= total memory ({}).",
71 convert(reserved as _),
72 convert(opts.total_memory_bytes as _)
73 );
74 }
75
76 (reserved, opts.total_memory_bytes - reserved)
77}
78
79pub fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
84 let mut total_memory_bytes = total_memory_bytes;
85 let mut reserved = 0;
86 for i in 0..RESERVED_MEMORY_LEVELS.len() {
87 let level_diff = if i == 0 {
88 RESERVED_MEMORY_LEVELS[0]
89 } else {
90 RESERVED_MEMORY_LEVELS[i] - RESERVED_MEMORY_LEVELS[i - 1]
91 };
92 if total_memory_bytes <= level_diff {
93 reserved += (total_memory_bytes as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
94 break;
95 } else {
96 reserved += (level_diff as f64 * RESERVED_MEMORY_PROPORTIONS[i]) as usize;
97 total_memory_bytes -= level_diff;
98 }
99 }
100
101 reserved
102}
103
104pub fn storage_memory_config(
107 non_reserved_memory_bytes: usize,
108 embedded_compactor_enabled: bool,
109 storage_config: &StorageConfig,
110 is_serving: bool,
111) -> StorageMemoryConfig {
112 let (storage_memory_proportion, compactor_memory_proportion) = if embedded_compactor_enabled {
113 (STORAGE_MEMORY_PROPORTION, COMPACTOR_MEMORY_PROPORTION)
114 } else {
115 (STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
116 };
117
118 let storage_memory_bytes = non_reserved_memory_bytes as f64 * storage_memory_proportion;
119
120 let (
123 config_block_cache_capacity_mb,
124 config_meta_cache_capacity_mb,
125 config_shared_buffer_capacity_mb,
126 ) = match (
127 storage_config.cache.block_cache_capacity_mb,
128 storage_config.cache.meta_cache_capacity_mb,
129 storage_config.shared_buffer_capacity_mb,
130 ) {
131 (
132 Some(block_cache_capacity_mb),
133 Some(meta_cache_capacity_mb),
134 Some(shared_buffer_capacity_mb),
135 ) => {
136 let config_storage_memory_bytes =
137 (block_cache_capacity_mb + meta_cache_capacity_mb + shared_buffer_capacity_mb)
138 << 20;
139 if config_storage_memory_bytes as f64 > storage_memory_bytes {
140 tracing::warn!(
141 "config block_cache_capacity_mb {} + meta_cache_capacity_mb {} + shared_buffer_capacity_mb {} = {} exceeds allowed storage_memory_bytes {}. These configs will be ignored.",
142 block_cache_capacity_mb,
143 meta_cache_capacity_mb,
144 shared_buffer_capacity_mb,
145 convert(config_storage_memory_bytes as _),
146 convert(storage_memory_bytes as _)
147 );
148 (None, None, None)
149 } else {
150 (
151 Some(block_cache_capacity_mb),
152 Some(meta_cache_capacity_mb),
153 Some(shared_buffer_capacity_mb),
154 )
155 }
156 }
157 c => {
158 tracing::warn!(
159 "config (block_cache_capacity_mb, meta_cache_capacity_mb, shared_buffer_capacity_mb): {:?} should be set altogether. These configs will be ignored.",
160 c
161 );
162 (None, None, None)
163 }
164 };
165
166 let mut default_block_cache_capacity_mb =
167 ((storage_memory_bytes * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
168 let default_meta_cache_capacity_mb =
169 ((storage_memory_bytes * STORAGE_META_CACHE_MEMORY_PROPORTION).ceil() as usize) >> 20;
170 let meta_cache_capacity_mb = config_meta_cache_capacity_mb.unwrap_or(
171 storage_config
173 .meta_cache_capacity_mb
174 .unwrap_or(default_meta_cache_capacity_mb),
175 );
176
177 let prefetch_buffer_capacity_mb = storage_config
178 .prefetch_buffer_capacity_mb
179 .unwrap_or(default_block_cache_capacity_mb);
180
181 if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
182 default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
183 default_block_cache_capacity_mb =
184 default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
185 }
186
187 let default_shared_buffer_capacity_mb =
188 ((storage_memory_bytes * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION).ceil() as usize) >> 20;
189 let mut shared_buffer_capacity_mb = config_shared_buffer_capacity_mb.unwrap_or(std::cmp::min(
190 default_shared_buffer_capacity_mb,
191 STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
192 ));
193 if is_serving {
194 default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
195 shared_buffer_capacity_mb = 1;
197 } else if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
198 default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
199 default_block_cache_capacity_mb =
200 default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
201 }
202 let block_cache_capacity_mb = config_block_cache_capacity_mb.unwrap_or(
203 storage_config
205 .block_cache_capacity_mb
206 .unwrap_or(default_block_cache_capacity_mb),
207 );
208
209 let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
210 ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
211 );
212
213 let block_file_cache_flush_buffer_threshold_mb = storage_config
217 .data_file_cache
218 .flush_buffer_threshold_mb
219 .unwrap_or(
220 risingwave_common::config::default::storage::block_file_cache_flush_buffer_threshold_mb(
221 ),
222 );
223 let meta_file_cache_flush_buffer_threshold_mb = storage_config
224 .meta_file_cache
225 .flush_buffer_threshold_mb
226 .unwrap_or(
227 risingwave_common::config::default::storage::meta_file_cache_flush_buffer_threshold_mb(
228 ),
229 );
230
231 let total_calculated_mb = block_cache_capacity_mb
232 + meta_cache_capacity_mb
233 + shared_buffer_capacity_mb
234 + compactor_memory_limit_mb;
235 let soft_limit_mb = (non_reserved_memory_bytes as f64
236 * (storage_memory_proportion + compactor_memory_proportion).ceil())
237 as usize
238 >> 20;
239 if total_calculated_mb > soft_limit_mb + 5 {
241 tracing::warn!(
242 "The storage memory ({}) exceeds soft limit ({}).",
243 convert((total_calculated_mb << 20) as _),
244 convert((soft_limit_mb << 20) as _)
245 );
246 }
247
248 let meta_cache_shard_num = storage_config
249 .cache
250 .meta_cache_shard_num
251 .unwrap_or_else(|| {
252 let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
253 while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
254 && shard_bits > 0
255 {
256 shard_bits -= 1;
257 }
258 1 << shard_bits
259 });
260 let block_cache_shard_num = storage_config
261 .cache
262 .block_cache_shard_num
263 .unwrap_or_else(|| {
264 let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
265 while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
266 && shard_bits > 0
267 {
268 shard_bits -= 1;
269 }
270 1 << shard_bits
271 });
272
273 let get_eviction_config = |c: &CacheEvictionConfig| match c {
274 CacheEvictionConfig::Lru {
275 high_priority_ratio_in_percent,
276 } => EvictionConfig::Lru(LruConfig {
277 high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
278 storage_config.high_priority_ratio_in_percent.unwrap_or(
280 risingwave_common::config::default::storage::high_priority_ratio_in_percent(),
281 ),
282 ) as f64
283 / 100.0,
284 }),
285 CacheEvictionConfig::Lfu {
286 window_capacity_ratio_in_percent,
287 protected_capacity_ratio_in_percent,
288 cmsketch_eps,
289 cmsketch_confidence,
290 } => EvictionConfig::Lfu(LfuConfig {
291 window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
292 risingwave_common::config::default::storage::window_capacity_ratio_in_percent(),
293 ) as f64
294 / 100.0,
295 protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
296 risingwave_common::config::default::storage::protected_capacity_ratio_in_percent(),
297 ) as f64
298 / 100.0,
299 cmsketch_eps: cmsketch_eps
300 .unwrap_or(risingwave_common::config::default::storage::cmsketch_eps()),
301 cmsketch_confidence: cmsketch_confidence
302 .unwrap_or(risingwave_common::config::default::storage::cmsketch_confidence()),
303 }),
304 CacheEvictionConfig::S3Fifo {
305 small_queue_capacity_ratio_in_percent,
306 ghost_queue_capacity_ratio_in_percent,
307 small_to_main_freq_threshold,
308 } => EvictionConfig::S3Fifo(S3FifoConfig {
309 small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent.unwrap_or(
310 risingwave_common::config::default::storage::small_queue_capacity_ratio_in_percent(
311 ),
312 ) as f64
313 / 100.0,
314 ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent.unwrap_or(
315 risingwave_common::config::default::storage::ghost_queue_capacity_ratio_in_percent(
316 ),
317 ) as f64
318 / 100.0,
319 small_to_main_freq_threshold: small_to_main_freq_threshold.unwrap_or(
320 risingwave_common::config::default::storage::small_to_main_freq_threshold(),
321 ),
322 }),
323 };
324
325 let block_cache_eviction_config =
326 get_eviction_config(&storage_config.cache.block_cache_eviction);
327 let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction);
328 let vector_block_cache_eviction_config =
329 get_eviction_config(&storage_config.cache.vector_block_cache_eviction_config);
330 let vector_meta_cache_eviction_config =
331 get_eviction_config(&storage_config.cache.vector_meta_cache_eviction_config);
332
333 StorageMemoryConfig {
334 block_cache_capacity_mb,
335 block_cache_shard_num,
336 meta_cache_capacity_mb,
337 meta_cache_shard_num,
338 vector_block_cache_capacity_mb: storage_config.cache.vector_block_cache_capacity_mb,
339 vector_block_cache_shard_num: storage_config.cache.vector_block_cache_shard_num,
340 vector_meta_cache_capacity_mb: storage_config.cache.vector_meta_cache_capacity_mb,
341 vector_meta_cache_shard_num: storage_config.cache.vector_meta_cache_shard_num,
342 shared_buffer_capacity_mb,
343 compactor_memory_limit_mb,
344 prefetch_buffer_capacity_mb,
345 block_cache_eviction_config,
346 meta_cache_eviction_config,
347 vector_block_cache_eviction_config,
348 vector_meta_cache_eviction_config,
349 block_file_cache_flush_buffer_threshold_mb,
350 meta_file_cache_flush_buffer_threshold_mb,
351 }
352}
353
354pub fn batch_mem_limit(compute_memory_bytes: usize, is_serving_node: bool) -> u64 {
355 if is_serving_node {
356 (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_SERVING) as u64
357 } else {
358 (compute_memory_bytes as f64 * COMPUTE_BATCH_MEMORY_PROPORTION_FOR_STREAMING) as u64
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use clap::Parser;
365 use risingwave_common::config::StorageConfig;
366
367 use super::{reserve_memory_bytes, storage_memory_config};
368 use crate::ComputeNodeOpts;
369
370 #[test]
371 fn test_reserve_memory_bytes() {
372 {
374 let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
375 opts.total_memory_bytes = 1536 << 20;
376 let (reserved, non_reserved) = reserve_memory_bytes(&opts);
377 assert_eq!(reserved, 512 << 20);
378 assert_eq!(non_reserved, 1024 << 20);
379 }
380
381 {
383 let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
384 opts.total_memory_bytes = 10 << 30;
385 let (reserved, non_reserved) = reserve_memory_bytes(&opts);
386 assert_eq!(reserved, 3 << 30);
387 assert_eq!(non_reserved, 7 << 30);
388 }
389
390 {
392 let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
393 opts.total_memory_bytes = 10 << 30;
394 opts.reserved_memory_bytes = Some(2 << 30);
395 let (reserved, non_reserved) = reserve_memory_bytes(&opts);
396 assert_eq!(reserved, 2 << 30);
397 assert_eq!(non_reserved, 8 << 30);
398 }
399 }
400
401 #[test]
402 fn test_storage_memory_config() {
403 let mut storage_config = StorageConfig::default();
404
405 let total_non_reserved_memory_bytes = 8 << 30;
406
407 let memory_config = storage_memory_config(
409 total_non_reserved_memory_bytes,
410 true,
411 &storage_config,
412 false,
413 );
414 assert_eq!(memory_config.block_cache_capacity_mb, 737);
415 assert_eq!(memory_config.meta_cache_capacity_mb, 860);
416 assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
417 assert_eq!(memory_config.compactor_memory_limit_mb, 819);
418
419 let memory_config = storage_memory_config(
421 total_non_reserved_memory_bytes,
422 false,
423 &storage_config,
424 true,
425 );
426 assert_eq!(memory_config.block_cache_capacity_mb, 1966);
427 assert_eq!(memory_config.meta_cache_capacity_mb, 1146);
428 assert_eq!(memory_config.shared_buffer_capacity_mb, 1);
429 assert_eq!(memory_config.compactor_memory_limit_mb, 0);
430
431 storage_config.data_file_cache.dir = "data".to_owned();
433 storage_config.meta_file_cache.dir = "meta".to_owned();
434 let memory_config = storage_memory_config(
435 total_non_reserved_memory_bytes,
436 true,
437 &storage_config,
438 false,
439 );
440 assert_eq!(memory_config.block_cache_capacity_mb, 737);
441 assert_eq!(memory_config.meta_cache_capacity_mb, 860);
442 assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
443 assert_eq!(memory_config.compactor_memory_limit_mb, 819);
444
445 storage_config.cache.block_cache_capacity_mb = Some(512);
447 storage_config.cache.meta_cache_capacity_mb = Some(128);
448 storage_config.shared_buffer_capacity_mb = Some(1024);
449 storage_config.compactor_memory_limit_mb = Some(512);
450 let memory_config = storage_memory_config(
451 total_non_reserved_memory_bytes,
452 true,
453 &storage_config,
454 false,
455 );
456 assert_eq!(memory_config.block_cache_capacity_mb, 512);
457 assert_eq!(memory_config.meta_cache_capacity_mb, 128);
458 assert_eq!(memory_config.shared_buffer_capacity_mb, 1024);
459 assert_eq!(memory_config.compactor_memory_limit_mb, 512);
460 }
461
462 #[test]
463 fn test_gradient_reserve_memory_bytes() {
464 assert_eq!(super::gradient_reserve_memory_bytes(4 << 30), 1288490188);
465 assert_eq!(super::gradient_reserve_memory_bytes(8 << 30), 2576980377);
466 assert_eq!(super::gradient_reserve_memory_bytes(16 << 30), 5153960755);
467 assert_eq!(super::gradient_reserve_memory_bytes(24 << 30), 6871947673);
468 assert_eq!(super::gradient_reserve_memory_bytes(32 << 30), 8589934591);
469 assert_eq!(super::gradient_reserve_memory_bytes(54 << 30), 13314398617);
470 assert_eq!(super::gradient_reserve_memory_bytes(64 << 30), 15461882265);
471 assert_eq!(super::gradient_reserve_memory_bytes(100 << 30), 23192823398);
472 assert_eq!(super::gradient_reserve_memory_bytes(128 << 30), 29205777612);
473 }
474}