risingwave_common/config/
mod.rs1pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{
28 CheckpointCompression, CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig,
29 MetaStoreConfig,
30};
31pub mod streaming;
32pub use streaming::{AsyncStackTraceOption, StreamingConfig};
33pub mod server;
34pub use server::{HeapProfilingConfig, ServerConfig};
35
36pub use crate::session_config::SessionInitConfig;
37pub mod udf;
38pub use udf::UdfConfig;
39pub mod storage;
40pub use storage::{
41 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
42 extract_storage_memory_config,
43};
44pub mod merge;
45pub mod mutate;
46pub mod none_as_empty_string;
47pub mod system;
48pub mod utils;
49
50use std::collections::BTreeMap;
51use std::fs;
52use std::num::NonZeroUsize;
53
54use anyhow::Context;
55use clap::ValueEnum;
56use educe::Educe;
57pub use merge::*;
58use risingwave_common_proc_macro::ConfigDoc;
59pub use risingwave_common_proc_macro::OverrideConfig;
60use risingwave_pb::meta::SystemParams;
61use serde::{Deserialize, Serialize, Serializer};
62use serde_default::DefaultFromSerde;
63use serde_json::Value;
64pub use system::SystemConfig;
65pub use utils::*;
66
67use crate::for_all_params;
68
69pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
72pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; #[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
79#[educe(Debug)]
80pub struct RwConfig {
81 #[serde(default)]
82 #[config_doc(nested)]
83 pub server: ServerConfig,
84
85 #[serde(default)]
86 #[config_doc(nested)]
87 pub meta: MetaConfig,
88
89 #[serde(default)]
90 #[config_doc(nested)]
91 pub batch: BatchConfig,
92
93 #[serde(default)]
94 #[config_doc(nested)]
95 pub frontend: FrontendConfig,
96
97 #[serde(default)]
98 #[config_doc(nested)]
99 pub streaming: StreamingConfig,
100
101 #[serde(default)]
102 #[config_doc(nested)]
103 pub storage: StorageConfig,
104
105 #[serde(default)]
106 #[educe(Debug(ignore))]
107 #[config_doc(nested)]
108 pub system: SystemConfig,
109
110 #[serde(default)]
111 #[config_doc(nested)]
112 pub udf: UdfConfig,
113
114 #[serde(default)]
115 #[config_doc(nested)]
116 pub session_init: SessionInitConfig,
117
118 #[serde(flatten)]
119 #[config_doc(omitted)]
120 pub unrecognized: Unrecognized<Self>,
121}
122
123#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
130#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
131pub struct RpcClientConfig {
132 #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
133 pub connect_timeout_secs: u64,
134 #[serde(default = "default::developer::rpc_client_pool_setup_concurrency")]
137 pub pool_setup_concurrency: usize,
138}
139
140pub use risingwave_common_metrics::MetricLevel;
141
142impl RwConfig {
143 pub const fn default_connection_pool_size(&self) -> u16 {
144 self.server.connection_pool_size
145 }
146
147 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
150 self.streaming
151 .developer
152 .exchange_connection_pool_size
153 .unwrap_or_else(|| self.default_connection_pool_size())
154 }
155
156 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
159 self.batch
160 .developer
161 .exchange_connection_pool_size
162 .unwrap_or_else(|| self.default_connection_pool_size())
163 }
164}
165
166pub mod default {
167
168 pub mod developer {
169 pub fn meta_cached_traces_num() -> u32 {
170 256
171 }
172
173 pub fn meta_cached_traces_memory_limit_bytes() -> usize {
174 1 << 27 }
176
177 pub fn batch_output_channel_size() -> usize {
178 64
179 }
180
181 pub fn batch_receiver_channel_size() -> usize {
182 1000
183 }
184
185 pub fn batch_root_stage_channel_size() -> usize {
186 100
187 }
188
189 pub fn batch_chunk_size() -> usize {
190 1024
191 }
192
193 pub fn batch_local_execute_buffer_size() -> usize {
194 64
195 }
196
197 pub fn batch_exchange_connection_pool_size() -> Option<u16> {
200 None
201 }
202
203 pub fn stream_enable_executor_row_count() -> bool {
204 false
205 }
206
207 pub fn connector_message_buffer_size() -> usize {
208 16
209 }
210
211 pub fn unsafe_stream_extreme_cache_size() -> usize {
212 10
213 }
214
215 pub fn stream_topn_cache_min_capacity() -> usize {
216 10
217 }
218
219 pub fn stream_chunk_size() -> usize {
220 256
221 }
222
223 pub fn stream_exchange_initial_permits() -> usize {
224 2048
225 }
226
227 pub fn stream_exchange_batched_permits() -> usize {
228 256
229 }
230
231 pub fn stream_exchange_concurrent_barriers() -> usize {
232 1
233 }
234
235 pub fn stream_exchange_concurrent_dispatchers() -> usize {
236 0
237 }
238
239 pub fn stream_dml_channel_initial_permits() -> usize {
240 32768
241 }
242
243 pub fn stream_max_barrier_batch_size() -> u32 {
244 1024
245 }
246
247 pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
248 64 << 20 }
250
251 pub fn enable_trivial_move() -> bool {
252 true
253 }
254
255 pub fn enable_check_task_level_overlap() -> bool {
256 false
257 }
258
259 pub fn max_trivial_move_task_count_per_loop() -> usize {
260 256
261 }
262
263 pub fn max_get_task_probe_times() -> usize {
264 5
265 }
266
267 pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
268 100
269 }
270
271 pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
272 400
273 }
274
275 pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
276 10_000
277 }
278
279 pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
280 100
281 }
282
283 pub fn time_travel_vacuum_interval_sec() -> u64 {
284 30
285 }
286
287 pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
288 Some(10000)
289 }
290
291 pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
292 1000
293 }
294
295 pub fn hummock_time_travel_delta_fetch_batch_size() -> usize {
296 100
297 }
298
299 pub fn hummock_gc_history_insert_batch_size() -> usize {
300 1000
301 }
302
303 pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
304 1000
305 }
306
307 pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
308 false
309 }
310
311 pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
312 10
313 }
314
315 pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
316 1000
317 }
318
319 pub fn memory_controller_threshold_aggressive() -> f64 {
320 0.9
321 }
322
323 pub fn memory_controller_threshold_graceful() -> f64 {
324 0.81
325 }
326
327 pub fn memory_controller_threshold_stable() -> f64 {
328 0.72
329 }
330
331 pub fn memory_controller_eviction_factor_aggressive() -> f64 {
332 2.0
333 }
334
335 pub fn memory_controller_eviction_factor_graceful() -> f64 {
336 1.5
337 }
338
339 pub fn memory_controller_eviction_factor_stable() -> f64 {
340 1.0
341 }
342
343 pub fn memory_controller_update_interval_ms() -> usize {
344 100
345 }
346
347 pub fn memory_controller_sequence_tls_step() -> u64 {
348 128
349 }
350
351 pub fn memory_controller_sequence_tls_lag() -> u64 {
352 32
353 }
354
355 pub fn stream_enable_arrangement_backfill() -> bool {
356 true
357 }
358
359 pub fn stream_enable_snapshot_backfill() -> bool {
360 true
361 }
362
363 pub fn enable_shared_source() -> bool {
364 true
365 }
366
367 pub fn stream_high_join_amplification_threshold() -> usize {
368 2048
369 }
370
371 pub fn stream_high_gap_fill_amplification_threshold() -> usize {
372 2048
373 }
374
375 pub fn stream_exchange_connection_pool_size() -> Option<u16> {
377 Some(1)
378 }
379
380 pub fn enable_actor_tokio_metrics() -> bool {
381 true
382 }
383
384 pub fn stream_enable_auto_schema_change() -> bool {
385 true
386 }
387
388 pub fn switch_jdbc_pg_to_native() -> bool {
389 false
390 }
391
392 pub fn streaming_hash_join_entry_state_max_rows() -> usize {
393 30000
395 }
396
397 pub fn streaming_join_hash_map_evict_interval_rows() -> u32 {
398 16
399 }
400
401 pub fn streaming_now_progress_ratio() -> Option<f32> {
402 None
403 }
404
405 pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
406 10 * 60
407 }
408
409 pub fn enable_explain_analyze_stats() -> bool {
410 true
411 }
412
413 pub fn rpc_client_connect_timeout_secs() -> u64 {
414 5
415 }
416
417 pub fn rpc_client_pool_setup_concurrency() -> usize {
418 0
419 }
420
421 pub fn iceberg_list_interval_sec() -> u64 {
422 10
423 }
424
425 pub fn iceberg_fetch_batch_size() -> u64 {
426 1024
427 }
428
429 pub fn iceberg_sink_positional_delete_cache_size() -> usize {
430 1024
431 }
432
433 pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
434 100_000
435 }
436
437 pub fn materialize_force_overwrite_on_no_check() -> bool {
438 false
439 }
440
441 pub fn refresh_scheduler_interval_sec() -> u64 {
442 60
443 }
444
445 pub fn sync_log_store_pause_duration_ms() -> usize {
446 64
447 }
448
449 pub fn sync_log_store_buffer_size() -> usize {
450 2048
451 }
452
453 pub fn disable_sync_log_store_dispatcher() -> bool {
454 false
455 }
456
457 pub fn table_change_log_insert_batch_size() -> u64 {
458 1000
459 }
460
461 pub fn table_change_log_delete_batch_size() -> u64 {
462 1000
463 }
464
465 pub fn enable_state_table_vnode_stats_pruning() -> bool {
466 false
467 }
468
469 pub fn enable_vnode_key_stats_for_materialize() -> bool {
470 false
471 }
472
473 pub fn max_concurrent_kv_log_store_historical_read() -> usize {
474 0
475 }
476 }
477}
478
479pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
480pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
481pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
484pub mod tests {
485 use expect_test::expect;
486 use risingwave_license::LicenseKey;
487
488 use super::*;
489
490 fn default_config_for_docs() -> RwConfig {
491 let mut config = RwConfig::default();
492 config.system.license_key = Some(LicenseKey::empty());
494 config.frontend.unsafe_enable_local_fs_connector = false;
496 config
497 }
498
499 #[test]
503 fn test_example_up_to_date() {
504 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
505# Check detailed comments in src/common/src/config.rs";
506
507 let actual = expect_test::expect_file!["../../../config/example.toml"];
508 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
509
510 let expected = format!("{HEADER}\n\n{default}");
511 actual.assert_eq(&expected);
512
513 let expected = rw_config_to_markdown();
514 let actual = expect_test::expect_file!["../../../config/docs.md"];
515 actual.assert_eq(&expected);
516 }
517
518 #[test]
519 fn test_session_init_entries_distinguishes_omitted_from_default() {
520 let config: RwConfig = toml::from_str(
521 r#"
522 [session_init]
523 streaming_parallelism = "bounded(8)"
524 streaming_parallelism_for_table = "default"
525 "#,
526 )
527 .unwrap();
528
529 assert_eq!(
531 config.session_init.streaming_parallelism.as_deref(),
532 Some("bounded(8)")
533 );
534 assert_eq!(
535 config
536 .session_init
537 .streaming_parallelism_for_table
538 .as_deref(),
539 Some("default")
540 );
541 assert_eq!(config.session_init.streaming_parallelism_for_sink, None);
542
543 assert_eq!(
545 config.session_init.entries(),
546 vec![
547 ("streaming_parallelism", "bounded(8)"),
548 ("streaming_parallelism_for_table", "default"),
549 ]
550 );
551 }
552
553 #[test]
554 fn test_session_init_default_is_empty() {
555 assert!(SessionInitConfig::default().entries().is_empty());
556 }
557
558 #[test]
559 fn test_session_init_rejects_unrecognized_key() {
560 let err = toml::from_str::<RwConfig>(
561 r#"
562 [session_init]
563 streaming_parallelism = "bounded(8)"
564 not_a_real_param = "oops"
565 "#,
566 )
567 .unwrap_err();
568
569 assert!(err.to_string().contains("unknown field `not_a_real_param`"));
570 }
571
572 #[derive(Debug)]
573 struct ConfigItemDoc {
574 desc: String,
575 default: String,
576 }
577
578 fn rw_config_to_markdown() -> String {
579 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
580 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
581
582 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
584 .into_iter()
585 .map(|(k, v)| {
586 let docs: BTreeMap<String, ConfigItemDoc> = v
587 .into_iter()
588 .map(|(name, desc)| {
589 (
590 name,
591 ConfigItemDoc {
592 desc,
593 default: "".to_owned(), },
595 )
596 })
597 .collect();
598 (k, docs)
599 })
600 .collect();
601
602 let toml_doc: BTreeMap<String, toml::Value> =
603 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
604 toml_doc.into_iter().for_each(|(name, value)| {
605 set_default_values("".to_owned(), name, value, &mut configs);
606 });
607
608 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
609 + "This page is automatically generated by `./risedev generate-example-config`\n";
610 for (section, configs) in configs {
611 if configs.is_empty() {
612 continue;
613 }
614 markdown.push_str(&format!("\n## {}\n\n", section));
615 markdown.push_str("| Config | Description | Default |\n");
616 markdown.push_str("|--------|-------------|---------|\n");
617 for (config, doc) in configs {
618 markdown.push_str(&format!(
619 "| {} | {} | {} |\n",
620 config, doc.desc, doc.default
621 ));
622 }
623 }
624 markdown
625 }
626
627 fn set_default_values(
628 section: String,
629 name: String,
630 value: toml::Value,
631 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
632 ) {
633 if let toml::Value::Table(table) = value {
635 let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
636 let sub_section = if section.is_empty() {
637 name
638 } else {
639 format!("{}.{}", section, name)
640 };
641 section_configs
642 .into_iter()
643 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
644 } else if let Some(t) = configs.get_mut(§ion)
645 && let Some(item_doc) = t.get_mut(&name)
646 {
647 item_doc.default = format!("{}", value);
648 }
649 }
650
651 #[test]
652 fn test_object_store_configs_backward_compatibility() {
653 {
655 let config: RwConfig = toml::from_str(
656 r#"
657 [storage.object_store]
658 object_store_set_atomic_write_dir = true
659
660 [storage.object_store.s3]
661 object_store_keepalive_ms = 1
662 object_store_send_buffer_size = 1
663 object_store_recv_buffer_size = 1
664 object_store_nodelay = false
665
666 [storage.object_store.s3.developer]
667 object_store_retry_unknown_service_error = true
668 object_store_retryable_service_error_codes = ['dummy']
669
670
671 "#,
672 )
673 .unwrap();
674
675 assert!(config.storage.object_store.set_atomic_write_dir);
676 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
677 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
678 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
679 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
680 assert!(
681 config
682 .storage
683 .object_store
684 .s3
685 .developer
686 .retry_unknown_service_error
687 );
688 assert_eq!(
689 config
690 .storage
691 .object_store
692 .s3
693 .developer
694 .retryable_service_error_codes,
695 vec!["dummy".to_owned()]
696 );
697 }
698
699 {
701 let config: RwConfig = toml::from_str(
702 r#"
703 [storage.object_store]
704 set_atomic_write_dir = true
705
706 [storage.object_store.s3]
707 keepalive_ms = 1
708 send_buffer_size = 1
709 recv_buffer_size = 1
710 nodelay = false
711
712 [storage.object_store.s3.developer]
713 retry_unknown_service_error = true
714 retryable_service_error_codes = ['dummy']
715
716
717 "#,
718 )
719 .unwrap();
720
721 assert!(config.storage.object_store.set_atomic_write_dir);
722 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
723 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
724 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
725 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
726 assert!(
727 config
728 .storage
729 .object_store
730 .s3
731 .developer
732 .retry_unknown_service_error
733 );
734 assert_eq!(
735 config
736 .storage
737 .object_store
738 .s3
739 .developer
740 .retryable_service_error_codes,
741 vec!["dummy".to_owned()]
742 );
743 }
744 }
745
746 #[test]
747 fn test_meta_configs_backward_compatibility() {
748 {
750 let config: RwConfig = toml::from_str(
751 r#"
752 [meta]
753 periodic_split_compact_group_interval_sec = 1
754 table_write_throughput_threshold = 10
755 min_table_split_write_throughput = 5
756 "#,
757 )
758 .unwrap();
759
760 assert_eq!(
761 config
762 .meta
763 .periodic_scheduling_compaction_group_split_interval_sec,
764 1
765 );
766 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
767 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
768 }
769 }
770
771 #[test]
772 fn test_meta_max_normalize_splits_per_round_must_be_positive() {
773 let config = toml::from_str::<RwConfig>(
774 r#"
775 [meta]
776 max_normalize_splits_per_round = 0
777 "#,
778 )
779 .unwrap_err();
780
781 expect![[r#"
782 TOML parse error at line 3, column 46
783 |
784 3 | max_normalize_splits_per_round = 0
785 | ^
786 meta.max_normalize_splits_per_round must be greater than 0
787 "#]]
788 .assert_eq(&config.to_string());
789 }
790
791 #[test]
794 fn test_prefix_alias() {
795 let config: RwConfig = toml::from_str(
796 "
797 [streaming.developer]
798 stream_chunk_size = 114514
799
800 [streaming.developer.stream_compute_client_config]
801 connect_timeout_secs = 42
802 pool_setup_concurrency = 10
803 ",
804 )
805 .unwrap();
806
807 assert_eq!(config.streaming.developer.chunk_size, 114514);
808 assert_eq!(
809 config
810 .streaming
811 .developer
812 .compute_client_config
813 .connect_timeout_secs,
814 42
815 );
816 assert_eq!(
817 config
818 .streaming
819 .developer
820 .compute_client_config
821 .pool_setup_concurrency,
822 10
823 );
824 }
825
826 #[test]
827 fn test_prefix_alias_duplicate() {
828 let config = toml::from_str::<RwConfig>(
829 "
830 [streaming.developer]
831 stream_chunk_size = 114514
832 chunk_size = 1919810
833 ",
834 )
835 .unwrap_err();
836
837 expect![[r#"
838 TOML parse error at line 2, column 13
839 |
840 2 | [streaming.developer]
841 | ^^^^^^^^^^^^^^^^^^^^^
842 duplicate field `chunk_size`
843 "#]]
844 .assert_eq(&config.to_string());
845
846 let config = toml::from_str::<RwConfig>(
847 "
848 [streaming.developer.stream_compute_client_config]
849 connect_timeout_secs = 5
850
851 [streaming.developer.compute_client_config]
852 connect_timeout_secs = 10
853 ",
854 )
855 .unwrap_err();
856
857 expect![[r#"
858 TOML parse error at line 2, column 24
859 |
860 2 | [streaming.developer.stream_compute_client_config]
861 | ^^^^^^^^^
862 duplicate field `compute_client_config`
863 "#]]
864 .assert_eq(&config.to_string());
865 }
866
867 #[test]
868 fn test_storage_max_prefetch_block_number_must_be_positive() {
869 let config = toml::from_str::<RwConfig>(
870 r#"
871 [storage]
872 max_prefetch_block_number = 0
873 "#,
874 )
875 .unwrap_err();
876
877 expect![[r#"
878 TOML parse error at line 3, column 41
879 |
880 3 | max_prefetch_block_number = 0
881 | ^
882 storage.max_prefetch_block_number must be greater than 0
883 "#]]
884 .assert_eq(&config.to_string());
885 }
886
887 #[test]
888 fn test_iceberg_compaction_enable_prefetch_default_is_false() {
889 let config = StorageConfig::default();
890 assert!(
891 !config.iceberg_compaction_enable_prefetch,
892 "enable_prefetch must default to false to avoid unexpected memory usage in existing deployments"
893 );
894 }
895
896 #[test]
897 fn test_storage_iceberg_compaction_pull_interval_ms_must_be_positive() {
898 let config = toml::from_str::<RwConfig>(
899 r#"
900 [storage]
901 iceberg_compaction_pull_interval_ms = 0
902 "#,
903 )
904 .unwrap_err();
905
906 expect![[r#"
907 TOML parse error at line 3, column 51
908 |
909 3 | iceberg_compaction_pull_interval_ms = 0
910 | ^
911 storage.iceberg_compaction_pull_interval_ms must be greater than 0
912 "#]]
913 .assert_eq(&config.to_string());
914 }
915}