risingwave_common/config/
mod.rs1pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{
28 CheckpointCompression, CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig,
29 MetaStoreConfig,
30};
31pub mod streaming;
32pub use streaming::{AsyncStackTraceOption, StreamingConfig};
33pub mod server;
34pub use server::{HeapProfilingConfig, ServerConfig};
35pub mod udf;
36pub use udf::UdfConfig;
37pub mod storage;
38pub use storage::{
39 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
40 extract_storage_memory_config,
41};
42pub mod merge;
43pub mod mutate;
44pub mod none_as_empty_string;
45pub mod system;
46pub mod utils;
47
48use std::collections::BTreeMap;
49use std::fs;
50use std::num::NonZeroUsize;
51
52use anyhow::Context;
53use clap::ValueEnum;
54use educe::Educe;
55pub use merge::*;
56use risingwave_common_proc_macro::ConfigDoc;
57pub use risingwave_common_proc_macro::OverrideConfig;
58use risingwave_pb::meta::SystemParams;
59use serde::{Deserialize, Serialize, Serializer};
60use serde_default::DefaultFromSerde;
61use serde_json::Value;
62pub use system::SystemConfig;
63pub use utils::*;
64
65use crate::for_all_params;
66
67pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
70pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; #[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
77#[educe(Debug)]
78pub struct RwConfig {
79 #[serde(default)]
80 #[config_doc(nested)]
81 pub server: ServerConfig,
82
83 #[serde(default)]
84 #[config_doc(nested)]
85 pub meta: MetaConfig,
86
87 #[serde(default)]
88 #[config_doc(nested)]
89 pub batch: BatchConfig,
90
91 #[serde(default)]
92 #[config_doc(nested)]
93 pub frontend: FrontendConfig,
94
95 #[serde(default)]
96 #[config_doc(nested)]
97 pub streaming: StreamingConfig,
98
99 #[serde(default)]
100 #[config_doc(nested)]
101 pub storage: StorageConfig,
102
103 #[serde(default)]
104 #[educe(Debug(ignore))]
105 #[config_doc(nested)]
106 pub system: SystemConfig,
107
108 #[serde(default)]
109 #[config_doc(nested)]
110 pub udf: UdfConfig,
111
112 #[serde(flatten)]
113 #[config_doc(omitted)]
114 pub unrecognized: Unrecognized<Self>,
115}
116
117#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
124#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
125pub struct RpcClientConfig {
126 #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
127 pub connect_timeout_secs: u64,
128 #[serde(default = "default::developer::rpc_client_pool_setup_concurrency")]
131 pub pool_setup_concurrency: usize,
132}
133
134pub use risingwave_common_metrics::MetricLevel;
135
136impl RwConfig {
137 pub const fn default_connection_pool_size(&self) -> u16 {
138 self.server.connection_pool_size
139 }
140
141 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
144 self.streaming
145 .developer
146 .exchange_connection_pool_size
147 .unwrap_or_else(|| self.default_connection_pool_size())
148 }
149
150 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
153 self.batch
154 .developer
155 .exchange_connection_pool_size
156 .unwrap_or_else(|| self.default_connection_pool_size())
157 }
158}
159
/// Default-value providers for config fields.
///
/// Each function returns a fixed value and is meant to be referenced by path from a
/// `#[serde(default = "...")]` attribute, e.g.
/// `default::developer::rpc_client_connect_timeout_secs`.
pub mod default {

    /// Defaults for developer-facing knobs.
    pub mod developer {
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            1 << 27 // 128 MiB
        }

        pub fn batch_output_channel_size() -> usize {
            64
        }

        pub fn batch_receiver_channel_size() -> usize {
            1000
        }

        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        pub fn batch_chunk_size() -> usize {
            1024
        }

        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// `None` here makes `RwConfig::batch_exchange_connection_pool_size` fall back to the
        /// server-wide pool size.
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        pub fn connector_message_buffer_size() -> usize {
            16
        }

        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        pub fn stream_chunk_size() -> usize {
            256
        }

        pub fn stream_exchange_initial_permits() -> usize {
            2048
        }

        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        pub fn stream_dml_channel_initial_permits() -> usize {
            32_768
        }

