risingwave_common/config/
mod.rs1pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{
28 CheckpointCompression, CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig,
29 MetaStoreConfig,
30};
31pub mod streaming;
32pub use streaming::{AsyncStackTraceOption, StreamingConfig};
33pub mod server;
34pub use server::{HeapProfilingConfig, ServerConfig};
35pub mod udf;
36pub use udf::UdfConfig;
37pub mod storage;
38pub use storage::{
39 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
40 extract_storage_memory_config,
41};
42pub mod merge;
43pub mod mutate;
44pub mod none_as_empty_string;
45pub mod system;
46pub mod utils;
47
48use std::collections::BTreeMap;
49use std::fs;
50use std::num::NonZeroUsize;
51
52use anyhow::Context;
53use clap::ValueEnum;
54use educe::Educe;
55pub use merge::*;
56use risingwave_common_proc_macro::ConfigDoc;
57pub use risingwave_common_proc_macro::OverrideConfig;
58use risingwave_pb::meta::SystemParams;
59use serde::{Deserialize, Serialize, Serializer};
60use serde_default::DefaultFromSerde;
61use serde_json::Value;
62pub use system::SystemConfig;
63pub use utils::*;
64
65use crate::for_all_params;
66
67pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
70pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; #[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
77#[educe(Debug)]
78pub struct RwConfig {
79 #[serde(default)]
80 #[config_doc(nested)]
81 pub server: ServerConfig,
82
83 #[serde(default)]
84 #[config_doc(nested)]
85 pub meta: MetaConfig,
86
87 #[serde(default)]
88 #[config_doc(nested)]
89 pub batch: BatchConfig,
90
91 #[serde(default)]
92 #[config_doc(nested)]
93 pub frontend: FrontendConfig,
94
95 #[serde(default)]
96 #[config_doc(nested)]
97 pub streaming: StreamingConfig,
98
99 #[serde(default)]
100 #[config_doc(nested)]
101 pub storage: StorageConfig,
102
103 #[serde(default)]
104 #[educe(Debug(ignore))]
105 #[config_doc(nested)]
106 pub system: SystemConfig,
107
108 #[serde(default)]
109 #[config_doc(nested)]
110 pub udf: UdfConfig,
111
112 #[serde(flatten)]
113 #[config_doc(omitted)]
114 pub unrecognized: Unrecognized<Self>,
115}
116
117#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
124#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
125pub struct RpcClientConfig {
126 #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
127 pub connect_timeout_secs: u64,
128}
129
130pub use risingwave_common_metrics::MetricLevel;
131
132impl RwConfig {
133 pub const fn default_connection_pool_size(&self) -> u16 {
134 self.server.connection_pool_size
135 }
136
137 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
140 self.streaming
141 .developer
142 .exchange_connection_pool_size
143 .unwrap_or_else(|| self.default_connection_pool_size())
144 }
145
146 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
149 self.batch
150 .developer
151 .exchange_connection_pool_size
152 .unwrap_or_else(|| self.default_connection_pool_size())
153 }
154}
155
156pub mod default {
157
158 pub mod developer {
159 pub fn meta_cached_traces_num() -> u32 {
160 256
161 }
162
163 pub fn meta_cached_traces_memory_limit_bytes() -> usize {
164 1 << 27 }
166
167 pub fn batch_output_channel_size() -> usize {
168 64
169 }
170
171 pub fn batch_receiver_channel_size() -> usize {
172 1000
173 }
174
175 pub fn batch_root_stage_channel_size() -> usize {
176 100
177 }
178
179 pub fn batch_chunk_size() -> usize {
180 1024
181 }
182
183 pub fn batch_local_execute_buffer_size() -> usize {
184 64
185 }
186
187 pub fn batch_exchange_connection_pool_size() -> Option<u16> {
190 None
191 }
192
193 pub fn stream_enable_executor_row_count() -> bool {
194 false
195 }
196
197 pub fn connector_message_buffer_size() -> usize {
198 16
199 }
200
201 pub fn unsafe_stream_extreme_cache_size() -> usize {
202 10
203 }
204
205 pub fn stream_topn_cache_min_capacity() -> usize {
206 10
207 }
208
209 pub fn stream_chunk_size() -> usize {
210 256
211 }
212
213 pub fn stream_exchange_initial_permits() -> usize {
214 2048
215 }
216
217 pub fn stream_exchange_batched_permits() -> usize {
218 256
219 }
220
221 pub fn stream_exchange_concurrent_barriers() -> usize {
222 1
223 }
224
225 pub fn stream_exchange_concurrent_dispatchers() -> usize {
226 0
227 }
228
229 pub fn stream_dml_channel_initial_permits() -> usize {
230 32768
231 }
232
233 pub fn stream_max_barrier_batch_size() -> u32 {
234 1024
235 }
236
237 pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
238 64 << 20 }
240
241 pub fn enable_trivial_move() -> bool {
242 true
243 }
244
245 pub fn enable_check_task_level_overlap() -> bool {
246 false
247 }
248
249 pub fn max_trivial_move_task_count_per_loop() -> usize {
250 256
251 }
252
253 pub fn max_get_task_probe_times() -> usize {
254 5
255 }
256
257 pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
258 100
259 }
260
261 pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
262 400
263 }
264
265 pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
266 10_000
267 }
268
269 pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
270 100
271 }
272
273 pub fn time_travel_vacuum_interval_sec() -> u64 {
274 30
275 }
276
277 pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
278 Some(10000)
279 }
280
281 pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
282 1000
283 }
284
285 pub fn hummock_gc_history_insert_batch_size() -> usize {
286 1000
287 }
288
289 pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
290 1000
291 }
292
293 pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
294 false
295 }
296
297 pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
298 10
299 }
300
301 pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
302 1000
303 }
304
305 pub fn memory_controller_threshold_aggressive() -> f64 {
306 0.9
307 }
308
309 pub fn memory_controller_threshold_graceful() -> f64 {
310 0.81
311 }
312
313 pub fn memory_controller_threshold_stable() -> f64 {
314 0.72
315 }
316
317 pub fn memory_controller_eviction_factor_aggressive() -> f64 {
318 2.0
319 }
320
321 pub fn memory_controller_eviction_factor_graceful() -> f64 {
322 1.5
323 }
324
325 pub fn memory_controller_eviction_factor_stable() -> f64 {
326 1.0
327 }
328
329 pub fn memory_controller_update_interval_ms() -> usize {
330 100
331 }
332
333 pub fn memory_controller_sequence_tls_step() -> u64 {
334 128
335 }
336
337 pub fn memory_controller_sequence_tls_lag() -> u64 {
338 32
339 }
340
341 pub fn stream_enable_arrangement_backfill() -> bool {
342 true
343 }
344
345 pub fn stream_enable_snapshot_backfill() -> bool {
346 true
347 }
348
349 pub fn enable_shared_source() -> bool {
350 true
351 }
352
353 pub fn stream_high_join_amplification_threshold() -> usize {
354 2048
355 }
356
357 pub fn stream_exchange_connection_pool_size() -> Option<u16> {
359 Some(1)
360 }
361
362 pub fn enable_actor_tokio_metrics() -> bool {
363 true
364 }
365
366 pub fn stream_enable_auto_schema_change() -> bool {
367 true
368 }
369
370 pub fn switch_jdbc_pg_to_native() -> bool {
371 false
372 }
373
374 pub fn streaming_hash_join_entry_state_max_rows() -> usize {
375 30000
377 }
378
379 pub fn streaming_now_progress_ratio() -> Option<f32> {
380 None
381 }
382
383 pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
384 10 * 60
385 }
386
387 pub fn enable_explain_analyze_stats() -> bool {
388 true
389 }
390
391 pub fn rpc_client_connect_timeout_secs() -> u64 {
392 5
393 }
394
395 pub fn iceberg_list_interval_sec() -> u64 {
396 10
397 }
398
399 pub fn iceberg_fetch_batch_size() -> u64 {
400 1024
401 }
402
403 pub fn iceberg_sink_positional_delete_cache_size() -> usize {
404 1024
405 }
406
407 pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
408 100_000
409 }
410
411 pub fn materialize_force_overwrite_on_no_check() -> bool {
412 false
413 }
414
415 pub fn refresh_scheduler_interval_sec() -> u64 {
416 60
417 }
418
419 pub fn sync_log_store_pause_duration_ms() -> usize {
420 64
421 }
422
423 pub fn sync_log_store_buffer_size() -> usize {
424 2048
425 }
426
427 pub fn enable_state_table_vnode_stats_pruning() -> bool {
428 false
429 }
430 }
431}
432
433pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
434pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
435pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
438pub mod tests {
439 use expect_test::expect;
440 use risingwave_license::LicenseKey;
441
442 use super::*;
443
444 fn default_config_for_docs() -> RwConfig {
445 let mut config = RwConfig::default();
446 config.system.license_key = Some(LicenseKey::empty());
448 config
449 }
450
451 #[test]
455 fn test_example_up_to_date() {
456 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
457# Check detailed comments in src/common/src/config.rs";
458
459 let actual = expect_test::expect_file!["../../../config/example.toml"];
460 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
461
462 let expected = format!("{HEADER}\n\n{default}");
463 actual.assert_eq(&expected);
464
465 let expected = rw_config_to_markdown();
466 let actual = expect_test::expect_file!["../../../config/docs.md"];
467 actual.assert_eq(&expected);
468 }
469
470 #[derive(Debug)]
471 struct ConfigItemDoc {
472 desc: String,
473 default: String,
474 }
475
476 fn rw_config_to_markdown() -> String {
477 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
478 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
479
480 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
482 .into_iter()
483 .map(|(k, v)| {
484 let docs: BTreeMap<String, ConfigItemDoc> = v
485 .into_iter()
486 .map(|(name, desc)| {
487 (
488 name,
489 ConfigItemDoc {
490 desc,
491 default: "".to_owned(), },
493 )
494 })
495 .collect();
496 (k, docs)
497 })
498 .collect();
499
500 let toml_doc: BTreeMap<String, toml::Value> =
501 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
502 toml_doc.into_iter().for_each(|(name, value)| {
503 set_default_values("".to_owned(), name, value, &mut configs);
504 });
505
506 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
507 + "This page is automatically generated by `./risedev generate-example-config`\n";
508 for (section, configs) in configs {
509 if configs.is_empty() {
510 continue;
511 }
512 markdown.push_str(&format!("\n## {}\n\n", section));
513 markdown.push_str("| Config | Description | Default |\n");
514 markdown.push_str("|--------|-------------|---------|\n");
515 for (config, doc) in configs {
516 markdown.push_str(&format!(
517 "| {} | {} | {} |\n",
518 config, doc.desc, doc.default
519 ));
520 }
521 }
522 markdown
523 }
524
525 fn set_default_values(
526 section: String,
527 name: String,
528 value: toml::Value,
529 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
530 ) {
531 if let toml::Value::Table(table) = value {
533 let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
534 let sub_section = if section.is_empty() {
535 name
536 } else {
537 format!("{}.{}", section, name)
538 };
539 section_configs
540 .into_iter()
541 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
542 } else if let Some(t) = configs.get_mut(§ion)
543 && let Some(item_doc) = t.get_mut(&name)
544 {
545 item_doc.default = format!("{}", value);
546 }
547 }
548
549 #[test]
550 fn test_object_store_configs_backward_compatibility() {
551 {
553 let config: RwConfig = toml::from_str(
554 r#"
555 [storage.object_store]
556 object_store_set_atomic_write_dir = true
557
558 [storage.object_store.s3]
559 object_store_keepalive_ms = 1
560 object_store_send_buffer_size = 1
561 object_store_recv_buffer_size = 1
562 object_store_nodelay = false
563
564 [storage.object_store.s3.developer]
565 object_store_retry_unknown_service_error = true
566 object_store_retryable_service_error_codes = ['dummy']
567
568
569 "#,
570 )
571 .unwrap();
572
573 assert!(config.storage.object_store.set_atomic_write_dir);
574 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
575 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
576 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
577 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
578 assert!(
579 config
580 .storage
581 .object_store
582 .s3
583 .developer
584 .retry_unknown_service_error
585 );
586 assert_eq!(
587 config
588 .storage
589 .object_store
590 .s3
591 .developer
592 .retryable_service_error_codes,
593 vec!["dummy".to_owned()]
594 );
595 }
596
597 {
599 let config: RwConfig = toml::from_str(
600 r#"
601 [storage.object_store]
602 set_atomic_write_dir = true
603
604 [storage.object_store.s3]
605 keepalive_ms = 1
606 send_buffer_size = 1
607 recv_buffer_size = 1
608 nodelay = false
609
610 [storage.object_store.s3.developer]
611 retry_unknown_service_error = true
612 retryable_service_error_codes = ['dummy']
613
614
615 "#,
616 )
617 .unwrap();
618
619 assert!(config.storage.object_store.set_atomic_write_dir);
620 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
621 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
622 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
623 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
624 assert!(
625 config
626 .storage
627 .object_store
628 .s3
629 .developer
630 .retry_unknown_service_error
631 );
632 assert_eq!(
633 config
634 .storage
635 .object_store
636 .s3
637 .developer
638 .retryable_service_error_codes,
639 vec!["dummy".to_owned()]
640 );
641 }
642 }
643
644 #[test]
645 fn test_meta_configs_backward_compatibility() {
646 {
648 let config: RwConfig = toml::from_str(
649 r#"
650 [meta]
651 periodic_split_compact_group_interval_sec = 1
652 table_write_throughput_threshold = 10
653 min_table_split_write_throughput = 5
654 "#,
655 )
656 .unwrap();
657
658 assert_eq!(
659 config
660 .meta
661 .periodic_scheduling_compaction_group_split_interval_sec,
662 1
663 );
664 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
665 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
666 }
667 }
668
669 #[test]
672 fn test_prefix_alias() {
673 let config: RwConfig = toml::from_str(
674 "
675 [streaming.developer]
676 stream_chunk_size = 114514
677
678 [streaming.developer.stream_compute_client_config]
679 connect_timeout_secs = 42
680 ",
681 )
682 .unwrap();
683
684 assert_eq!(config.streaming.developer.chunk_size, 114514);
685 assert_eq!(
686 config
687 .streaming
688 .developer
689 .compute_client_config
690 .connect_timeout_secs,
691 42
692 );
693 }
694
695 #[test]
696 fn test_prefix_alias_duplicate() {
697 let config = toml::from_str::<RwConfig>(
698 "
699 [streaming.developer]
700 stream_chunk_size = 114514
701 chunk_size = 1919810
702 ",
703 )
704 .unwrap_err();
705
706 expect![[r#"
707 TOML parse error at line 2, column 13
708 |
709 2 | [streaming.developer]
710 | ^^^^^^^^^^^^^^^^^^^^^
711 duplicate field `chunk_size`
712 "#]]
713 .assert_eq(&config.to_string());
714
715 let config = toml::from_str::<RwConfig>(
716 "
717 [streaming.developer.stream_compute_client_config]
718 connect_timeout_secs = 5
719
720 [streaming.developer.compute_client_config]
721 connect_timeout_secs = 10
722 ",
723 )
724 .unwrap_err();
725
726 expect![[r#"
727 TOML parse error at line 2, column 24
728 |
729 2 | [streaming.developer.stream_compute_client_config]
730 | ^^^^^^^^^
731 duplicate field `compute_client_config`
732 "#]]
733 .assert_eq(&config.to_string());
734 }
735}