risingwave_common/config/
mod.rs1pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
28pub mod streaming;
29pub use streaming::{AsyncStackTraceOption, StreamingConfig};
30pub mod server;
31pub use server::{HeapProfilingConfig, ServerConfig};
32pub mod udf;
33pub use udf::UdfConfig;
34pub mod storage;
35pub use storage::{
36 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
37 extract_storage_memory_config,
38};
39pub mod merge;
40pub mod mutate;
41pub mod none_as_empty_string;
42pub mod system;
43pub mod utils;
44
45use std::collections::BTreeMap;
46use std::fs;
47use std::num::NonZeroUsize;
48
49use anyhow::Context;
50use clap::ValueEnum;
51use educe::Educe;
52pub use merge::*;
53use risingwave_common_proc_macro::ConfigDoc;
54pub use risingwave_common_proc_macro::OverrideConfig;
55use risingwave_pb::meta::SystemParams;
56use serde::{Deserialize, Serialize, Serializer};
57use serde_default::DefaultFromSerde;
58use serde_json::Value;
59pub use system::SystemConfig;
60pub use utils::*;
61
62use crate::for_all_params;
63
/// Largest connection window size used by this crate: `2^31 - 1` (equal to `i32::MAX`).
// NOTE(review): presumably the HTTP/2 connection-level flow-control window — confirm at the
// use sites.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
/// Per-stream window size: `32 * 1024 * 1024` (32 MiB if the unit is bytes).
// NOTE(review): presumably the HTTP/2 stream-level counterpart of
// `MAX_CONNECTION_WINDOW_SIZE` — confirm at the use sites.
pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024;

/// Root configuration object with one field per section (`server`, `meta`, `batch`,
/// `frontend`, `streaming`, `storage`, `system`, `udf`).
///
/// Every section is marked `#[serde(default)]`, so a section that is missing from the input
/// is filled with that section type's `Default` value.
#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
#[educe(Debug)]
pub struct RwConfig {
    #[serde(default)]
    #[config_doc(nested)]
    pub server: ServerConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub meta: MetaConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub batch: BatchConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub frontend: FrontendConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub streaming: StreamingConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub storage: StorageConfig,

    // Left out of the derived `Debug` output via `#[educe(Debug(ignore))]`.
    #[serde(default)]
    #[educe(Debug(ignore))]
    #[config_doc(nested)]
    pub system: SystemConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub udf: UdfConfig,

    // Flattened into the top level and omitted from the generated config docs.
    // NOTE(review): presumably collects keys that match no known section — confirm against
    // the definition of `Unrecognized`.
    #[serde(flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
113
/// Settings for an RPC client; currently only the connect timeout.
///
/// `serde_with::apply` attaches the `none_as_empty_string` (de)serializer to every `Option`
/// field of the struct; no such field exists at the moment, so it only matters for fields
/// added later.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct RpcClientConfig {
    // Connect timeout in seconds; when absent from the input, the value comes from
    // `default::developer::rpc_client_connect_timeout_secs`.
    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
    pub connect_timeout_secs: u64,
}
126
127pub use risingwave_common_metrics::MetricLevel;
128
129impl RwConfig {
130 pub const fn default_connection_pool_size(&self) -> u16 {
131 self.server.connection_pool_size
132 }
133
134 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
137 self.streaming
138 .developer
139 .exchange_connection_pool_size
140 .unwrap_or_else(|| self.default_connection_pool_size())
141 }
142
143 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
146 self.batch
147 .developer
148 .exchange_connection_pool_size
149 .unwrap_or_else(|| self.default_connection_pool_size())
150 }
151}
152
/// Default-value providers for configuration fields.
///
/// Each function takes no arguments and returns a constant, so it can be named in a
/// `#[serde(default = "...")]` attribute (as `RpcClientConfig` does with
/// `default::developer::rpc_client_connect_timeout_secs`).
pub mod default {

    /// Defaults whose names mirror developer-level settings.
    pub mod developer {
        /// Default `meta_cached_traces_num`: 256.
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        /// Default `meta_cached_traces_memory_limit_bytes`: 128 MiB (`2^27` bytes).
        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            128 * 1024 * 1024
        }

        /// Default `batch_output_channel_size`: 64.
        pub fn batch_output_channel_size() -> usize {
            64
        }

        /// Default `batch_receiver_channel_size`: 1000.
        pub fn batch_receiver_channel_size() -> usize {
            1_000
        }

        /// Default `batch_root_stage_channel_size`: 100.
        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        /// Default `batch_chunk_size`: 1024.
        pub fn batch_chunk_size() -> usize {
            1024
        }