        pub fn stream_max_barrier_batch_size() -> u32 {
            1024
        }

        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 << 20 // 64 MiB
        }

        pub fn enable_trivial_move() -> bool {
            true
        }

        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        pub fn max_get_task_probe_times() -> usize {
            5
        }

        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
            Some(10_000)
        }

        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1000
        }

        pub fn hummock_time_travel_delta_fetch_batch_size() -> usize {
            100
        }

        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1000
        }

        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1000
        }

        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1000
        }

        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        pub fn stream_enable_snapshot_backfill() -> bool {
            true
        }

        pub fn enable_shared_source() -> bool {
            true
        }

        pub fn stream_high_join_amplification_threshold() -> usize {
            2048
        }

        /// Unlike the batch counterpart this defaults to an explicit pool size of 1, so
        /// `RwConfig::streaming_exchange_connection_pool_size` does not fall back by default.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        pub fn enable_actor_tokio_metrics() -> bool {
            true
        }

        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            30_000
        }

        pub fn streaming_join_hash_map_evict_interval_rows() -> u32 {
            16
        }

        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
            10 * 60 // 10 minutes
        }

        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        pub fn rpc_client_pool_setup_concurrency() -> usize {
            0
        }

        pub fn iceberg_list_interval_sec() -> u64 {
            10
        }

        pub fn iceberg_fetch_batch_size() -> u64 {
            1024
        }

        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1024
        }

        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }

        pub fn materialize_force_overwrite_on_no_check() -> bool {
            false
        }

        pub fn refresh_scheduler_interval_sec() -> u64 {
            60
        }

        pub fn sync_log_store_pause_duration_ms() -> usize {
            64
        }

        pub fn sync_log_store_buffer_size() -> usize {
            2048
        }

        pub fn table_change_log_insert_batch_size() -> u64 {
            1000
        }

        pub fn table_change_log_delete_batch_size() -> u64 {
            1000
        }

        pub fn enable_state_table_vnode_stats_pruning() -> bool {
            false
        }

        pub fn enable_vnode_key_stats_for_materialize() -> bool {
            false
        }

        pub fn max_concurrent_kv_log_store_historical_read() -> usize {
            0
        }
    }
}
464
465pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
466pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
467pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
470pub mod tests {
471 use expect_test::expect;
472 use risingwave_license::LicenseKey;
473
474 use super::*;
475
476 fn default_config_for_docs() -> RwConfig {
477 let mut config = RwConfig::default();
478 config.system.license_key = Some(LicenseKey::empty());
480 config
481 }
482
483 #[test]
487 fn test_example_up_to_date() {
488 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
489# Check detailed comments in src/common/src/config.rs";
490
491 let actual = expect_test::expect_file!["../../../config/example.toml"];
492 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
493
494 let expected = format!("{HEADER}\n\n{default}");
495 actual.assert_eq(&expected);
496
497 let expected = rw_config_to_markdown();
498 let actual = expect_test::expect_file!["../../../config/docs.md"];
499 actual.assert_eq(&expected);
500 }
501
502 #[derive(Debug)]
503 struct ConfigItemDoc {
504 desc: String,
505 default: String,
506 }
507
508 fn rw_config_to_markdown() -> String {
509 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
510 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
511
512 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
514 .into_iter()
515 .map(|(k, v)| {
516 let docs: BTreeMap<String, ConfigItemDoc> = v
517 .into_iter()
518 .map(|(name, desc)| {
519 (
520 name,
521 ConfigItemDoc {
522 desc,
523 default: "".to_owned(), },
525 )
526 })
527 .collect();
528 (k, docs)
529 })
530 .collect();
531
532 let toml_doc: BTreeMap<String, toml::Value> =
533 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
534 toml_doc.into_iter().for_each(|(name, value)| {
535 set_default_values("".to_owned(), name, value, &mut configs);
536 });
537
538 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
539 + "This page is automatically generated by `./risedev generate-example-config`\n";
540 for (section, configs) in configs {
541 if configs.is_empty() {
542 continue;
543 }
544 markdown.push_str(&format!("\n## {}\n\n", section));
545 markdown.push_str("| Config | Description | Default |\n");
546 markdown.push_str("|--------|-------------|---------|\n");
547 for (config, doc) in configs {
548 markdown.push_str(&format!(
549 "| {} | {} | {} |\n",
550 config, doc.desc, doc.default
551 ));
552 }
553 }
554 markdown
555 }
556
557 fn set_default_values(
558 section: String,
559 name: String,
560 value: toml::Value,
561 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
562 ) {
563 if let toml::Value::Table(table) = value {
565 let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
566 let sub_section = if section.is_empty() {
567 name
568 } else {
569 format!("{}.{}", section, name)
570 };
571 section_configs
572 .into_iter()
573 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
574 } else if let Some(t) = configs.get_mut(§ion)
575 && let Some(item_doc) = t.get_mut(&name)
576 {
577 item_doc.default = format!("{}", value);
578 }
579 }
580
581 #[test]
582 fn test_object_store_configs_backward_compatibility() {
583 {
585 let config: RwConfig = toml::from_str(
586 r#"
587 [storage.object_store]
588 object_store_set_atomic_write_dir = true
589
590 [storage.object_store.s3]
591 object_store_keepalive_ms = 1
592 object_store_send_buffer_size = 1
593 object_store_recv_buffer_size = 1
594 object_store_nodelay = false
595
596 [storage.object_store.s3.developer]
597 object_store_retry_unknown_service_error = true
598 object_store_retryable_service_error_codes = ['dummy']
599
600
601 "#,
602 )
603 .unwrap();
604
605 assert!(config.storage.object_store.set_atomic_write_dir);
606 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
607 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
608 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
609 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
610 assert!(
611 config
612 .storage
613 .object_store
614 .s3
615 .developer
616 .retry_unknown_service_error
617 );
618 assert_eq!(
619 config
620 .storage
621 .object_store
622 .s3
623 .developer
624 .retryable_service_error_codes,
625 vec!["dummy".to_owned()]
626 );
627 }
628
629 {
631 let config: RwConfig = toml::from_str(
632 r#"
633 [storage.object_store]
634 set_atomic_write_dir = true
635
636 [storage.object_store.s3]
637 keepalive_ms = 1
638 send_buffer_size = 1
639 recv_buffer_size = 1
640 nodelay = false
641
642 [storage.object_store.s3.developer]
643 retry_unknown_service_error = true
644 retryable_service_error_codes = ['dummy']
645
646
647 "#,
648 )
649 .unwrap();
650
651 assert!(config.storage.object_store.set_atomic_write_dir);
652 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
653 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
654 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
655 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
656 assert!(
657 config
658 .storage
659 .object_store
660 .s3
661 .developer
662 .retry_unknown_service_error
663 );
664 assert_eq!(
665 config
666 .storage
667 .object_store
668 .s3
669 .developer
670 .retryable_service_error_codes,
671 vec!["dummy".to_owned()]
672 );
673 }
674 }
675
676 #[test]
677 fn test_meta_configs_backward_compatibility() {
678 {
680 let config: RwConfig = toml::from_str(
681 r#"
682 [meta]
683 periodic_split_compact_group_interval_sec = 1
684 table_write_throughput_threshold = 10
685 min_table_split_write_throughput = 5
686 "#,
687 )
688 .unwrap();
689
690 assert_eq!(
691 config
692 .meta
693 .periodic_scheduling_compaction_group_split_interval_sec,
694 1
695 );
696 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
697 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
698 }
699 }
700
701 #[test]
702 fn test_meta_max_normalize_splits_per_round_must_be_positive() {
703 let config = toml::from_str::<RwConfig>(
704 r#"
705 [meta]
706 max_normalize_splits_per_round = 0
707 "#,
708 )
709 .unwrap_err();
710
711 expect![[r#"
712 TOML parse error at line 3, column 46
713 |
714 3 | max_normalize_splits_per_round = 0
715 | ^
716 meta.max_normalize_splits_per_round must be greater than 0
717 "#]]
718 .assert_eq(&config.to_string());
719 }
720
721 #[test]
724 fn test_prefix_alias() {
725 let config: RwConfig = toml::from_str(
726 "
727 [streaming.developer]
728 stream_chunk_size = 114514
729
730 [streaming.developer.stream_compute_client_config]
731 connect_timeout_secs = 42
732 pool_setup_concurrency = 10
733 ",
734 )
735 .unwrap();
736
737 assert_eq!(config.streaming.developer.chunk_size, 114514);
738 assert_eq!(
739 config
740 .streaming
741 .developer
742 .compute_client_config
743 .connect_timeout_secs,
744 42
745 );
746 assert_eq!(
747 config
748 .streaming
749 .developer
750 .compute_client_config
751 .pool_setup_concurrency,
752 10
753 );
754 }
755
756 #[test]
757 fn test_prefix_alias_duplicate() {
758 let config = toml::from_str::<RwConfig>(
759 "
760 [streaming.developer]
761 stream_chunk_size = 114514
762 chunk_size = 1919810
763 ",
764 )
765 .unwrap_err();
766
767 expect![[r#"
768 TOML parse error at line 2, column 13
769 |
770 2 | [streaming.developer]
771 | ^^^^^^^^^^^^^^^^^^^^^
772 duplicate field `chunk_size`
773 "#]]
774 .assert_eq(&config.to_string());
775
776 let config = toml::from_str::<RwConfig>(
777 "
778 [streaming.developer.stream_compute_client_config]
779 connect_timeout_secs = 5
780
781 [streaming.developer.compute_client_config]
782 connect_timeout_secs = 10
783 ",
784 )
785 .unwrap_err();
786
787 expect![[r#"
788 TOML parse error at line 2, column 24
789 |
790 2 | [streaming.developer.stream_compute_client_config]
791 | ^^^^^^^^^
792 duplicate field `compute_client_config`
793 "#]]
794 .assert_eq(&config.to_string());
795 }
796
797 #[test]
798 fn test_storage_max_prefetch_block_number_must_be_positive() {
799 let config = toml::from_str::<RwConfig>(
800 r#"
801 [storage]
802 max_prefetch_block_number = 0
803 "#,
804 )
805 .unwrap_err();
806
807 expect![[r#"
808 TOML parse error at line 3, column 41
809 |
810 3 | max_prefetch_block_number = 0
811 | ^
812 storage.max_prefetch_block_number must be greater than 0
813 "#]]
814 .assert_eq(&config.to_string());
815 }
816
817 #[test]
818 fn test_storage_iceberg_compaction_pull_interval_ms_must_be_positive() {
819 let config = toml::from_str::<RwConfig>(
820 r#"
821 [storage]
822 iceberg_compaction_pull_interval_ms = 0
823 "#,
824 )
825 .unwrap_err();
826
827 expect![[r#"
828 TOML parse error at line 3, column 51
829 |
830 3 | iceberg_compaction_pull_interval_ms = 0
831 | ^
832 storage.iceberg_compaction_pull_interval_ms must be greater than 0
833 "#]]
834 .assert_eq(&config.to_string());
835 }
836}