1use foyer::{
16 Compression, LfuConfig, LruConfig, RecoverMode, RuntimeOptions, S3FifoConfig, Throttle,
17};
18use serde::de::Error as _;
19
20use super::*;
21
/// Storage section of the configuration.
///
/// Every field carries a serde `default`, so omitted keys fall back to the
/// functions in `default::storage` (or to the field type's `Default`).
/// `Option` fields are (de)serialized through `none_as_empty_string` by the
/// `serde_with::apply` attribute below.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StorageConfig {
    /// Default: 1.
    #[serde(default = "default::storage::share_buffers_sync_parallelism")]
    pub share_buffers_sync_parallelism: u32,

    /// Default: 4.
    #[serde(default = "default::storage::share_buffer_compaction_worker_threads_number")]
    pub share_buffer_compaction_worker_threads_number: u32,

    /// When unset, `extract_storage_memory_config` substitutes
    /// `default::storage::shared_buffer_capacity_mb()` (1024).
    #[serde(default)]
    pub shared_buffer_capacity_mb: Option<usize>,

    /// Default: 0.8.
    #[serde(default = "default::storage::shared_buffer_flush_ratio")]
    pub shared_buffer_flush_ratio: f32,

    /// Default: 800.
    #[serde(default = "default::storage::shared_buffer_min_batch_flush_size_mb")]
    pub shared_buffer_min_batch_flush_size_mb: usize,

    /// Deprecated; still accepted when parsing. Default: 0.
    #[serde(default = "default::storage::imm_merge_threshold")]
    #[deprecated]
    pub imm_merge_threshold: usize,

    /// Defaults to `cfg!(debug_assertions)`, i.e. enabled only in debug builds.
    #[serde(default = "default::storage::write_conflict_detection_enabled")]
    pub write_conflict_detection_enabled: bool,

    /// Nested cache settings; see `CacheConfig`.
    #[serde(default)]
    #[config_doc(nested)]
    pub cache: CacheConfig,

    /// Top-level block cache capacity. `extract_storage_memory_config` only
    /// consults it when `cache.block_cache_capacity_mb` is unset.
    #[serde(default)]
    pub block_cache_capacity_mb: Option<usize>,

    /// Top-level counterpart of `cache.meta_cache_capacity_mb`.
    #[serde(default)]
    pub meta_cache_capacity_mb: Option<usize>,

    /// Fallback high-priority ratio (in percent) for LRU caches whose
    /// `CacheEvictionConfig::Lru` does not set one itself.
    #[serde(default)]
    pub high_priority_ratio_in_percent: Option<usize>,

    // NOTE(review): presumably the prefetch buffer size in MB — confirm how
    // consumers resolve this when it is unset.
    #[serde(default)]
    pub prefetch_buffer_capacity_mb: Option<usize>,

    /// Default: 60.
    #[serde(default = "default::storage::max_cached_recent_versions_number")]
    pub max_cached_recent_versions_number: usize,

    /// Default: 16. A value of 0 is rejected at deserialization time by
    /// `deserialize_max_prefetch_block_number`.
    #[serde(
        default = "default::storage::max_prefetch_block_number",
        deserialize_with = "deserialize_max_prefetch_block_number"
    )]
    pub max_prefetch_block_number: usize,

    /// Default: false.
    #[serde(default = "default::storage::disable_remote_compactor")]
    pub disable_remote_compactor: bool,

    /// Default: 8.
    #[serde(default = "default::storage::share_buffer_upload_concurrency")]
    pub share_buffer_upload_concurrency: usize,

    /// When unset, `extract_storage_memory_config` substitutes
    /// `default::storage::compactor_memory_limit_mb()` (512).
    #[serde(default)]
    pub compactor_memory_limit_mb: Option<usize>,

    /// Default: 12.0 when the `RW_COMPACTOR_MODE` environment variable
    /// contains `iceberg`, otherwise 3.0.
    #[serde(default = "default::storage::compactor_max_task_multiplier")]
    pub compactor_max_task_multiplier: f32,

    /// Default: 0.8.
    #[serde(default = "default::storage::compactor_memory_available_proportion")]
    pub compactor_memory_available_proportion: f64,

    /// Default: 10.
    #[serde(default = "default::storage::sstable_id_remote_fetch_number")]
    pub sstable_id_remote_fetch_number: u32,

    /// Default: 32.
    #[serde(default = "default::storage::min_sstable_size_mb")]
    pub min_sstable_size_mb: u32,

    /// File cache settings for data; see `FileCacheConfig`.
    #[serde(default)]
    #[config_doc(nested)]
    pub data_file_cache: FileCacheConfig,

    /// File cache settings for metadata; see `FileCacheConfig`.
    #[serde(default)]
    #[config_doc(nested)]
    pub meta_file_cache: FileCacheConfig,

    /// Default: false.
    #[serde(default = "default::storage::sst_skip_bloom_filter_in_serde")]
    pub sst_skip_bloom_filter_in_serde: bool,

    /// Cache refill settings; see `CacheRefillConfig`.
    #[serde(default)]
    #[config_doc(nested)]
    pub cache_refill: CacheRefillConfig,

    /// Default: 32 * 1024 * 1024.
    #[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
    pub min_sst_size_for_streaming_upload: u64,

    /// Default: 16.
    #[serde(default = "default::storage::max_concurrent_compaction_task_number")]
    pub max_concurrent_compaction_task_number: u64,

    /// Default: 0.
    #[serde(default = "default::storage::max_preload_wait_time_mill")]
    pub max_preload_wait_time_mill: u64,

    /// Default: 3 * 3600.
    #[serde(default = "default::storage::max_version_pinning_duration_sec")]
    pub max_version_pinning_duration_sec: u64,

    /// Default: 2 * 1024 * 1024.
    #[serde(default = "default::storage::compactor_max_sst_key_count")]
    pub compactor_max_sst_key_count: u64,
    /// Default: 10 * 60 * 1000.
    #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")]
    pub compact_iter_recreate_timeout_ms: u64,
    /// Default: 512 * 1024 * 1024.
    #[serde(default = "default::storage::compactor_max_sst_size")]
    pub compactor_max_sst_size: u64,
    /// Default: true.
    #[serde(default = "default::storage::enable_fast_compaction")]
    pub enable_fast_compaction: bool,
    /// Default: false.
    #[serde(default = "default::storage::check_compaction_result")]
    pub check_compaction_result: bool,
    /// Default: 3.
    #[serde(default = "default::storage::max_preload_io_retry_times")]
    pub max_preload_io_retry_times: usize,
    /// Default: 40.
    #[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")]
    pub compactor_fast_max_compact_delete_ratio: u32,
    /// Default: 2 * 1024 * 1024 * 1024.
    #[serde(default = "default::storage::compactor_fast_max_compact_task_size")]
    pub compactor_fast_max_compact_task_size: u64,
    /// Default: 8.
    #[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
    pub compactor_iter_max_io_retry_times: usize,

    /// Default: `None`.
    #[serde(default = "default::storage::shorten_block_meta_key_threshold")]
    pub shorten_block_meta_key_threshold: Option<usize>,

    /// Deprecated; still accepted when parsing. Default: 240.
    #[serde(default = "default::storage::table_info_statistic_history_times")]
    #[deprecated]
    pub table_info_statistic_history_times: usize,

    // Flattened catch-all field.
    // NOTE(review): presumably collects keys that match no other field so
    // they can be reported — confirm against `Unrecognized`.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,

    /// Default: 4 << 20.
    #[serde(default = "default::storage::mem_table_spill_threshold")]
    pub mem_table_spill_threshold: usize,

    /// Default: `None`.
    #[serde(default = "default::storage::compactor_concurrent_uploading_sst_count")]
    pub compactor_concurrent_uploading_sst_count: Option<usize>,

    /// Default: 64.
    #[serde(default = "default::storage::compactor_max_overlap_sst_count")]
    pub compactor_max_overlap_sst_count: usize,

    /// Default: 32.
    #[serde(default = "default::storage::compactor_max_preload_meta_file_count")]
    pub compactor_max_preload_meta_file_count: usize,

    /// Default: 1024.
    #[serde(default = "default::storage::vector_file_block_size_kb")]
    pub vector_file_block_size_kb: usize,

    /// Object store settings; see `ObjectStoreConfig`.
    #[serde(default)]
    pub object_store: ObjectStoreConfig,

    /// Default: 10.
    #[serde(default = "default::storage::time_travel_version_cache_capacity")]
    pub time_travel_version_cache_capacity: u64,

    /// Default: 60.
    #[serde(default = "default::storage::table_change_log_cache_capacity")]
    pub table_change_log_cache_capacity: u64,

    /// Default: false.
    #[serde(default = "default::storage::iceberg_compaction_enable_validate")]
    pub iceberg_compaction_enable_validate: bool,
    /// Default: 1024.
    #[serde(default = "default::storage::iceberg_compaction_max_record_batch_rows")]
    pub iceberg_compaction_max_record_batch_rows: usize,
    /// Default: 1024.
    #[serde(default = "default::storage::iceberg_compaction_min_size_per_partition_mb")]
    pub iceberg_compaction_min_size_per_partition_mb: u32,
    /// Default: 32.
    #[serde(default = "default::storage::iceberg_compaction_max_file_count_per_partition")]
    pub iceberg_compaction_max_file_count_per_partition: u32,
    /// Deprecated in favour of the sink config named in the deprecation note.
    /// Default: 1024 * 100.
    #[serde(default = "default::storage::iceberg_compaction_write_parquet_max_row_group_rows")]
    #[deprecated(
        note = "This config is deprecated. Use sink config `compaction.write_parquet_max_row_group_rows` instead."
    )]
    pub iceberg_compaction_write_parquet_max_row_group_rows: usize,

    /// Default: 4.0.
    #[serde(default = "default::storage::iceberg_compaction_task_parallelism_ratio")]
    pub iceberg_compaction_task_parallelism_ratio: f32,
    /// Default: false.
    #[serde(default = "default::storage::iceberg_compaction_enable_heuristic_output_parallelism")]
    pub iceberg_compaction_enable_heuristic_output_parallelism: bool,
    /// Default: 8.
    #[serde(default = "default::storage::iceberg_compaction_max_concurrent_closes")]
    pub iceberg_compaction_max_concurrent_closes: usize,
    /// Default: true.
    #[serde(default = "default::storage::iceberg_compaction_enable_dynamic_size_estimation")]
    pub iceberg_compaction_enable_dynamic_size_estimation: bool,
    /// Default: 0.3.
    #[serde(default = "default::storage::iceberg_compaction_size_estimation_smoothing_factor")]
    pub iceberg_compaction_size_estimation_smoothing_factor: f64,
    /// Default: 4.0.
    #[serde(
        default = "default::storage::iceberg_compaction_pending_parallelism_budget_multiplier"
    )]
    pub iceberg_compaction_pending_parallelism_budget_multiplier: f32,

    /// Default: `Some(100 * 1024)`.
    #[serde(default = "default::storage::iceberg_compaction_target_binpack_group_size_mb")]
    pub iceberg_compaction_target_binpack_group_size_mb: Option<u64>,
    /// Default: `None`.
    #[serde(default = "default::storage::iceberg_compaction_min_group_size_mb")]
    pub iceberg_compaction_min_group_size_mb: Option<u64>,
    /// Default: `None`.
    #[serde(default = "default::storage::iceberg_compaction_min_group_file_count")]
    pub iceberg_compaction_min_group_file_count: Option<usize>,
}
266
/// Cache settings nested under `StorageConfig::cache`.
///
/// `Option` fields are (de)serialized through `none_as_empty_string`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CacheConfig {
    /// Block cache capacity. When set, `extract_storage_memory_config` uses it
    /// in preference to `StorageConfig::block_cache_capacity_mb`.
    #[serde(default)]
    pub block_cache_capacity_mb: Option<usize>,

    /// When unset, `extract_storage_memory_config` derives a value from the
    /// resolved block cache capacity.
    #[serde(default)]
    pub block_cache_shard_num: Option<usize>,

    /// Eviction algorithm for the block cache (defaults to LRU).
    #[serde(default)]
    #[config_doc(omitted)]
    pub block_cache_eviction: CacheEvictionConfig,

    /// Meta cache capacity.
    #[serde(default)]
    pub meta_cache_capacity_mb: Option<usize>,

    /// When unset, `extract_storage_memory_config` derives a value from the
    /// resolved meta cache capacity.
    #[serde(default)]
    pub meta_cache_shard_num: Option<usize>,

    /// Eviction algorithm for the meta cache (defaults to LRU).
    #[serde(default)]
    #[config_doc(omitted)]
    pub meta_cache_eviction: CacheEvictionConfig,

    /// Default: 16.
    #[serde(default = "default::storage::vector_block_cache_capacity_mb")]
    pub vector_block_cache_capacity_mb: usize,
    /// Default: 16.
    #[serde(default = "default::storage::vector_block_cache_shard_num")]
    pub vector_block_cache_shard_num: usize,
    /// Eviction algorithm for the vector block cache (defaults to LRU).
    #[serde(default)]
    #[config_doc(omitted)]
    pub vector_block_cache_eviction_config: CacheEvictionConfig,
    /// Default: 16.
    #[serde(default = "default::storage::vector_meta_cache_capacity_mb")]
    pub vector_meta_cache_capacity_mb: usize,
    /// Default: 16.
    #[serde(default = "default::storage::vector_meta_cache_shard_num")]
    pub vector_meta_cache_shard_num: usize,
    /// Eviction algorithm for the vector meta cache (defaults to LRU).
    #[serde(default)]
    #[config_doc(omitted)]
    pub vector_meta_cache_eviction_config: CacheEvictionConfig,
}
318
/// User-facing choice of cache eviction algorithm and its tunables.
///
/// Serialized as an internally tagged enum: the `algorithm` key selects the
/// variant. Every tunable is optional; `extract_storage_memory_config` fills
/// unset ones from `default::storage` and converts `*_in_percent` values to
/// ratios by dividing by 100.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "algorithm")]
pub enum CacheEvictionConfig {
    /// LRU eviction.
    Lru {
        /// When unset, `StorageConfig::high_priority_ratio_in_percent` is
        /// tried before the built-in default.
        high_priority_ratio_in_percent: Option<usize>,
    },
    /// LFU eviction.
    Lfu {
        window_capacity_ratio_in_percent: Option<usize>,
        protected_capacity_ratio_in_percent: Option<usize>,
        cmsketch_eps: Option<f64>,
        cmsketch_confidence: Option<f64>,
    },
    /// S3-FIFO eviction.
    S3Fifo {
        small_queue_capacity_ratio_in_percent: Option<usize>,
        ghost_queue_capacity_ratio_in_percent: Option<usize>,
        small_to_main_freq_threshold: Option<u8>,
    },
}
338
339impl Default for CacheEvictionConfig {
340 fn default() -> Self {
341 Self::Lru {
342 high_priority_ratio_in_percent: None,
343 }
344 }
345}
346
/// Cache refill settings nested under `StorageConfig::cache_refill`.
///
/// Omitted keys fall back to the functions in `default::cache_refill`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CacheRefillConfig {
    /// Default: 0.
    #[serde(default = "default::cache_refill::meta_refill_concurrency")]
    pub meta_refill_concurrency: usize,

    /// Default: empty list.
    #[serde(default = "default::cache_refill::data_refill_levels")]
    pub data_refill_levels: Vec<u32>,

    /// Default: 6000.
    #[serde(default = "default::cache_refill::timeout_ms")]
    pub timeout_ms: u64,

    /// Default: 10.
    #[serde(default = "default::cache_refill::concurrency")]
    pub concurrency: usize,

    /// Default: 64.
    #[serde(default = "default::cache_refill::unit")]
    pub unit: usize,

    /// Default: 0.5.
    #[serde(default = "default::cache_refill::threshold")]
    pub threshold: f64,

    /// Default: 16.
    #[serde(default = "default::cache_refill::recent_filter_shards")]
    pub recent_filter_shards: usize,

    #[serde(default = "default::cache_refill::recent_filter_layers")]
    pub recent_filter_layers: usize,

    #[serde(default = "default::cache_refill::recent_filter_rotate_interval_ms")]
    pub recent_filter_rotate_interval_ms: usize,

    #[serde(default = "default::cache_refill::skip_recent_filter")]
    pub skip_recent_filter: bool,

    // Flattened catch-all field.
    // NOTE(review): presumably collects keys that match no other field so
    // they can be reported — confirm against `Unrecognized`.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