        /// Default `batch_local_execute_buffer_size`: 64.
        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// Default `batch_exchange_connection_pool_size`: unset (`None`).
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        /// Default `stream_enable_executor_row_count`: disabled.
        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        /// Default `connector_message_buffer_size`: 16.
        pub fn connector_message_buffer_size() -> usize {
            16
        }

        /// Default `unsafe_stream_extreme_cache_size`: 10.
        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        /// Default `stream_topn_cache_min_capacity`: 10.
        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        /// Default `stream_chunk_size`: 256.
        pub fn stream_chunk_size() -> usize {
            256
        }

        /// Default `stream_exchange_initial_permits`: 2048.
        pub fn stream_exchange_initial_permits() -> usize {
            2048
        }

        /// Default `stream_exchange_batched_permits`: 256.
        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        /// Default `stream_exchange_concurrent_barriers`: 1.
        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        /// Default `stream_exchange_concurrent_dispatchers`: 0.
        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        /// Default `stream_dml_channel_initial_permits`: 32768 (`32 * 1024`).
        pub fn stream_dml_channel_initial_permits() -> usize {
            32 * 1024
        }

        /// Default `stream_max_barrier_batch_size`: 1024.
        pub fn stream_max_barrier_batch_size() -> u32 {
            1024
        }

