1use foyer::{
16 Compression, LfuConfig, LruConfig, RecoverMode, RuntimeOptions, S3FifoConfig, Throttle,
17};
18use serde::de::Error as _;
19
20use super::*;
21
22#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
24#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
25pub struct StorageConfig {
26 #[serde(default = "default::storage::share_buffers_sync_parallelism")]
28 pub share_buffers_sync_parallelism: u32,
29
30 #[serde(default = "default::storage::share_buffer_compaction_worker_threads_number")]
33 pub share_buffer_compaction_worker_threads_number: u32,
34
35 #[serde(default)]
40 pub shared_buffer_capacity_mb: Option<usize>,
41
42 #[serde(default = "default::storage::shared_buffer_flush_ratio")]
45 pub shared_buffer_flush_ratio: f32,
46
47 #[serde(default = "default::storage::shared_buffer_min_batch_flush_size_mb")]
50 pub shared_buffer_min_batch_flush_size_mb: usize,
51
52 #[serde(default = "default::storage::imm_merge_threshold")]
54 #[deprecated]
55 pub imm_merge_threshold: usize,
56
57 #[serde(default = "default::storage::write_conflict_detection_enabled")]
59 pub write_conflict_detection_enabled: bool,
60
61 #[serde(default)]
62 #[config_doc(nested)]
63 pub cache: CacheConfig,
64
65 #[serde(default)]
67 pub block_cache_capacity_mb: Option<usize>,
68
69 #[serde(default)]
71 pub meta_cache_capacity_mb: Option<usize>,
72
73 #[serde(default)]
75 pub high_priority_ratio_in_percent: Option<usize>,
76
77 #[serde(default)]
79 pub prefetch_buffer_capacity_mb: Option<usize>,
80
81 #[serde(default = "default::storage::max_cached_recent_versions_number")]
82 pub max_cached_recent_versions_number: usize,
83
84 #[serde(
86 default = "default::storage::max_prefetch_block_number",
87 deserialize_with = "deserialize_max_prefetch_block_number"
88 )]
89 pub max_prefetch_block_number: usize,
90
91 #[serde(default = "default::storage::disable_remote_compactor")]
92 pub disable_remote_compactor: bool,
93
94 #[serde(default = "default::storage::share_buffer_upload_concurrency")]
96 pub share_buffer_upload_concurrency: usize,
97
98 #[serde(default)]
99 pub compactor_memory_limit_mb: Option<usize>,
100
101 #[serde(default = "default::storage::compactor_max_task_multiplier")]
105 pub compactor_max_task_multiplier: f32,
106
107 #[serde(default = "default::storage::compactor_memory_available_proportion")]
110 pub compactor_memory_available_proportion: f64,
111
112 #[serde(default = "default::storage::sstable_id_remote_fetch_number")]
114 pub sstable_id_remote_fetch_number: u32,
115
116 #[serde(default = "default::storage::min_sstable_size_mb")]
117 pub min_sstable_size_mb: u32,
118
119 #[serde(default)]
120 #[config_doc(nested)]
121 pub data_file_cache: FileCacheConfig,
122
123 #[serde(default)]
124 #[config_doc(nested)]
125 pub meta_file_cache: FileCacheConfig,
126
127 #[serde(default = "default::storage::sst_skip_bloom_filter_in_serde")]
131 pub sst_skip_bloom_filter_in_serde: bool,
132
133 #[serde(default)]
134 #[config_doc(nested)]
135 pub cache_refill: CacheRefillConfig,
136
137 #[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
139 pub min_sst_size_for_streaming_upload: u64,
140
141 #[serde(default = "default::storage::max_concurrent_compaction_task_number")]
142 pub max_concurrent_compaction_task_number: u64,
143
144 #[serde(default = "default::storage::max_preload_wait_time_mill")]
145 pub max_preload_wait_time_mill: u64,
146
147 #[serde(default = "default::storage::max_version_pinning_duration_sec")]
148 pub max_version_pinning_duration_sec: u64,
149
150 #[serde(default = "default::storage::compactor_max_sst_key_count")]
151 pub compactor_max_sst_key_count: u64,
152 #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")]
154 pub compact_iter_recreate_timeout_ms: u64,
155 #[serde(default = "default::storage::compactor_max_sst_size")]
156 pub compactor_max_sst_size: u64,
157 #[serde(default = "default::storage::enable_fast_compaction")]
158 pub enable_fast_compaction: bool,
159 #[serde(default = "default::storage::check_compaction_result")]
160 pub check_compaction_result: bool,
161 #[serde(default = "default::storage::max_preload_io_retry_times")]
162 pub max_preload_io_retry_times: usize,
163 #[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")]
164 pub compactor_fast_max_compact_delete_ratio: u32,
165 #[serde(default = "default::storage::compactor_fast_max_compact_task_size")]
166 pub compactor_fast_max_compact_task_size: u64,
167 #[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
168 pub compactor_iter_max_io_retry_times: usize,
169
170 #[serde(default = "default::storage::shorten_block_meta_key_threshold")]
175 pub shorten_block_meta_key_threshold: Option<usize>,
176
177 #[serde(default = "default::storage::table_info_statistic_history_times")]
179 #[deprecated]
180 pub table_info_statistic_history_times: usize,
181
182 #[serde(default, flatten)]
183 #[config_doc(omitted)]
184 pub unrecognized: Unrecognized<Self>,
185
186 #[serde(default = "default::storage::mem_table_spill_threshold")]
188 pub mem_table_spill_threshold: usize,
189
190 #[serde(default = "default::storage::compactor_concurrent_uploading_sst_count")]
192 pub compactor_concurrent_uploading_sst_count: Option<usize>,
193
194 #[serde(default = "default::storage::compactor_max_overlap_sst_count")]
195 pub compactor_max_overlap_sst_count: usize,
196
197 #[serde(default = "default::storage::compactor_max_preload_meta_file_count")]
201 pub compactor_max_preload_meta_file_count: usize,
202
203 #[serde(default = "default::storage::vector_file_block_size_kb")]
204 pub vector_file_block_size_kb: usize,
205
206 #[serde(default)]
211 pub object_store: ObjectStoreConfig,
212
213 #[serde(default = "default::storage::time_travel_version_cache_capacity")]
214 pub time_travel_version_cache_capacity: u64,
215
216 #[serde(default = "default::storage::table_change_log_cache_capacity")]
217 pub table_change_log_cache_capacity: u64,
218
219 #[serde(default = "default::storage::iceberg_compaction_enable_validate")]
221 pub iceberg_compaction_enable_validate: bool,
222 #[serde(default = "default::storage::iceberg_compaction_max_record_batch_rows")]
223 pub iceberg_compaction_max_record_batch_rows: usize,
224 #[serde(default = "default::storage::iceberg_compaction_min_size_per_partition_mb")]
225 pub iceberg_compaction_min_size_per_partition_mb: u32,
226 #[serde(default = "default::storage::iceberg_compaction_max_file_count_per_partition")]
227 pub iceberg_compaction_max_file_count_per_partition: u32,
228 #[serde(default = "default::storage::iceberg_compaction_write_parquet_max_row_group_rows")]
231 #[deprecated(
232 note = "This config is deprecated. Use sink config `compaction.write_parquet_max_row_group_rows` instead."
233 )]
234 pub iceberg_compaction_write_parquet_max_row_group_rows: usize,
235
236 #[serde(default = "default::storage::iceberg_compaction_task_parallelism_ratio")]
238 pub iceberg_compaction_task_parallelism_ratio: f32,
239 #[serde(default = "default::storage::iceberg_compaction_enable_heuristic_output_parallelism")]
241 pub iceberg_compaction_enable_heuristic_output_parallelism: bool,
242 #[serde(default = "default::storage::iceberg_compaction_max_concurrent_closes")]
244 pub iceberg_compaction_max_concurrent_closes: usize,
245 #[serde(default = "default::storage::iceberg_compaction_enable_dynamic_size_estimation")]
247 pub iceberg_compaction_enable_dynamic_size_estimation: bool,
248 #[serde(default = "default::storage::iceberg_compaction_size_estimation_smoothing_factor")]
250 pub iceberg_compaction_size_estimation_smoothing_factor: f64,
251 #[serde(
255 default = "default::storage::iceberg_compaction_pending_parallelism_budget_multiplier"
256 )]
257 pub iceberg_compaction_pending_parallelism_budget_multiplier: f32,
258 #[serde(
260 default = "default::storage::iceberg_compaction_pull_interval_ms",
261 deserialize_with = "deserialize_iceberg_compaction_pull_interval_ms"
262 )]
263 pub iceberg_compaction_pull_interval_ms: u64,
264
265 #[serde(default = "default::storage::iceberg_compaction_target_binpack_group_size_mb")]
266 pub iceberg_compaction_target_binpack_group_size_mb: Option<u64>,
267 #[serde(default = "default::storage::iceberg_compaction_min_group_size_mb")]
268 pub iceberg_compaction_min_group_size_mb: Option<u64>,
269 #[serde(default = "default::storage::iceberg_compaction_min_group_file_count")]
270 pub iceberg_compaction_min_group_file_count: Option<usize>,
271}
272
273#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
275#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
276pub struct CacheConfig {
277 #[serde(default)]
282 pub block_cache_capacity_mb: Option<usize>,
283
284 #[serde(default)]
287 pub block_cache_shard_num: Option<usize>,
288
289 #[serde(default)]
290 #[config_doc(omitted)]
291 pub block_cache_eviction: CacheEvictionConfig,
292
293 #[serde(default)]
298 pub meta_cache_capacity_mb: Option<usize>,
299
300 #[serde(default)]
303 pub meta_cache_shard_num: Option<usize>,
304
305 #[serde(default)]
306 #[config_doc(omitted)]
307 pub meta_cache_eviction: CacheEvictionConfig,
308
309 #[serde(default = "default::storage::vector_block_cache_capacity_mb")]
310 pub vector_block_cache_capacity_mb: usize,
311 #[serde(default = "default::storage::vector_block_cache_shard_num")]
312 pub vector_block_cache_shard_num: usize,
313 #[serde(default)]
314 #[config_doc(omitted)]
315 pub vector_block_cache_eviction_config: CacheEvictionConfig,
316 #[serde(default = "default::storage::vector_meta_cache_capacity_mb")]
317 pub vector_meta_cache_capacity_mb: usize,
318 #[serde(default = "default::storage::vector_meta_cache_shard_num")]
319 pub vector_meta_cache_shard_num: usize,
320 #[serde(default)]
321 #[config_doc(omitted)]
322 pub vector_meta_cache_eviction_config: CacheEvictionConfig,
323}
324
325#[derive(Clone, Debug, Serialize, Deserialize)]
327#[serde(tag = "algorithm")]
328pub enum CacheEvictionConfig {
329 Lru {
330 high_priority_ratio_in_percent: Option<usize>,
331 },
332 Lfu {
333 window_capacity_ratio_in_percent: Option<usize>,
334 protected_capacity_ratio_in_percent: Option<usize>,
335 cmsketch_eps: Option<f64>,
336 cmsketch_confidence: Option<f64>,
337 },
338 S3Fifo {
339 small_queue_capacity_ratio_in_percent: Option<usize>,
340 ghost_queue_capacity_ratio_in_percent: Option<usize>,
341 small_to_main_freq_threshold: Option<u8>,
342 },
343}
344
345impl Default for CacheEvictionConfig {
346 fn default() -> Self {
347 Self::Lru {
348 high_priority_ratio_in_percent: None,
349 }
350 }
351}
352
353#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
354#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
355pub struct CacheRefillConfig {
356 #[serde(default = "default::cache_refill::meta_refill_concurrency")]
360 pub meta_refill_concurrency: usize,
361
362 #[serde(default = "default::cache_refill::data_refill_levels")]
364 pub data_refill_levels: Vec<u32>,
365
366 #[serde(default = "default::cache_refill::timeout_ms")]
368 pub timeout_ms: u64,
369
370 #[serde(default = "default::cache_refill::concurrency")]
372 pub concurrency: usize,
373
374 #[serde(default = "default::cache_refill::unit")]
376 pub unit: usize,
377
378 #[serde(default = "default::cache_refill::threshold")]
382 pub threshold: f64,
383
384 #[serde(default = "default::cache_refill::recent_filter_shards")]
386 pub recent_filter_shards: usize,
387
388 #[serde(default = "default::cache_refill::recent_filter_layers")]
390 pub recent_filter_layers: usize,
391
392 #[serde(default = "default::cache_refill::recent_filter_rotate_interval_ms")]
394 pub recent_filter_rotate_interval_ms: usize,
395
396 #[serde(default = "default::cache_refill::skip_recent_filter")]
400 pub skip_recent_filter: bool,
401
402 #[serde(default, flatten)]
403 #[config_doc(omitted)]
404 pub unrecognized: Unrecognized<Self>,
405}
406
407#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
411#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
412pub struct FileCacheConfig {
413 #[serde(default = "default::file_cache::dir")]
414 pub dir: String,
415
416 #[serde(default = "default::file_cache::capacity_mb")]
417 pub capacity_mb: usize,
418
419 #[serde(default = "default::file_cache::file_capacity_mb")]
420 pub file_capacity_mb: usize,
421
422 #[serde(default = "default::file_cache::flushers")]
423 pub flushers: usize,
424
425 #[serde(default = "default::file_cache::reclaimers")]
426 pub reclaimers: usize,
427
428 #[serde(default = "default::file_cache::recover_concurrency")]
429 pub recover_concurrency: usize,
430
431 #[serde(default = "default::file_cache::insert_rate_limit_mb")]
433 pub insert_rate_limit_mb: usize,
434
435 #[serde(default = "default::file_cache::indexer_shards")]
436 pub indexer_shards: usize,
437
438 #[serde(default = "default::file_cache::compression")]
439 pub compression: Compression,
440
441 #[serde(default = "default::file_cache::flush_buffer_threshold_mb")]
442 pub flush_buffer_threshold_mb: Option<usize>,
443
444 #[serde(default = "default::file_cache::throttle")]
445 pub throttle: Throttle,
446
447 #[serde(default = "default::file_cache::fifo_probation_ratio")]
448 pub fifo_probation_ratio: f64,
449
450 #[serde(default = "default::file_cache::blob_index_size_kb")]
462 pub blob_index_size_kb: usize,
463
464 #[serde(default = "default::file_cache::recover_mode")]
474 pub recover_mode: RecoverMode,
475
476 #[serde(default = "default::file_cache::runtime_config")]
477 pub runtime_config: RuntimeOptions,
478
479 #[serde(default, flatten)]
480 #[config_doc(omitted)]
481 pub unrecognized: Unrecognized<Self>,
482}
483
484#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
486#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
487pub struct ObjectStoreConfig {
488 #[serde(
490 default = "default::object_store_config::set_atomic_write_dir",
491 alias = "object_store_set_atomic_write_dir"
492 )]
493 pub set_atomic_write_dir: bool,
494
495 #[serde(default)]
499 pub retry: ObjectStoreRetryConfig,
500
501 #[serde(default)]
503 pub s3: S3ObjectStoreConfig,
504
505 #[serde(default = "default::object_store_config::object_store_req_concurrency_limit")]
510 pub req_concurrency_limit: usize,
511
512 #[serde(default = "default::object_store_config::http_concurrent_limit")]
515 pub http_concurrent_limit: usize,
516
517 #[serde(default = "default::object_store_config::opendal_upload_concurrency")]
519 pub opendal_upload_concurrency: usize,
520
521 #[serde(default)]
523 pub opendal_writer_abort_on_err: bool,
524
525 #[serde(default = "default::object_store_config::upload_part_size")]
526 pub upload_part_size: usize,
527}
528
529fn deserialize_max_prefetch_block_number<'de, D>(deserializer: D) -> Result<usize, D::Error>
530where
531 D: serde::Deserializer<'de>,
532{
533 let value = usize::deserialize(deserializer)?;
534 if value == 0 {
535 return Err(D::Error::custom(
536 "storage.max_prefetch_block_number must be greater than 0",
537 ));
538 }
539 Ok(value)
540}
541
542fn deserialize_iceberg_compaction_pull_interval_ms<'de, D>(deserializer: D) -> Result<u64, D::Error>
543where
544 D: serde::Deserializer<'de>,
545{
546 let value = u64::deserialize(deserializer)?;
547 if value == 0 {
548 return Err(D::Error::custom(
549 "storage.iceberg_compaction_pull_interval_ms must be greater than 0",
550 ));
551 }
552 Ok(value)
553}
554
555impl ObjectStoreConfig {
556 pub fn set_atomic_write_dir(&mut self) {
557 self.set_atomic_write_dir = true;
558 }
559}
560
561#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
563#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
564pub struct S3ObjectStoreConfig {
565 #[serde(
567 default = "default::object_store_config::s3::keepalive_ms",
568 alias = "object_store_keepalive_ms"
569 )]
570 pub keepalive_ms: Option<u64>,
571 #[serde(
572 default = "default::object_store_config::s3::recv_buffer_size",
573 alias = "object_store_recv_buffer_size"
574 )]
575 pub recv_buffer_size: Option<usize>,
576 #[serde(
577 default = "default::object_store_config::s3::send_buffer_size",
578 alias = "object_store_send_buffer_size"
579 )]
580 pub send_buffer_size: Option<usize>,
581 #[serde(
582 default = "default::object_store_config::s3::nodelay",
583 alias = "object_store_nodelay"
584 )]
585 pub nodelay: Option<bool>,
586 #[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")]
588 pub retry_unknown_service_error: bool,
589 #[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")]
590 pub identity_resolution_timeout_s: u64,
591 #[serde(default)]
592 pub developer: S3ObjectStoreDeveloperConfig,
593}
594
595#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
597#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
598pub struct S3ObjectStoreDeveloperConfig {
599 #[serde(
601 default = "default::object_store_config::s3::developer::retry_unknown_service_error",
602 alias = "object_store_retry_unknown_service_error"
603 )]
604 pub retry_unknown_service_error: bool,
605 #[serde(
608 default = "default::object_store_config::s3::developer::retryable_service_error_codes",
609 alias = "object_store_retryable_service_error_codes"
610 )]
611 pub retryable_service_error_codes: Vec<String>,
612
613 #[serde(default = "default::object_store_config::s3::developer::use_opendal")]
615 pub use_opendal: bool,
616}
617
618#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
619#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
620pub struct ObjectStoreRetryConfig {
621 #[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
625 pub req_backoff_interval_ms: u64,
626
627 #[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
629 pub req_backoff_max_delay_ms: u64,
630
631 #[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
633 pub req_backoff_factor: u64,
634
635 #[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
637 pub upload_attempt_timeout_ms: u64,
638
639 #[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
641 pub upload_retry_attempts: usize,
642
643 #[serde(
645 default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
646 )]
647 pub streaming_upload_attempt_timeout_ms: u64,
648
649 #[serde(
651 default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
652 )]
653 pub streaming_upload_retry_attempts: usize,
654
655 #[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
657 pub read_attempt_timeout_ms: u64,
658
659 #[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
661 pub read_retry_attempts: usize,
662
663 #[serde(
665 default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
666 )]
667 pub streaming_read_attempt_timeout_ms: u64,
668
669 #[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
671 pub streaming_read_retry_attempts: usize,
672
673 #[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
675 pub metadata_attempt_timeout_ms: u64,
676
677 #[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
679 pub metadata_retry_attempts: usize,
680
681 #[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
683 pub delete_attempt_timeout_ms: u64,
684
685 #[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
687 pub delete_retry_attempts: usize,
688
689 #[serde(
691 default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
692 )]
693 pub delete_objects_attempt_timeout_ms: u64,
694
695 #[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
697 pub delete_objects_retry_attempts: usize,
698
699 #[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
701 pub list_attempt_timeout_ms: u64,
702
703 #[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
705 pub list_retry_attempts: usize,
706}
707
708#[derive(Debug, Clone)]
709pub enum EvictionConfig {
710 Lru(LruConfig),
711 Lfu(LfuConfig),
712 S3Fifo(S3FifoConfig),
713}
714
715impl EvictionConfig {
716 pub fn for_test() -> Self {
717 Self::Lru(LruConfig {
718 high_priority_pool_ratio: 0.0,
719 })
720 }
721}
722
723impl From<EvictionConfig> for foyer::EvictionConfig {
724 fn from(value: EvictionConfig) -> Self {
725 match value {
726 EvictionConfig::Lru(lru) => foyer::EvictionConfig::Lru(lru),
727 EvictionConfig::Lfu(lfu) => foyer::EvictionConfig::Lfu(lfu),
728 EvictionConfig::S3Fifo(s3fifo) => foyer::EvictionConfig::S3Fifo(s3fifo),
729 }
730 }
731}
732
733pub struct StorageMemoryConfig {
734 pub block_cache_capacity_mb: usize,
735 pub block_cache_shard_num: usize,
736 pub meta_cache_capacity_mb: usize,
737 pub meta_cache_shard_num: usize,
738 pub vector_block_cache_capacity_mb: usize,
739 pub vector_block_cache_shard_num: usize,
740 pub vector_meta_cache_capacity_mb: usize,
741 pub vector_meta_cache_shard_num: usize,
742 pub shared_buffer_capacity_mb: usize,
743 pub compactor_memory_limit_mb: usize,
744 pub prefetch_buffer_capacity_mb: usize,
745 pub block_cache_eviction_config: EvictionConfig,
746 pub meta_cache_eviction_config: EvictionConfig,
747 pub vector_block_cache_eviction_config: EvictionConfig,
748 pub vector_meta_cache_eviction_config: EvictionConfig,
749 pub block_file_cache_flush_buffer_threshold_mb: usize,
750 pub meta_file_cache_flush_buffer_threshold_mb: usize,
751}
752
753pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
754 let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or(
755 s.storage
757 .block_cache_capacity_mb
758 .unwrap_or(default::storage::block_cache_capacity_mb()),
759 );
760 let meta_cache_capacity_mb = s.storage.cache.meta_cache_capacity_mb.unwrap_or(
761 s.storage
763 .block_cache_capacity_mb
764 .unwrap_or(default::storage::meta_cache_capacity_mb()),
765 );
766 let shared_buffer_capacity_mb = s
767 .storage
768 .shared_buffer_capacity_mb
769 .unwrap_or(default::storage::shared_buffer_capacity_mb());
770 let meta_cache_shard_num = s.storage.cache.meta_cache_shard_num.unwrap_or_else(|| {
771 let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
772 while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
773 shard_bits -= 1;
774 }
775 shard_bits
776 });
777 let block_cache_shard_num = s.storage.cache.block_cache_shard_num.unwrap_or_else(|| {
778 let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
779 while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
780 {
781 shard_bits -= 1;
782 }
783 shard_bits
784 });
785 let compactor_memory_limit_mb = s
786 .storage
787 .compactor_memory_limit_mb
788 .unwrap_or(default::storage::compactor_memory_limit_mb());
789
790 let get_eviction_config = |c: &CacheEvictionConfig| {
791 match c {
792 CacheEvictionConfig::Lru {
793 high_priority_ratio_in_percent,
794 } => EvictionConfig::Lru(LruConfig {
795 high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
796 s.storage
798 .high_priority_ratio_in_percent
799 .unwrap_or(default::storage::high_priority_ratio_in_percent()),
800 ) as f64
801 / 100.0,
802 }),
803 CacheEvictionConfig::Lfu {
804 window_capacity_ratio_in_percent,
805 protected_capacity_ratio_in_percent,
806 cmsketch_eps,
807 cmsketch_confidence,
808 } => EvictionConfig::Lfu(LfuConfig {
809 window_capacity_ratio: window_capacity_ratio_in_percent
810 .unwrap_or(default::storage::window_capacity_ratio_in_percent())
811 as f64
812 / 100.0,
813 protected_capacity_ratio: protected_capacity_ratio_in_percent
814 .unwrap_or(default::storage::protected_capacity_ratio_in_percent())
815 as f64
816 / 100.0,
817 cmsketch_eps: cmsketch_eps.unwrap_or(default::storage::cmsketch_eps()),
818 cmsketch_confidence: cmsketch_confidence
819 .unwrap_or(default::storage::cmsketch_confidence()),
820 }),
821 CacheEvictionConfig::S3Fifo {
822 small_queue_capacity_ratio_in_percent,
823 ghost_queue_capacity_ratio_in_percent,
824 small_to_main_freq_threshold,
825 } => EvictionConfig::S3Fifo(S3FifoConfig {
826 small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent
827 .unwrap_or(default::storage::small_queue_capacity_ratio_in_percent())
828 as f64
829 / 100.0,
830 ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent
831 .unwrap_or(default::storage::ghost_queue_capacity_ratio_in_percent())
832 as f64
833 / 100.0,
834 small_to_main_freq_threshold: small_to_main_freq_threshold
835 .unwrap_or(default::storage::small_to_main_freq_threshold()),
836 }),
837 }
838 };
839
840 let block_cache_eviction_config = get_eviction_config(&s.storage.cache.block_cache_eviction);
841 let meta_cache_eviction_config = get_eviction_config(&s.storage.cache.meta_cache_eviction);
842 let vector_block_cache_eviction_config =
843 get_eviction_config(&s.storage.cache.vector_block_cache_eviction_config);
844 let vector_meta_cache_eviction_config =
845 get_eviction_config(&s.storage.cache.vector_meta_cache_eviction_config);
846
847 let prefetch_buffer_capacity_mb =
848 s.storage
849 .shared_buffer_capacity_mb
850 .unwrap_or(match &block_cache_eviction_config {
851 EvictionConfig::Lru(lru) => {
852 ((1.0 - lru.high_priority_pool_ratio) * block_cache_capacity_mb as f64) as usize
853 }
854 EvictionConfig::Lfu(lfu) => {
855 ((1.0 - lfu.protected_capacity_ratio) * block_cache_capacity_mb as f64) as usize
856 }
857 EvictionConfig::S3Fifo(s3fifo) => {
858 (s3fifo.small_queue_capacity_ratio * block_cache_capacity_mb as f64) as usize
859 }
860 });
861
862 let block_file_cache_flush_buffer_threshold_mb = s
863 .storage
864 .data_file_cache
865 .flush_buffer_threshold_mb
866 .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
867 let meta_file_cache_flush_buffer_threshold_mb = s
868 .storage
869 .meta_file_cache
870 .flush_buffer_threshold_mb
871 .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
872
873 StorageMemoryConfig {
874 block_cache_capacity_mb,
875 block_cache_shard_num,
876 meta_cache_capacity_mb,
877 meta_cache_shard_num,
878 vector_block_cache_capacity_mb: s.storage.cache.vector_block_cache_capacity_mb,
879 vector_block_cache_shard_num: s.storage.cache.vector_block_cache_shard_num,
880 vector_meta_cache_capacity_mb: s.storage.cache.vector_meta_cache_capacity_mb,
881 vector_meta_cache_shard_num: s.storage.cache.vector_meta_cache_shard_num,
882 shared_buffer_capacity_mb,
883 compactor_memory_limit_mb,
884 prefetch_buffer_capacity_mb,
885 block_cache_eviction_config,
886 meta_cache_eviction_config,
887 vector_block_cache_eviction_config,
888 vector_meta_cache_eviction_config,
889 block_file_cache_flush_buffer_threshold_mb,
890 meta_file_cache_flush_buffer_threshold_mb,
891 }
892}
893
894pub mod default {
895
896 pub mod storage {
897 pub fn share_buffers_sync_parallelism() -> u32 {
898 1
899 }
900
901 pub fn share_buffer_compaction_worker_threads_number() -> u32 {
902 4
903 }
904
905 pub fn shared_buffer_capacity_mb() -> usize {
906 1024
907 }
908
909 pub fn shared_buffer_flush_ratio() -> f32 {
910 0.8
911 }
912
913 pub fn shared_buffer_min_batch_flush_size_mb() -> usize {
914 800
915 }
916
917 pub fn imm_merge_threshold() -> usize {
918 0 }
920
921 pub fn write_conflict_detection_enabled() -> bool {
922 cfg!(debug_assertions)
923 }
924
925 pub fn max_cached_recent_versions_number() -> usize {
926 60
927 }
928
929 pub fn block_cache_capacity_mb() -> usize {
930 512
931 }
932
933 pub fn high_priority_ratio_in_percent() -> usize {
934 70
935 }
936
937 pub fn window_capacity_ratio_in_percent() -> usize {
938 10
939 }
940
941 pub fn protected_capacity_ratio_in_percent() -> usize {
942 80
943 }
944
945 pub fn cmsketch_eps() -> f64 {
946 0.002
947 }
948
949 pub fn cmsketch_confidence() -> f64 {
950 0.95
951 }
952
953 pub fn small_queue_capacity_ratio_in_percent() -> usize {
954 10
955 }
956
957 pub fn ghost_queue_capacity_ratio_in_percent() -> usize {
958 1000
959 }
960
961 pub fn small_to_main_freq_threshold() -> u8 {
962 1
963 }
964
965 pub fn meta_cache_capacity_mb() -> usize {
966 128
967 }
968
969 pub fn disable_remote_compactor() -> bool {
970 false
971 }
972
973 pub fn share_buffer_upload_concurrency() -> usize {
974 8
975 }
976
977 pub fn compactor_memory_limit_mb() -> usize {
978 512
979 }
980
981 pub fn compactor_max_task_multiplier() -> f32 {
982 match std::env::var("RW_COMPACTOR_MODE")
983 .unwrap_or_default()
984 .as_str()
985 {
986 mode if mode.contains("iceberg") => 12.0000,
987 _ => 3.0000,
988 }
989 }
990
991 pub fn compactor_memory_available_proportion() -> f64 {
992 0.8
993 }
994
995 pub fn sstable_id_remote_fetch_number() -> u32 {
996 10
997 }
998
999 pub fn min_sstable_size_mb() -> u32 {
1000 32
1001 }
1002
1003 pub fn min_sst_size_for_streaming_upload() -> u64 {
1004 32 * 1024 * 1024
1006 }
1007
1008 pub fn max_concurrent_compaction_task_number() -> u64 {
1009 16
1010 }
1011
1012 pub fn max_preload_wait_time_mill() -> u64 {
1013 0
1014 }
1015
1016 pub fn max_version_pinning_duration_sec() -> u64 {
1017 3 * 3600
1018 }
1019
1020 pub fn compactor_max_sst_key_count() -> u64 {
1021 2 * 1024 * 1024 }
1023
1024 pub fn compact_iter_recreate_timeout_ms() -> u64 {
1025 10 * 60 * 1000
1026 }
1027
1028 pub fn compactor_iter_max_io_retry_times() -> usize {
1029 8
1030 }
1031
1032 pub fn shorten_block_meta_key_threshold() -> Option<usize> {
1033 None
1034 }
1035
1036 pub fn compactor_max_sst_size() -> u64 {
1037 512 * 1024 * 1024 }
1039
1040 pub fn enable_fast_compaction() -> bool {
1041 true
1042 }
1043
1044 pub fn check_compaction_result() -> bool {
1045 false
1046 }
1047
1048 pub fn max_preload_io_retry_times() -> usize {
1049 3
1050 }
1051
1052 pub fn mem_table_spill_threshold() -> usize {
1053 4 << 20
1054 }
1055
1056 pub fn compactor_fast_max_compact_delete_ratio() -> u32 {
1057 40
1058 }
1059
1060 pub fn compactor_fast_max_compact_task_size() -> u64 {
1061 2 * 1024 * 1024 * 1024 }
1063
1064 pub fn max_prefetch_block_number() -> usize {
1065 16
1066 }
1067
1068 pub fn compactor_concurrent_uploading_sst_count() -> Option<usize> {
1069 None
1070 }
1071
1072 pub fn compactor_max_overlap_sst_count() -> usize {
1073 64
1074 }
1075
1076 pub fn compactor_max_preload_meta_file_count() -> usize {
1077 32
1078 }
1079
1080 pub fn vector_file_block_size_kb() -> usize {
1081 1024
1082 }
1083
1084 pub fn vector_block_cache_capacity_mb() -> usize {
1085 16
1086 }
1087
1088 pub fn vector_block_cache_shard_num() -> usize {
1089 16
1090 }
1091
1092 pub fn vector_meta_cache_capacity_mb() -> usize {
1093 16
1094 }
1095
1096 pub fn vector_meta_cache_shard_num() -> usize {
1097 16
1098 }
1099
1100 pub fn table_info_statistic_history_times() -> usize {
1102 240
1103 }
1104
1105 pub fn block_file_cache_flush_buffer_threshold_mb() -> usize {
1106 256
1107 }
1108
1109 pub fn meta_file_cache_flush_buffer_threshold_mb() -> usize {
1110 64
1111 }
1112
1113 pub fn time_travel_version_cache_capacity() -> u64 {
1114 10
1115 }
1116
1117 pub fn table_change_log_cache_capacity() -> u64 {
1118 60
1119 }
1120
1121 pub fn sst_skip_bloom_filter_in_serde() -> bool {
1122 false
1123 }
1124
1125 pub fn iceberg_compaction_enable_validate() -> bool {
1126 false
1127 }
1128
1129 pub fn iceberg_compaction_max_record_batch_rows() -> usize {
1130 1024
1131 }
1132
1133 pub fn iceberg_compaction_write_parquet_max_row_group_rows() -> usize {
1134 1024 * 100 }
1136
1137 pub fn iceberg_compaction_min_size_per_partition_mb() -> u32 {
1138 1024
1139 }
1140
/// Built-in default for `iceberg_compaction_max_file_count_per_partition`: `32`.
pub fn iceberg_compaction_max_file_count_per_partition() -> u32 {
    const DEFAULT_MAX_FILE_COUNT: u32 = 32;
    DEFAULT_MAX_FILE_COUNT
}
1144
/// Built-in default for `iceberg_compaction_task_parallelism_ratio`: `4.0`.
pub fn iceberg_compaction_task_parallelism_ratio() -> f32 {
    const DEFAULT_RATIO: f32 = 4.0;
    DEFAULT_RATIO
}
1148
/// Built-in default for `iceberg_compaction_enable_heuristic_output_parallelism`:
/// `false`.
pub fn iceberg_compaction_enable_heuristic_output_parallelism() -> bool {
    const ENABLED_BY_DEFAULT: bool = false;
    ENABLED_BY_DEFAULT
}
1152
/// Built-in default for `iceberg_compaction_max_concurrent_closes`: `8`.
pub fn iceberg_compaction_max_concurrent_closes() -> usize {
    const DEFAULT_MAX_CLOSES: usize = 8;
    DEFAULT_MAX_CLOSES
}
1156
/// Built-in default for `iceberg_compaction_enable_dynamic_size_estimation`: `true`.
pub fn iceberg_compaction_enable_dynamic_size_estimation() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
1160
/// Built-in default for `iceberg_compaction_size_estimation_smoothing_factor`: `0.3`.
pub fn iceberg_compaction_size_estimation_smoothing_factor() -> f64 {
    const DEFAULT_SMOOTHING_FACTOR: f64 = 0.3;
    DEFAULT_SMOOTHING_FACTOR
}
1164
/// Built-in default for `iceberg_compaction_pending_parallelism_budget_multiplier`:
/// `4.0`.
pub fn iceberg_compaction_pending_parallelism_budget_multiplier() -> f32 {
    const DEFAULT_MULTIPLIER: f32 = 4.0;
    DEFAULT_MULTIPLIER
}
1168
/// Built-in default for `iceberg_compaction_pull_interval_ms`: `5000`
/// (milliseconds, per the name).
pub fn iceberg_compaction_pull_interval_ms() -> u64 {
    const DEFAULT_INTERVAL_MS: u64 = 5 * 1000;
    DEFAULT_INTERVAL_MS
}
1172
/// Built-in default for `iceberg_compaction_target_binpack_group_size_mb`:
/// `Some(100 * 1024)`, i.e. 102_400 (MB, per the name).
pub fn iceberg_compaction_target_binpack_group_size_mb() -> Option<u64> {
    const DEFAULT_GROUP_SIZE_MB: u64 = 100 * 1024;
    Some(DEFAULT_GROUP_SIZE_MB)
}
1176
/// Built-in default for `iceberg_compaction_min_group_size_mb`: unset (`None`).
pub fn iceberg_compaction_min_group_size_mb() -> Option<u64> {
    Option::<u64>::None
}
1180
/// Built-in default for `iceberg_compaction_min_group_file_count`: unset (`None`).
pub fn iceberg_compaction_min_group_file_count() -> Option<usize> {
    Option::<usize>::None
}
1184 }
1185
1186 pub mod file_cache {
1187 use std::num::NonZeroUsize;
1188
1189 use foyer::{Compression, RecoverMode, RuntimeOptions, Throttle, TokioRuntimeOptions};
1190
1191 pub fn dir() -> String {
1192 "".to_owned()
1193 }
1194
1195 pub fn capacity_mb() -> usize {
1196 1024
1197 }
1198
1199 pub fn file_capacity_mb() -> usize {
1200 64
1201 }
1202
1203 pub fn flushers() -> usize {
1204 4
1205 }
1206
1207 pub fn reclaimers() -> usize {
1208 4
1209 }
1210
1211 pub fn recover_concurrency() -> usize {
1212 8
1213 }
1214
1215 pub fn insert_rate_limit_mb() -> usize {
1216 0
1217 }
1218
1219 pub fn indexer_shards() -> usize {
1220 64
1221 }
1222
1223 pub fn compression() -> Compression {
1224 Compression::None
1225 }
1226
1227 pub fn flush_buffer_threshold_mb() -> Option<usize> {
1228 None
1229 }
1230
1231 pub fn fifo_probation_ratio() -> f64 {
1232 0.1
1233 }
1234
1235 pub fn blob_index_size_kb() -> usize {
1236 16
1237 }
1238
1239 pub fn recover_mode() -> RecoverMode {
1240 RecoverMode::Quiet
1241 }
1242
1243 pub fn runtime_config() -> RuntimeOptions {
1244 RuntimeOptions::Unified(TokioRuntimeOptions::default())
1245 }
1246
1247 pub fn throttle() -> Throttle {
1248 Throttle::new()
1249 .with_iops_counter(foyer::IopsCounter::PerIoSize(
1250 NonZeroUsize::new(128 * 1024).unwrap(),
1251 ))
1252 .with_read_iops(100000)
1253 .with_write_iops(100000)
1254 .with_write_throughput(1024 * 1024 * 1024)
1255 .with_read_throughput(1024 * 1024 * 1024)
1256 }
1257 }
1258
/// Default values for the cache-refill settings.
pub mod cache_refill {
    /// Default `meta_refill_concurrency`: `0`.
    pub fn meta_refill_concurrency() -> usize {
        const DEFAULT_META_REFILL_CONCURRENCY: usize = 0;
        DEFAULT_META_REFILL_CONCURRENCY
    }

    /// Default `data_refill_levels`: an empty list.
    pub fn data_refill_levels() -> Vec<u32> {
        Vec::new()
    }

    /// Default `timeout_ms`: `6000` (milliseconds, per the name).
    pub fn timeout_ms() -> u64 {
        const DEFAULT_TIMEOUT_MS: u64 = 6 * 1000;
        DEFAULT_TIMEOUT_MS
    }

    /// Default `concurrency`: `10`.
    pub fn concurrency() -> usize {
        const DEFAULT_CONCURRENCY: usize = 10;
        DEFAULT_CONCURRENCY
    }

    /// Default `unit`: `64`.
    pub fn unit() -> usize {
        const DEFAULT_UNIT: usize = 64;
        DEFAULT_UNIT
    }

    /// Default `threshold`: `0.5`.
    pub fn threshold() -> f64 {
        const DEFAULT_THRESHOLD: f64 = 0.5;
        DEFAULT_THRESHOLD
    }

    /// Default `recent_filter_shards`: `16`.
    pub fn recent_filter_shards() -> usize {
        const DEFAULT_SHARDS: usize = 16;
        DEFAULT_SHARDS
    }

    /// Default `recent_filter_layers`: `6`.
    pub fn recent_filter_layers() -> usize {
        const DEFAULT_LAYERS: usize = 6;
        DEFAULT_LAYERS
    }

    /// Default `recent_filter_rotate_interval_ms`: `10000` (milliseconds, per the name).
    pub fn recent_filter_rotate_interval_ms() -> usize {
        const DEFAULT_ROTATE_INTERVAL_MS: usize = 10 * 1000;
        DEFAULT_ROTATE_INTERVAL_MS
    }

    /// Default `skip_recent_filter`: `false`.
    pub fn skip_recent_filter() -> bool {
        const SKIP_BY_DEFAULT: bool = false;
        SKIP_BY_DEFAULT
    }
}
1300
/// Default values for the object-store settings.
pub mod object_store_config {
    /// Number of milliseconds in one second; used to spell out the timeouts below.
    const SECOND_MS: u64 = 1_000;

    /// Shared default for the retry backoff interval (milliseconds, per the name).
    const DEFAULT_REQ_BACKOFF_INTERVAL_MS: u64 = SECOND_MS;
    /// Shared default for the maximum retry backoff delay (milliseconds, per the name).
    const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * SECOND_MS;
    /// Shared default returned by every `*_retry_attempts` function in this module.
    const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

    /// Default `set_atomic_write_dir`: `false`.
    pub fn set_atomic_write_dir() -> bool {
        const ENABLED_BY_DEFAULT: bool = false;
        ENABLED_BY_DEFAULT
    }

    /// Default `object_store_req_concurrency_limit`: `0`.
    ///
    /// NOTE(review): presumably `0` means "no limit"; confirm against the consumer.
    pub fn object_store_req_concurrency_limit() -> usize {
        const DEFAULT_LIMIT: usize = 0;
        DEFAULT_LIMIT
    }

    /// Default `http_concurrent_limit`: `0`.
    ///
    /// NOTE(review): presumably `0` means "no limit"; confirm against the consumer.
    pub fn http_concurrent_limit() -> usize {
        const DEFAULT_LIMIT: usize = 0;
        DEFAULT_LIMIT
    }

    /// Default `object_store_req_backoff_interval_ms`: `1000`.
    pub fn object_store_req_backoff_interval_ms() -> u64 {
        DEFAULT_REQ_BACKOFF_INTERVAL_MS
    }

    /// Default `object_store_req_backoff_max_delay_ms`: `10000`.
    pub fn object_store_req_backoff_max_delay_ms() -> u64 {
        DEFAULT_REQ_BACKOFF_MAX_DELAY_MS
    }

    /// Default `object_store_req_backoff_factor`: `2`.
    pub fn object_store_req_backoff_factor() -> u64 {
        const DEFAULT_BACKOFF_FACTOR: u64 = 2;
        DEFAULT_BACKOFF_FACTOR
    }

    /// Default `object_store_upload_attempt_timeout_ms`: `8000`.
    pub fn object_store_upload_attempt_timeout_ms() -> u64 {
        8 * SECOND_MS
    }

    /// Default `object_store_upload_retry_attempts`: `3`.
    pub fn object_store_upload_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_streaming_upload_attempt_timeout_ms`: `5000`.
    pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
        5 * SECOND_MS
    }

    /// Default `object_store_streaming_upload_retry_attempts`: `3`.
    pub fn object_store_streaming_upload_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_read_attempt_timeout_ms`: `8000`.
    pub fn object_store_read_attempt_timeout_ms() -> u64 {
        8 * SECOND_MS
    }

    /// Default `object_store_read_retry_attempts`: `3`.
    pub fn object_store_read_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_streaming_read_attempt_timeout_ms`: `3000`.
    pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
        3 * SECOND_MS
    }

    /// Default `object_store_streaming_read_retry_attempts`: `3`.
    pub fn object_store_streaming_read_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_metadata_attempt_timeout_ms`: `60000`.
    pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
        60 * SECOND_MS
    }

    /// Default `object_store_metadata_retry_attempts`: `3`.
    pub fn object_store_metadata_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_delete_attempt_timeout_ms`: `5000`.
    pub fn object_store_delete_attempt_timeout_ms() -> u64 {
        5 * SECOND_MS
    }

    /// Default `object_store_delete_retry_attempts`: `3`.
    pub fn object_store_delete_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_delete_objects_attempt_timeout_ms`: `5000`.
    pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
        5 * SECOND_MS
    }

    /// Default `object_store_delete_objects_retry_attempts`: `3`.
    pub fn object_store_delete_objects_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `object_store_list_attempt_timeout_ms`: `600000` (`10 * 60` seconds).
    pub fn object_store_list_attempt_timeout_ms() -> u64 {
        10 * 60 * SECOND_MS
    }

    /// Default `object_store_list_retry_attempts`: `3`.
    pub fn object_store_list_retry_attempts() -> usize {
        DEFAULT_REQ_MAX_RETRY_ATTEMPTS
    }

    /// Default `opendal_upload_concurrency`: `256`.
    pub fn opendal_upload_concurrency() -> usize {
        const DEFAULT_UPLOAD_CONCURRENCY: usize = 256;
        DEFAULT_UPLOAD_CONCURRENCY
    }

    /// Default `upload_part_size`: `16 << 20` (16_777_216).
    ///
    /// NOTE(review): presumably a byte count (16 MiB); the unit is not visible here.
    pub fn upload_part_size() -> usize {
        16 << 20
    }

    /// Default values for the S3-specific settings.
    pub mod s3 {
        const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

        const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000;

        /// Default `keepalive_ms`: `Some(600000)`.
        pub fn keepalive_ms() -> Option<u64> {
            Some(DEFAULT_KEEPALIVE_MS)
        }

        /// Default `recv_buffer_size`: `Some(2 * 1024 * 1024)`, i.e. 2_097_152.
        pub fn recv_buffer_size() -> Option<usize> {
            const DEFAULT_RECV_BUFFER_SIZE: usize = 2 * 1024 * 1024;
            Some(DEFAULT_RECV_BUFFER_SIZE)
        }

        /// Default `send_buffer_size`: unset (`None`).
        pub fn send_buffer_size() -> Option<usize> {
            Option::<usize>::None
        }

        /// Default `nodelay`: `Some(true)`.
        pub fn nodelay() -> Option<bool> {
            const NODELAY_BY_DEFAULT: bool = true;
            Some(NODELAY_BY_DEFAULT)
        }

        /// Default `identity_resolution_timeout_s`: `5` (seconds, per the name).
        pub fn identity_resolution_timeout_s() -> u64 {
            DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
        }

        /// Default values for the S3 developer settings.
        pub mod developer {
            /// Default `retry_unknown_service_error`: `false`.
            pub fn retry_unknown_service_error() -> bool {
                const RETRY_BY_DEFAULT: bool = false;
                RETRY_BY_DEFAULT
            }

            /// Default `retryable_service_error_codes`: `"SlowDown"` and
            /// `"TooManyRequests"`, in that order.
            pub fn retryable_service_error_codes() -> Vec<String> {
                ["SlowDown", "TooManyRequests"]
                    .into_iter()
                    .map(String::from)
                    .collect()
            }

            /// Default `use_opendal`: `true`.
            pub fn use_opendal() -> bool {
                const USE_OPENDAL_BY_DEFAULT: bool = true;
                USE_OPENDAL_BY_DEFAULT
            }
        }
    }
}
1446}