400
/// File cache settings, used for both `StorageConfig::data_file_cache` and
/// `StorageConfig::meta_file_cache`.
///
/// Omitted keys fall back to the functions in `default::file_cache`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct FileCacheConfig {
    /// Default: empty string.
    // NOTE(review): an empty directory presumably means the file cache is
    // disabled — confirm against the consumer.
    #[serde(default = "default::file_cache::dir")]
    pub dir: String,

    /// Default: 1024.
    #[serde(default = "default::file_cache::capacity_mb")]
    pub capacity_mb: usize,

    /// Default: 64.
    #[serde(default = "default::file_cache::file_capacity_mb")]
    pub file_capacity_mb: usize,

    /// Default: 4.
    #[serde(default = "default::file_cache::flushers")]
    pub flushers: usize,

    /// Default: 4.
    #[serde(default = "default::file_cache::reclaimers")]
    pub reclaimers: usize,

    /// Default: 8.
    #[serde(default = "default::file_cache::recover_concurrency")]
    pub recover_concurrency: usize,

    /// Default: 0.
    #[serde(default = "default::file_cache::insert_rate_limit_mb")]
    pub insert_rate_limit_mb: usize,

    /// Default: 64.
    #[serde(default = "default::file_cache::indexer_shards")]
    pub indexer_shards: usize,

    /// Default: `Compression::None`.
    #[serde(default = "default::file_cache::compression")]
    pub compression: Compression,

    /// Default: `None`; `extract_storage_memory_config` then substitutes a
    /// built-in threshold.
    #[serde(default = "default::file_cache::flush_buffer_threshold_mb")]
    pub flush_buffer_threshold_mb: Option<usize>,

    /// I/O throttle; the default is built by `default::file_cache::throttle`.
    #[serde(default = "default::file_cache::throttle")]
    pub throttle: Throttle,

    /// Default: 0.1.
    #[serde(default = "default::file_cache::fifo_probation_ratio")]
    pub fifo_probation_ratio: f64,

    /// Default: 16.
    #[serde(default = "default::file_cache::blob_index_size_kb")]
    pub blob_index_size_kb: usize,

    /// Default: `RecoverMode::Quiet`.
    #[serde(default = "default::file_cache::recover_mode")]
    pub recover_mode: RecoverMode,

    /// Default: `RuntimeOptions::Unified` with default Tokio runtime options.
    #[serde(default = "default::file_cache::runtime_config")]
    pub runtime_config: RuntimeOptions,

    // Flattened catch-all field.
    // NOTE(review): presumably collects keys that match no other field so
    // they can be reported — confirm against `Unrecognized`.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