        /// Default `stream_hash_agg_max_dirty_groups_heap_size`: `64 * 2^20` (64 MiB if the
        /// unit is bytes).
        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 * 1024 * 1024
        }

        /// Default `enable_trivial_move`: enabled.
        pub fn enable_trivial_move() -> bool {
            true
        }

        /// Default `enable_check_task_level_overlap`: disabled.
        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        /// Default `max_trivial_move_task_count_per_loop`: 256.
        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        /// Default `max_get_task_probe_times`: 5.
        pub fn max_get_task_probe_times() -> usize {
            5
        }

        /// Default `actor_cnt_per_worker_parallelism_soft_limit`: 100.
        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        /// Default `actor_cnt_per_worker_parallelism_hard_limit`: 400.
        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        /// Default `hummock_time_travel_sst_info_fetch_batch_size`: 10000.
        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        /// Default `hummock_time_travel_sst_info_insert_batch_size`: 100.
        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        /// Default `time_travel_vacuum_interval_sec`: 30 seconds.
        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        /// Default `time_travel_vacuum_max_version_count`: `Some(10000)`.
        pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
            Some(10_000)
        }

        /// Default `hummock_time_travel_epoch_version_insert_batch_size`: 1000.
        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1_000
        }

        /// Default `hummock_gc_history_insert_batch_size`: 1000.
        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1_000
        }

        /// Default `hummock_time_travel_filter_out_objects_batch_size`: 1000.
        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1_000
        }

        /// Default `hummock_time_travel_filter_out_objects_v1`: disabled.
        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        /// Default `hummock_time_travel_filter_out_objects_list_version_batch_size`: 10.
        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        /// Default `hummock_time_travel_filter_out_objects_list_delta_batch_size`: 1000.
        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1_000
        }

        /// Default `memory_controller_threshold_aggressive`: 0.9.
        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        /// Default `memory_controller_threshold_graceful`: 0.81.
        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        /// Default `memory_controller_threshold_stable`: 0.72.
        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        /// Default `memory_controller_eviction_factor_aggressive`: 2.0.
        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        /// Default `memory_controller_eviction_factor_graceful`: 1.5.
        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        /// Default `memory_controller_eviction_factor_stable`: 1.0.
        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        /// Default `memory_controller_update_interval_ms`: 100 milliseconds.
        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        /// Default `memory_controller_sequence_tls_step`: 128.
        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        /// Default `memory_controller_sequence_tls_lag`: 32.
        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        /// Default `stream_enable_arrangement_backfill`: enabled.
        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        /// Default `stream_enable_snapshot_backfill`: enabled.
        pub fn stream_enable_snapshot_backfill() -> bool {
            true
        }

        /// Default `enable_shared_source`: enabled.
        pub fn enable_shared_source() -> bool {
            true
        }

        /// Default `stream_high_join_amplification_threshold`: 2048.
        pub fn stream_high_join_amplification_threshold() -> usize {
            2048
        }

        /// Default `stream_exchange_connection_pool_size`: `Some(1)`.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        /// Default `enable_actor_tokio_metrics`: enabled.
        pub fn enable_actor_tokio_metrics() -> bool {
            true
        }

        /// Default `stream_enable_auto_schema_change`: enabled.
        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        /// Default `switch_jdbc_pg_to_native`: disabled.
        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        /// Default `streaming_hash_join_entry_state_max_rows`: 30000.
        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            30_000
        }

        /// Default `streaming_now_progress_ratio`: unset (`None`).
        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        /// Default `stream_snapshot_iter_rebuild_interval_secs`: 10 minutes, in seconds.
        pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
            const SECS_PER_MINUTE: u64 = 60;
            10 * SECS_PER_MINUTE
        }

        /// Default `enable_explain_analyze_stats`: enabled.
        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        /// Default `rpc_client_connect_timeout_secs`: 5 seconds.
        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        /// Default `iceberg_list_interval_sec`: 10 seconds.
        pub fn iceberg_list_interval_sec() -> u64 {
            10
        }

        /// Default `iceberg_fetch_batch_size`: 1024.
        pub fn iceberg_fetch_batch_size() -> u64 {
            1024
        }

        /// Default `iceberg_sink_positional_delete_cache_size`: 1024.
        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1024
        }

        /// Default `iceberg_sink_write_parquet_max_row_group_rows`: 100000.
        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }

        /// Default `materialize_force_overwrite_on_no_check`: disabled.
        pub fn materialize_force_overwrite_on_no_check() -> bool {
            false
        }

        /// Default `refresh_scheduler_interval_sec`: 60 seconds.
        pub fn refresh_scheduler_interval_sec() -> u64 {
            60
        }

        /// Default `sync_log_store_pause_duration_ms`: 64 milliseconds.
        pub fn sync_log_store_pause_duration_ms() -> usize {
            64
        }

        /// Default `sync_log_store_buffer_size`: 2048.
        pub fn sync_log_store_buffer_size() -> usize {
            2048
        }

        /// Default `enable_state_table_vnode_stats_pruning`: disabled.
        pub fn enable_state_table_vnode_stats_pruning() -> bool {
            false
        }
    }
}
429
/// Cap on the number of shard bits for the meta cache.
// NOTE(review): presumably the shard count is `1 << bits` (at most 16 here) — confirm where
// this constant is consumed.
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// Floor for the buffer size of a single shard.
// NOTE(review): the unit is not visible in this file — confirm where this constant is
// consumed.
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
432pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
435pub mod tests {
436 use expect_test::expect;
437 use risingwave_license::LicenseKey;
438
439 use super::*;
440
441 fn default_config_for_docs() -> RwConfig {
442 let mut config = RwConfig::default();
443 config.system.license_key = Some(LicenseKey::empty());
445 config
446 }
447
448 #[test]
452 fn test_example_up_to_date() {
453 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
454# Check detailed comments in src/common/src/config.rs";
455
456 let actual = expect_test::expect_file!["../../../config/example.toml"];
457 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
458
459 let expected = format!("{HEADER}\n\n{default}");
460 actual.assert_eq(&expected);
461
462 let expected = rw_config_to_markdown();
463 let actual = expect_test::expect_file!["../../../config/docs.md"];
464 actual.assert_eq(&expected);
465 }
466
467 #[derive(Debug)]
468 struct ConfigItemDoc {
469 desc: String,
470 default: String,
471 }
472
473 fn rw_config_to_markdown() -> String {
474 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
475 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
476
477 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
479 .into_iter()
480 .map(|(k, v)| {
481 let docs: BTreeMap<String, ConfigItemDoc> = v
482 .into_iter()
483 .map(|(name, desc)| {
484 (
485 name,
486 ConfigItemDoc {
487 desc,
488 default: "".to_owned(), },
490 )
491 })
492 .collect();
493 (k, docs)
494 })
495 .collect();
496
497 let toml_doc: BTreeMap<String, toml::Value> =
498 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
499 toml_doc.into_iter().for_each(|(name, value)| {
500 set_default_values("".to_owned(), name, value, &mut configs);
501 });
502
503 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
504 + "This page is automatically generated by `./risedev generate-example-config`\n";
505 for (section, configs) in configs {
506 if configs.is_empty() {
507 continue;
508 }
509 markdown.push_str(&format!("\n## {}\n\n", section));
510 markdown.push_str("| Config | Description | Default |\n");
511 markdown.push_str("|--------|-------------|---------|\n");
512 for (config, doc) in configs {
513 markdown.push_str(&format!(
514 "| {} | {} | {} |\n",
515 config, doc.desc, doc.default
516 ));
517 }
518 }
519 markdown
520 }
521
522 fn set_default_values(
523 section: String,
524 name: String,
525 value: toml::Value,
526 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
527 ) {
528 if let toml::Value::Table(table) = value {
530 let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
531 let sub_section = if section.is_empty() {
532 name
533 } else {
534 format!("{}.{}", section, name)
535 };
536 section_configs
537 .into_iter()
538 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
539 } else if let Some(t) = configs.get_mut(§ion)
540 && let Some(item_doc) = t.get_mut(&name)
541 {
542 item_doc.default = format!("{}", value);
543 }
544 }
545
546 #[test]
547 fn test_object_store_configs_backward_compatibility() {
548 {
550 let config: RwConfig = toml::from_str(
551 r#"
552 [storage.object_store]
553 object_store_set_atomic_write_dir = true
554
555 [storage.object_store.s3]
556 object_store_keepalive_ms = 1
557 object_store_send_buffer_size = 1
558 object_store_recv_buffer_size = 1
559 object_store_nodelay = false
560
561 [storage.object_store.s3.developer]
562 object_store_retry_unknown_service_error = true
563 object_store_retryable_service_error_codes = ['dummy']
564
565
566 "#,
567 )
568 .unwrap();
569
570 assert!(config.storage.object_store.set_atomic_write_dir);
571 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
572 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
573 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
574 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
575 assert!(
576 config
577 .storage
578 .object_store
579 .s3
580 .developer
581 .retry_unknown_service_error
582 );
583 assert_eq!(
584 config
585 .storage
586 .object_store
587 .s3
588 .developer
589 .retryable_service_error_codes,
590 vec!["dummy".to_owned()]
591 );
592 }
593
594 {
596 let config: RwConfig = toml::from_str(
597 r#"
598 [storage.object_store]
599 set_atomic_write_dir = true
600
601 [storage.object_store.s3]
602 keepalive_ms = 1
603 send_buffer_size = 1
604 recv_buffer_size = 1
605 nodelay = false
606
607 [storage.object_store.s3.developer]
608 retry_unknown_service_error = true
609 retryable_service_error_codes = ['dummy']
610
611
612 "#,
613 )
614 .unwrap();
615
616 assert!(config.storage.object_store.set_atomic_write_dir);
617 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
618 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
619 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
620 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
621 assert!(
622 config
623 .storage
624 .object_store
625 .s3
626 .developer
627 .retry_unknown_service_error
628 );
629 assert_eq!(
630 config
631 .storage
632 .object_store
633 .s3
634 .developer
635 .retryable_service_error_codes,
636 vec!["dummy".to_owned()]
637 );
638 }
639 }
640
641 #[test]
642 fn test_meta_configs_backward_compatibility() {
643 {
645 let config: RwConfig = toml::from_str(
646 r#"
647 [meta]
648 periodic_split_compact_group_interval_sec = 1
649 table_write_throughput_threshold = 10
650 min_table_split_write_throughput = 5
651 "#,
652 )
653 .unwrap();
654
655 assert_eq!(
656 config
657 .meta
658 .periodic_scheduling_compaction_group_split_interval_sec,
659 1
660 );
661 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
662 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
663 }
664 }
665
666 #[test]
669 fn test_prefix_alias() {
670 let config: RwConfig = toml::from_str(
671 "
672 [streaming.developer]
673 stream_chunk_size = 114514
674
675 [streaming.developer.stream_compute_client_config]
676 connect_timeout_secs = 42
677 ",
678 )
679 .unwrap();
680
681 assert_eq!(config.streaming.developer.chunk_size, 114514);
682 assert_eq!(
683 config
684 .streaming
685 .developer
686 .compute_client_config
687 .connect_timeout_secs,
688 42
689 );
690 }
691
692 #[test]
693 fn test_prefix_alias_duplicate() {
694 let config = toml::from_str::<RwConfig>(
695 "
696 [streaming.developer]
697 stream_chunk_size = 114514
698 chunk_size = 1919810
699 ",
700 )
701 .unwrap_err();
702
703 expect![[r#"
704 TOML parse error at line 2, column 13
705 |
706 2 | [streaming.developer]
707 | ^^^^^^^^^^^^^^^^^^^^^
708 duplicate field `chunk_size`
709 "#]]
710 .assert_eq(&config.to_string());
711
712 let config = toml::from_str::<RwConfig>(
713 "
714 [streaming.developer.stream_compute_client_config]
715 connect_timeout_secs = 5
716
717 [streaming.developer.compute_client_config]
718 connect_timeout_secs = 10
719 ",
720 )
721 .unwrap_err();
722
723 expect![[r#"
724 TOML parse error at line 2, column 24
725 |
726 2 | [streaming.developer.stream_compute_client_config]
727 | ^^^^^^^^^
728 duplicate field `compute_client_config`
729 "#]]
730 .assert_eq(&config.to_string());
731 }
732}