477
/// Object store settings nested under `StorageConfig::object_store`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
    /// Also accepted under the key `object_store_set_atomic_write_dir`.
    #[serde(
        default = "default::object_store_config::set_atomic_write_dir",
        alias = "object_store_set_atomic_write_dir"
    )]
    pub set_atomic_write_dir: bool,

    /// Per-operation timeout and retry settings.
    #[serde(default)]
    pub retry: ObjectStoreRetryConfig,

    /// S3-specific settings.
    #[serde(default)]
    pub s3: S3ObjectStoreConfig,

    #[serde(default = "default::object_store_config::opendal_upload_concurrency")]
    pub opendal_upload_concurrency: usize,

    /// Defaults to `false` (the `bool` default).
    #[serde(default)]
    pub opendal_writer_abort_on_err: bool,

    #[serde(default = "default::object_store_config::upload_part_size")]
    pub upload_part_size: usize,
}
510
511fn deserialize_max_prefetch_block_number<'de, D>(deserializer: D) -> Result<usize, D::Error>
512where
513 D: serde::Deserializer<'de>,
514{
515 let value = usize::deserialize(deserializer)?;
516 if value == 0 {
517 return Err(D::Error::custom(
518 "storage.max_prefetch_block_number must be greater than 0",
519 ));
520 }
521 Ok(value)
522}
523
524impl ObjectStoreConfig {
525 pub fn set_atomic_write_dir(&mut self) {
526 self.set_atomic_write_dir = true;
527 }
528}
529
/// S3-specific object store settings (`ObjectStoreConfig::s3`).
///
/// Several fields also accept an `object_store_`-prefixed alias key.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreConfig {
    /// Alias: `object_store_keepalive_ms`.
    #[serde(
        default = "default::object_store_config::s3::keepalive_ms",
        alias = "object_store_keepalive_ms"
    )]
    pub keepalive_ms: Option<u64>,
    /// Alias: `object_store_recv_buffer_size`.
    #[serde(
        default = "default::object_store_config::s3::recv_buffer_size",
        alias = "object_store_recv_buffer_size"
    )]
    pub recv_buffer_size: Option<usize>,
    /// Alias: `object_store_send_buffer_size`.
    #[serde(
        default = "default::object_store_config::s3::send_buffer_size",
        alias = "object_store_send_buffer_size"
    )]
    pub send_buffer_size: Option<usize>,
    /// Alias: `object_store_nodelay`.
    #[serde(
        default = "default::object_store_config::s3::nodelay",
        alias = "object_store_nodelay"
    )]
    pub nodelay: Option<bool>,
    /// Shares its default function with
    /// `S3ObjectStoreDeveloperConfig::retry_unknown_service_error`.
    // NOTE(review): presumably kept here for backward compatibility with the
    // field of the same name under `developer` — confirm which one is read.
    #[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")]
    pub retry_unknown_service_error: bool,
    #[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")]
    pub identity_resolution_timeout_s: u64,
    /// Developer-oriented settings.
    #[serde(default)]
    pub developer: S3ObjectStoreDeveloperConfig,
}
563
/// Developer-oriented S3 settings (`S3ObjectStoreConfig::developer`).
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreDeveloperConfig {
    /// Alias: `object_store_retry_unknown_service_error`.
    #[serde(
        default = "default::object_store_config::s3::developer::retry_unknown_service_error",
        alias = "object_store_retry_unknown_service_error"
    )]
    pub retry_unknown_service_error: bool,
    /// Alias: `object_store_retryable_service_error_codes`.
    #[serde(
        default = "default::object_store_config::s3::developer::retryable_service_error_codes",
        alias = "object_store_retryable_service_error_codes"
    )]
    pub retryable_service_error_codes: Vec<String>,

    #[serde(default = "default::object_store_config::s3::developer::use_opendal")]
    pub use_opendal: bool,
}
586
/// Timeout, retry and backoff settings for object store requests
/// (`ObjectStoreConfig::retry`).
///
/// Each operation kind has an `*_attempt_timeout_ms` / `*_retry_attempts`
/// pair; defaults come from the `object_store_*` functions in
/// `default::object_store_config`.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreRetryConfig {
    // Backoff parameters shared by all operations.
    // NOTE(review): presumably an exponential backoff (interval, cap, factor)
    // — confirm against the retry implementation.
    #[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
    pub req_backoff_interval_ms: u64,

    #[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
    pub req_backoff_max_delay_ms: u64,

    #[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
    pub req_backoff_factor: u64,

    // Upload.
    #[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
    pub upload_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
    pub upload_retry_attempts: usize,

    // Streaming upload.
    #[serde(
        default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
    )]
    pub streaming_upload_attempt_timeout_ms: u64,

    #[serde(
        default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
    )]
    pub streaming_upload_retry_attempts: usize,

    // Read.
    #[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
    pub read_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
    pub read_retry_attempts: usize,

    // Streaming read.
    #[serde(
        default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
    )]
    pub streaming_read_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
    pub streaming_read_retry_attempts: usize,

    // Metadata.
    #[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
    pub metadata_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
    pub metadata_retry_attempts: usize,

    // Delete (single object).
    #[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
    pub delete_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
    pub delete_retry_attempts: usize,

    // Delete (multiple objects).
    #[serde(
        default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
    )]
    pub delete_objects_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
    pub delete_objects_retry_attempts: usize,

    // List.
    #[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
    pub list_attempt_timeout_ms: u64,

    #[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
    pub list_retry_attempts: usize,
}
676
/// Fully resolved eviction configuration, wrapping the corresponding `foyer`
/// config types.
///
/// Built from a `CacheEvictionConfig` by `extract_storage_memory_config` and
/// convertible into `foyer::EvictionConfig` via `From`.
#[derive(Debug, Clone)]
pub enum EvictionConfig {
    /// LRU eviction.
    Lru(LruConfig),
    /// LFU eviction.
    Lfu(LfuConfig),
    /// S3-FIFO eviction.
    S3Fifo(S3FifoConfig),
}
683
684impl EvictionConfig {
685 pub fn for_test() -> Self {
686 Self::Lru(LruConfig {
687 high_priority_pool_ratio: 0.0,
688 })
689 }
690}
691
692impl From<EvictionConfig> for foyer::EvictionConfig {
693 fn from(value: EvictionConfig) -> Self {
694 match value {
695 EvictionConfig::Lru(lru) => foyer::EvictionConfig::Lru(lru),
696 EvictionConfig::Lfu(lfu) => foyer::EvictionConfig::Lfu(lfu),
697 EvictionConfig::S3Fifo(s3fifo) => foyer::EvictionConfig::S3Fifo(s3fifo),
698 }
699 }
700}
701
/// Memory-related storage settings with every optional value resolved.
///
/// Produced by `extract_storage_memory_config`, which replaces unset options
/// in `StorageConfig` with concrete values.
pub struct StorageMemoryConfig {
    pub block_cache_capacity_mb: usize,
    // NOTE(review): when not configured explicitly, the two `*_cache_shard_num`
    // fields below are filled with a computed "shard bits" value by
    // `extract_storage_memory_config` — confirm how consumers interpret them.
    pub block_cache_shard_num: usize,
    pub meta_cache_capacity_mb: usize,
    pub meta_cache_shard_num: usize,
    // The four `vector_*` sizes are copied from `CacheConfig` unchanged.
    pub vector_block_cache_capacity_mb: usize,
    pub vector_block_cache_shard_num: usize,
    pub vector_meta_cache_capacity_mb: usize,
    pub vector_meta_cache_shard_num: usize,
    pub shared_buffer_capacity_mb: usize,
    pub compactor_memory_limit_mb: usize,
    pub prefetch_buffer_capacity_mb: usize,
    // Eviction settings, already converted from percentages to ratios.
    pub block_cache_eviction_config: EvictionConfig,
    pub meta_cache_eviction_config: EvictionConfig,
    pub vector_block_cache_eviction_config: EvictionConfig,
    pub vector_meta_cache_eviction_config: EvictionConfig,
    // Taken from `data_file_cache` / `meta_file_cache` respectively.
    pub block_file_cache_flush_buffer_threshold_mb: usize,
    pub meta_file_cache_flush_buffer_threshold_mb: usize,
}
721
722pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
723 let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or(
724 s.storage
726 .block_cache_capacity_mb
727 .unwrap_or(default::storage::block_cache_capacity_mb()),
728 );
729 let meta_cache_capacity_mb = s.storage.cache.meta_cache_capacity_mb.unwrap_or(
730 s.storage
732 .block_cache_capacity_mb
733 .unwrap_or(default::storage::meta_cache_capacity_mb()),
734 );
735 let shared_buffer_capacity_mb = s
736 .storage
737 .shared_buffer_capacity_mb
738 .unwrap_or(default::storage::shared_buffer_capacity_mb());
739 let meta_cache_shard_num = s.storage.cache.meta_cache_shard_num.unwrap_or_else(|| {
740 let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
741 while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
742 shard_bits -= 1;
743 }
744 shard_bits
745 });
746 let block_cache_shard_num = s.storage.cache.block_cache_shard_num.unwrap_or_else(|| {
747 let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
748 while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
749 {
750 shard_bits -= 1;
751 }
752 shard_bits
753 });
754 let compactor_memory_limit_mb = s
755 .storage
756 .compactor_memory_limit_mb
757 .unwrap_or(default::storage::compactor_memory_limit_mb());
758
759 let get_eviction_config = |c: &CacheEvictionConfig| {
760 match c {
761 CacheEvictionConfig::Lru {
762 high_priority_ratio_in_percent,
763 } => EvictionConfig::Lru(LruConfig {
764 high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
765 s.storage
767 .high_priority_ratio_in_percent
768 .unwrap_or(default::storage::high_priority_ratio_in_percent()),
769 ) as f64
770 / 100.0,
771 }),
772 CacheEvictionConfig::Lfu {
773 window_capacity_ratio_in_percent,
774 protected_capacity_ratio_in_percent,
775 cmsketch_eps,
776 cmsketch_confidence,
777 } => EvictionConfig::Lfu(LfuConfig {
778 window_capacity_ratio: window_capacity_ratio_in_percent
779 .unwrap_or(default::storage::window_capacity_ratio_in_percent())
780 as f64
781 / 100.0,
782 protected_capacity_ratio: protected_capacity_ratio_in_percent
783 .unwrap_or(default::storage::protected_capacity_ratio_in_percent())
784 as f64
785 / 100.0,
786 cmsketch_eps: cmsketch_eps.unwrap_or(default::storage::cmsketch_eps()),
787 cmsketch_confidence: cmsketch_confidence
788 .unwrap_or(default::storage::cmsketch_confidence()),
789 }),
790 CacheEvictionConfig::S3Fifo {
791 small_queue_capacity_ratio_in_percent,
792 ghost_queue_capacity_ratio_in_percent,
793 small_to_main_freq_threshold,
794 } => EvictionConfig::S3Fifo(S3FifoConfig {
795 small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent
796 .unwrap_or(default::storage::small_queue_capacity_ratio_in_percent())
797 as f64
798 / 100.0,
799 ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent
800 .unwrap_or(default::storage::ghost_queue_capacity_ratio_in_percent())
801 as f64
802 / 100.0,
803 small_to_main_freq_threshold: small_to_main_freq_threshold
804 .unwrap_or(default::storage::small_to_main_freq_threshold()),
805 }),
806 }
807 };
808
809 let block_cache_eviction_config = get_eviction_config(&s.storage.cache.block_cache_eviction);
810 let meta_cache_eviction_config = get_eviction_config(&s.storage.cache.meta_cache_eviction);
811 let vector_block_cache_eviction_config =
812 get_eviction_config(&s.storage.cache.vector_block_cache_eviction_config);
813 let vector_meta_cache_eviction_config =
814 get_eviction_config(&s.storage.cache.vector_meta_cache_eviction_config);
815
816 let prefetch_buffer_capacity_mb =
817 s.storage
818 .shared_buffer_capacity_mb
819 .unwrap_or(match &block_cache_eviction_config {
820 EvictionConfig::Lru(lru) => {
821 ((1.0 - lru.high_priority_pool_ratio) * block_cache_capacity_mb as f64) as usize
822 }
823 EvictionConfig::Lfu(lfu) => {
824 ((1.0 - lfu.protected_capacity_ratio) * block_cache_capacity_mb as f64) as usize
825 }
826 EvictionConfig::S3Fifo(s3fifo) => {
827 (s3fifo.small_queue_capacity_ratio * block_cache_capacity_mb as f64) as usize
828 }
829 });
830
831 let block_file_cache_flush_buffer_threshold_mb = s
832 .storage
833 .data_file_cache
834 .flush_buffer_threshold_mb
835 .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
836 let meta_file_cache_flush_buffer_threshold_mb = s
837 .storage
838 .meta_file_cache
839 .flush_buffer_threshold_mb
840 .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
841
842 StorageMemoryConfig {
843 block_cache_capacity_mb,
844 block_cache_shard_num,
845 meta_cache_capacity_mb,
846 meta_cache_shard_num,
847 vector_block_cache_capacity_mb: s.storage.cache.vector_block_cache_capacity_mb,
848 vector_block_cache_shard_num: s.storage.cache.vector_block_cache_shard_num,
849 vector_meta_cache_capacity_mb: s.storage.cache.vector_meta_cache_capacity_mb,
850 vector_meta_cache_shard_num: s.storage.cache.vector_meta_cache_shard_num,
851 shared_buffer_capacity_mb,
852 compactor_memory_limit_mb,
853 prefetch_buffer_capacity_mb,
854 block_cache_eviction_config,
855 meta_cache_eviction_config,
856 vector_block_cache_eviction_config,
857 vector_meta_cache_eviction_config,
858 block_file_cache_flush_buffer_threshold_mb,
859 meta_file_cache_flush_buffer_threshold_mb,
860 }
861}
862
863pub mod default {
864
/// Built-in default values for the storage configuration.
///
/// Most functions here are referenced by name from `#[serde(default = "...")]`
/// attributes; the rest serve as fallbacks when resolving optional settings.
pub mod storage {
    /// Default `share_buffers_sync_parallelism`.
    pub fn share_buffers_sync_parallelism() -> u32 {
        1
    }

    /// Default `share_buffer_compaction_worker_threads_number`.
    pub fn share_buffer_compaction_worker_threads_number() -> u32 {
        4
    }

    /// Fallback shared buffer capacity, used when none is configured.
    pub fn shared_buffer_capacity_mb() -> usize {
        1024
    }

    /// Default `shared_buffer_flush_ratio`.
    pub fn shared_buffer_flush_ratio() -> f32 {
        0.8
    }

    /// Default `shared_buffer_min_batch_flush_size_mb`.
    pub fn shared_buffer_min_batch_flush_size_mb() -> usize {
        800
    }

    /// Default for the deprecated `imm_merge_threshold`.
    pub fn imm_merge_threshold() -> usize {
        0
    }

    /// Default `write_conflict_detection_enabled`: on in builds with debug
    /// assertions, off otherwise.
    pub fn write_conflict_detection_enabled() -> bool {
        cfg!(debug_assertions)
    }

    /// Default `max_cached_recent_versions_number`.
    pub fn max_cached_recent_versions_number() -> usize {
        60
    }

    /// Fallback block cache capacity, used when none is configured.
    pub fn block_cache_capacity_mb() -> usize {
        512
    }

    /// Fallback LRU high-priority ratio, in percent.
    pub fn high_priority_ratio_in_percent() -> usize {
        70
    }

    /// Fallback LFU window capacity ratio, in percent.
    pub fn window_capacity_ratio_in_percent() -> usize {
        10
    }

    /// Fallback LFU protected capacity ratio, in percent.
    pub fn protected_capacity_ratio_in_percent() -> usize {
        80
    }

    /// Fallback LFU `cmsketch_eps`.
    pub fn cmsketch_eps() -> f64 {
        0.002
    }

    /// Fallback LFU `cmsketch_confidence`.
    pub fn cmsketch_confidence() -> f64 {
        0.95
    }

    /// Fallback S3-FIFO small queue capacity ratio, in percent.
    pub fn small_queue_capacity_ratio_in_percent() -> usize {
        10
    }

    /// Fallback S3-FIFO ghost queue capacity ratio, in percent.
    pub fn ghost_queue_capacity_ratio_in_percent() -> usize {
        1000
    }

    /// Fallback S3-FIFO `small_to_main_freq_threshold`.
    pub fn small_to_main_freq_threshold() -> u8 {
        1
    }

    /// Fallback meta cache capacity, used when none is configured.
    pub fn meta_cache_capacity_mb() -> usize {
        128
    }

    /// Default `disable_remote_compactor`.
    pub fn disable_remote_compactor() -> bool {
        false
    }

    /// Default `share_buffer_upload_concurrency`.
    pub fn share_buffer_upload_concurrency() -> usize {
        8
    }

    /// Fallback compactor memory limit, used when none is configured.
    pub fn compactor_memory_limit_mb() -> usize {
        512
    }

    /// Default `compactor_max_task_multiplier`.
    ///
    /// Reads the `RW_COMPACTOR_MODE` environment variable (treated as empty
    /// when unset or unreadable): 12.0 if it contains `iceberg`, else 3.0.
    pub fn compactor_max_task_multiplier() -> f32 {
        let compactor_mode = std::env::var("RW_COMPACTOR_MODE").unwrap_or_default();
        if compactor_mode.contains("iceberg") {
            12.0
        } else {
            3.0
        }
    }

    /// Default `compactor_memory_available_proportion`.
    pub fn compactor_memory_available_proportion() -> f64 {
        0.8
    }

    /// Default `sstable_id_remote_fetch_number`.
    pub fn sstable_id_remote_fetch_number() -> u32 {
        10
    }

    /// Default `min_sstable_size_mb`.
    pub fn min_sstable_size_mb() -> u32 {
        32
    }

    /// Default `min_sst_size_for_streaming_upload`: 32 * 2^20.
    pub fn min_sst_size_for_streaming_upload() -> u64 {
        32 << 20
    }

    /// Default `max_concurrent_compaction_task_number`.
    pub fn max_concurrent_compaction_task_number() -> u64 {
        16
    }

    /// Default `max_preload_wait_time_mill`.
    pub fn max_preload_wait_time_mill() -> u64 {
        0
    }

    /// Default `max_version_pinning_duration_sec`: three hours.
    pub fn max_version_pinning_duration_sec() -> u64 {
        3 * 60 * 60
    }

    /// Default `compactor_max_sst_key_count`: 2 * 2^20.
    pub fn compactor_max_sst_key_count() -> u64 {
        2 << 20
    }

    /// Default `compact_iter_recreate_timeout_ms`: ten minutes.
    pub fn compact_iter_recreate_timeout_ms() -> u64 {
        10 * 60 * 1_000
    }

    /// Default `compactor_iter_max_io_retry_times`.
    pub fn compactor_iter_max_io_retry_times() -> usize {
        8
    }

    /// Default `shorten_block_meta_key_threshold`: unset.
    pub fn shorten_block_meta_key_threshold() -> Option<usize> {
        None
    }

    /// Default `compactor_max_sst_size`: 512 * 2^20.
    pub fn compactor_max_sst_size() -> u64 {
        512 << 20
    }

    /// Default `enable_fast_compaction`.
    pub fn enable_fast_compaction() -> bool {
        true
    }

    /// Default `check_compaction_result`.
    pub fn check_compaction_result() -> bool {
        false
    }

    /// Default `max_preload_io_retry_times`.
    pub fn max_preload_io_retry_times() -> usize {
        3
    }

    /// Default `mem_table_spill_threshold`: 4 * 2^20.
    pub fn mem_table_spill_threshold() -> usize {
        4 * 1024 * 1024
    }

    /// Default `compactor_fast_max_compact_delete_ratio`.
    pub fn compactor_fast_max_compact_delete_ratio() -> u32 {
        40
    }

    /// Default `compactor_fast_max_compact_task_size`: 2 * 2^30.
    pub fn compactor_fast_max_compact_task_size() -> u64 {
        2 << 30
    }

    /// Default `max_prefetch_block_number`.
    pub fn max_prefetch_block_number() -> usize {
        16
    }

    /// Default `compactor_concurrent_uploading_sst_count`: unset.
    pub fn compactor_concurrent_uploading_sst_count() -> Option<usize> {
        None
    }

    /// Default `compactor_max_overlap_sst_count`.
    pub fn compactor_max_overlap_sst_count() -> usize {
        64
    }

    /// Default `compactor_max_preload_meta_file_count`.
    pub fn compactor_max_preload_meta_file_count() -> usize {
        32
    }

    /// Default `vector_file_block_size_kb`.
    pub fn vector_file_block_size_kb() -> usize {
        1024
    }

    /// Default `vector_block_cache_capacity_mb`.
    pub fn vector_block_cache_capacity_mb() -> usize {
        16
    }

    /// Default `vector_block_cache_shard_num`.
    pub fn vector_block_cache_shard_num() -> usize {
        16
    }

    /// Default `vector_meta_cache_capacity_mb`.
    pub fn vector_meta_cache_capacity_mb() -> usize {
        16
    }

    /// Default `vector_meta_cache_shard_num`.
    pub fn vector_meta_cache_shard_num() -> usize {
        16
    }

    /// Default for the deprecated `table_info_statistic_history_times`.
    pub fn table_info_statistic_history_times() -> usize {
        240
    }

    /// Fallback flush-buffer threshold for the block (data) file cache.
    pub fn block_file_cache_flush_buffer_threshold_mb() -> usize {
        256
    }

    /// Fallback flush-buffer threshold for the meta file cache.
    pub fn meta_file_cache_flush_buffer_threshold_mb() -> usize {
        64
    }

    /// Default `time_travel_version_cache_capacity`.
    pub fn time_travel_version_cache_capacity() -> u64 {
        10
    }

    /// Default `table_change_log_cache_capacity`.
    pub fn table_change_log_cache_capacity() -> u64 {
        60
    }

    /// Default `sst_skip_bloom_filter_in_serde`.
    pub fn sst_skip_bloom_filter_in_serde() -> bool {
        false
    }

    /// Default `iceberg_compaction_enable_validate`.
    pub fn iceberg_compaction_enable_validate() -> bool {
        false
    }

    /// Default `iceberg_compaction_max_record_batch_rows`.
    pub fn iceberg_compaction_max_record_batch_rows() -> usize {
        1024
    }

    /// Default for the deprecated
    /// `iceberg_compaction_write_parquet_max_row_group_rows`.
    pub fn iceberg_compaction_write_parquet_max_row_group_rows() -> usize {
        100 * 1024
    }

    /// Default `iceberg_compaction_min_size_per_partition_mb`.
    pub fn iceberg_compaction_min_size_per_partition_mb() -> u32 {
        1024
    }

    /// Default `iceberg_compaction_max_file_count_per_partition`.
    pub fn iceberg_compaction_max_file_count_per_partition() -> u32 {
        32
    }

    /// Default `iceberg_compaction_task_parallelism_ratio`.
    pub fn iceberg_compaction_task_parallelism_ratio() -> f32 {
        4.0
    }

    /// Default `iceberg_compaction_enable_heuristic_output_parallelism`.
    pub fn iceberg_compaction_enable_heuristic_output_parallelism() -> bool {
        false
    }

    /// Default `iceberg_compaction_max_concurrent_closes`.
    pub fn iceberg_compaction_max_concurrent_closes() -> usize {
        8
    }

    /// Default `iceberg_compaction_enable_dynamic_size_estimation`.
    pub fn iceberg_compaction_enable_dynamic_size_estimation() -> bool {
        true
    }

    /// Default `iceberg_compaction_size_estimation_smoothing_factor`.
    pub fn iceberg_compaction_size_estimation_smoothing_factor() -> f64 {
        0.3
    }

    /// Default `iceberg_compaction_pending_parallelism_budget_multiplier`.
    pub fn iceberg_compaction_pending_parallelism_budget_multiplier() -> f32 {
        4.0
    }

    /// Default `iceberg_compaction_target_binpack_group_size_mb`: 100 * 1024.
    pub fn iceberg_compaction_target_binpack_group_size_mb() -> Option<u64> {
        Some(100 << 10)
    }

    /// Default `iceberg_compaction_min_group_size_mb`: unset.
    pub fn iceberg_compaction_min_group_size_mb() -> Option<u64> {
        None
    }

    /// Default `iceberg_compaction_min_group_file_count`: unset.
    pub fn iceberg_compaction_min_group_file_count() -> Option<usize> {
        None
    }
}
1150
1151 pub mod file_cache {
1152 use std::num::NonZeroUsize;
1153
1154 use foyer::{Compression, RecoverMode, RuntimeOptions, Throttle, TokioRuntimeOptions};
1155
1156 pub fn dir() -> String {
1157 "".to_owned()
1158 }
1159
1160 pub fn capacity_mb() -> usize {
1161 1024
1162 }
1163
1164 pub fn file_capacity_mb() -> usize {
1165 64
1166 }
1167
1168 pub fn flushers() -> usize {
1169 4
1170 }
1171
1172 pub fn reclaimers() -> usize {
1173 4
1174 }
1175
1176 pub fn recover_concurrency() -> usize {
1177 8
1178 }
1179
1180 pub fn insert_rate_limit_mb() -> usize {
1181 0
1182 }
1183
1184 pub fn indexer_shards() -> usize {
1185 64
1186 }
1187
1188 pub fn compression() -> Compression {
1189 Compression::None
1190 }
1191
1192 pub fn flush_buffer_threshold_mb() -> Option<usize> {
1193 None
1194 }
1195
1196 pub fn fifo_probation_ratio() -> f64 {
1197 0.1
1198 }
1199
1200 pub fn blob_index_size_kb() -> usize {
1201 16
1202 }
1203
1204 pub fn recover_mode() -> RecoverMode {
1205 RecoverMode::Quiet
1206 }
1207
1208 pub fn runtime_config() -> RuntimeOptions {
1209 RuntimeOptions::Unified(TokioRuntimeOptions::default())
1210 }
1211
1212 pub fn throttle() -> Throttle {
1213 Throttle::new()
1214 .with_iops_counter(foyer::IopsCounter::PerIoSize(
1215 NonZeroUsize::new(128 * 1024).unwrap(),
1216 ))
1217 .with_read_iops(100000)
1218 .with_write_iops(100000)
1219 .with_write_throughput(1024 * 1024 * 1024)
1220 .with_read_throughput(1024 * 1024 * 1024)
1221 }
1222 }
1223
/// Built-in values for the cache refill settings. Each function returns a
/// fixed value.
pub mod cache_refill {
    /// Meta refill concurrency: 0.
    pub fn meta_refill_concurrency() -> usize {
        const META_REFILL_CONCURRENCY: usize = 0;
        META_REFILL_CONCURRENCY
    }

    /// Data refill levels: an empty list.
    pub fn data_refill_levels() -> Vec<u32> {
        Vec::new()
    }

    /// Timeout: 6000 (unit presumably milliseconds, per the `_ms` suffix).
    pub fn timeout_ms() -> u64 {
        const TIMEOUT_MS: u64 = 6 * 1000;
        TIMEOUT_MS
    }

    /// Refill concurrency: 10.
    pub fn concurrency() -> usize {
        const CONCURRENCY: usize = 10;
        CONCURRENCY
    }

    /// Refill unit: 64.
    pub fn unit() -> usize {
        const UNIT: usize = 64;
        UNIT
    }

    /// Refill threshold: 0.5.
    pub fn threshold() -> f64 {
        const THRESHOLD: f64 = 0.5;
        THRESHOLD
    }

    /// Number of recent-filter shards: 16.
    pub fn recent_filter_shards() -> usize {
        const SHARDS: usize = 16;
        SHARDS
    }

    /// Number of recent-filter layers: 6.
    pub fn recent_filter_layers() -> usize {
        const LAYERS: usize = 6;
        LAYERS
    }

    /// Recent-filter rotate interval: 10000 (unit presumably milliseconds,
    /// per the `_ms` suffix).
    pub fn recent_filter_rotate_interval_ms() -> usize {
        const ROTATE_INTERVAL_MS: usize = 10 * 1000;
        ROTATE_INTERVAL_MS
    }

    /// Whether to skip the recent filter: `false`.
    pub fn skip_recent_filter() -> bool {
        const SKIP: bool = false;
        SKIP
    }
}
1265
/// Built-in values for the object store settings. Each function returns a
/// fixed value; timeouts carry the `_ms` suffix and are expressed in
/// milliseconds via `secs_to_ms`.
pub mod object_store_config {
    const DEFAULT_REQ_BACKOFF_INTERVAL_MS: u64 = secs_to_ms(1);
    const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = secs_to_ms(10);
    const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

    /// Converts a whole number of seconds into milliseconds.
    const fn secs_to_ms(secs: u64) -> u64 {
        secs * 1000
    }

    /// Whether to set the atomic write dir: `false`.
    pub fn set_atomic_write_dir() -> bool {
        const ENABLED: bool = false;
        ENABLED
    }

    /// Request backoff interval: 1000 ms.
    pub fn object_store_req_backoff_interval_ms() -> u64 {
        DEFAULT_REQ_BACKOFF_INTERVAL_MS
    }

    /// Maximum request backoff delay: 10000 ms.
    pub fn object_store_req_backoff_max_delay_ms() -> u64 {
        DEFAULT_REQ_BACKOFF_MAX_DELAY_MS
    }

    /// Request backoff factor: 2.
    pub fn object_store_req_backoff_factor() -> u64 {
        const BACKOFF_FACTOR: u64 = 2;
        BACKOFF_FACTOR
    }

    /// Upload attempt timeout: 8000 ms.
    pub fn object_store_upload_attempt_timeout_ms() -> u64 {
        secs_to_ms(8)
    }

    /// Upload retry attempts: 3.
    pub fn object_store_upload_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Streaming upload attempt timeout: 5000 ms.
    pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
        secs_to_ms(5)
    }

    /// Streaming upload retry attempts: 3.
    pub fn object_store_streaming_upload_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Read attempt timeout: 8000 ms.
    pub fn object_store_read_attempt_timeout_ms() -> u64 {
        secs_to_ms(8)
    }

    /// Read retry attempts: 3.
    pub fn object_store_read_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Streaming read attempt timeout: 3000 ms.
    pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
        secs_to_ms(3)
    }

    /// Streaming read retry attempts: 3.
    pub fn object_store_streaming_read_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Metadata attempt timeout: 60000 ms.
    pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
        secs_to_ms(60)
    }

    /// Metadata retry attempts: 3.
    pub fn object_store_metadata_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Delete attempt timeout: 5000 ms.
    pub fn object_store_delete_attempt_timeout_ms() -> u64 {
        secs_to_ms(5)
    }

    /// Delete retry attempts: 3.
    pub fn object_store_delete_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Delete-objects attempt timeout: 5000 ms.
    pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
        secs_to_ms(5)
    }

    /// Delete-objects retry attempts: 3.
    pub fn object_store_delete_objects_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// List attempt timeout: 600000 ms (10 * 60 seconds).
    pub fn object_store_list_attempt_timeout_ms() -> u64 {
        secs_to_ms(10 * 60)
    }

    /// List retry attempts: 3.
    pub fn object_store_list_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// OpenDAL upload concurrency: 256.
    pub fn opendal_upload_concurrency() -> usize {
        const UPLOAD_CONCURRENCY: usize = 256;
        UPLOAD_CONCURRENCY
    }

    /// Upload part size: 16777216 (16 * 1024 * 1024).
    pub fn upload_part_size() -> usize {
        const UPLOAD_PART_SIZE: usize = 16 << 20;
        UPLOAD_PART_SIZE
    }

    /// Built-in values for the S3-specific settings.
    pub mod s3 {
        const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

        const DEFAULT_KEEPALIVE_MS: u64 = 10 * 60 * 1000;

        /// Keepalive: `Some(600000)` ms.
        pub fn keepalive_ms() -> Option<u64> {
            Some(DEFAULT_KEEPALIVE_MS)
        }

        /// Receive buffer size: `Some(2097152)` (`1 << 21`).
        pub fn recv_buffer_size() -> Option<usize> {
            const RECV_BUFFER_SIZE: usize = 2 * 1024 * 1024;
            Some(RECV_BUFFER_SIZE)
        }

        /// Send buffer size: `None`.
        pub fn send_buffer_size() -> Option<usize> {
            const UNSET: Option<usize> = None;
            UNSET
        }

        /// No-delay flag: `Some(true)`.
        pub fn nodelay() -> Option<bool> {
            const NODELAY: bool = true;
            Some(NODELAY)
        }

        /// Identity resolution timeout: 5 (unit presumably seconds, per the
        /// `_s` suffix).
        pub fn identity_resolution_timeout_s() -> u64 {
            DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
        }

        /// Built-in values for the developer-facing S3 settings.
        pub mod developer {
            /// Whether to retry unknown service errors: `false`.
            pub fn retry_unknown_service_error() -> bool {
                const ENABLED: bool = false;
                ENABLED
            }

            /// Retryable service error codes: `SlowDown` and `TooManyRequests`.
            pub fn retryable_service_error_codes() -> Vec<String> {
                ["SlowDown", "TooManyRequests"]
                    .iter()
                    .map(|code| (*code).to_owned())
                    .collect()
            }

            /// Whether to use OpenDAL: `true`.
            pub fn use_opendal() -> bool {
                const ENABLED: bool = true;
                ENABLED
            }
        }
    }
}
1403}