// risingwave_common/config/mod.rs
pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
28pub mod streaming;
29pub use streaming::{AsyncStackTraceOption, StreamingConfig};
30pub mod server;
31pub use server::{HeapProfilingConfig, ServerConfig};
32pub mod udf;
33pub use udf::UdfConfig;
34pub mod storage;
35pub use storage::{
36 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
37 extract_storage_memory_config,
38};
39pub mod merge;
40pub mod mutate;
41pub mod system;
42pub mod utils;
43
44use std::collections::BTreeMap;
45use std::fs;
46use std::num::NonZeroUsize;
47
48use anyhow::Context;
49use clap::ValueEnum;
50use educe::Educe;
51pub use merge::*;
52use risingwave_common_proc_macro::ConfigDoc;
53pub use risingwave_common_proc_macro::OverrideConfig;
54use risingwave_pb::meta::SystemParams;
55use serde::{Deserialize, Serialize, Serializer};
56use serde_default::DefaultFromSerde;
57use serde_json::Value;
58pub use system::SystemConfig;
59pub use utils::*;
60
61use crate::for_all_params;
62
/// Window-size limit equal to `2^31 - 1` (`2_147_483_647`).
///
/// NOTE(review): presumably the HTTP/2 connection-level flow-control window, for which
/// `2^31 - 1` is the protocol maximum — confirm at the call sites.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
/// Window size of `32 * 1024 * 1024 = 33_554_432`.
///
/// NOTE(review): presumably a per-stream window measured in bytes (32 MiB) — confirm at the
/// call sites.
pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024;

/// Root of the configuration tree: each field deserializes from the table of the same name
/// (for example `[meta]`, or `[streaming.developer]` one level further down).
///
/// Every section carries `#[serde(default)]`, so a section that is missing from the input is
/// filled in from that section type's `Default` implementation.
// Field notes below are plain `//` comments on purpose: the `ConfigDoc` derive may harvest
// rustdoc from fields, and these notes are not meant to end up in generated documentation.
#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
#[educe(Debug)]
pub struct RwConfig {
    // `[server]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub server: ServerConfig,

    // `[meta]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub meta: MetaConfig,

    // `[batch]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub batch: BatchConfig,

    // `[frontend]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub frontend: FrontendConfig,

    // `[streaming]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub streaming: StreamingConfig,

    // `[storage]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub storage: StorageConfig,

    // `[system]` section. `Debug(ignore)` leaves this field out of the derived `Debug` output.
    // NOTE(review): presumably because it can hold sensitive values such as the license key —
    // confirm.
    #[serde(default)]
    #[educe(Debug(ignore))]
    #[config_doc(nested)]
    pub system: SystemConfig,

    // `[udf]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub udf: UdfConfig,

    // Flattened into the top level instead of living under an `unrecognized` key.
    // NOTE(review): presumably collects the keys no other field claimed, and
    // `config_doc(omitted)` presumably keeps it out of the generated docs — confirm against
    // `Unrecognized` and the `ConfigDoc` macro.
    #[serde(flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
112
/// Settings for an RPC client.
///
/// `Default` is derived through `DefaultFromSerde`, i.e. from the `#[serde(default = ...)]`
/// attributes on the fields rather than from the field types' own defaults.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct RpcClientConfig {
    // Connect timeout in seconds; defaults to
    // `default::developer::rpc_client_connect_timeout_secs()`, which returns 5.
    // Kept as a plain comment: rustdoc on a `ConfigDoc` field may be harvested into the
    // generated config docs, which are checked against a snapshot file.
    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
    pub connect_timeout_secs: u64,
}
124
125pub use risingwave_common_metrics::MetricLevel;
126
127impl RwConfig {
128 pub const fn default_connection_pool_size(&self) -> u16 {
129 self.server.connection_pool_size
130 }
131
132 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
135 self.streaming
136 .developer
137 .exchange_connection_pool_size
138 .unwrap_or_else(|| self.default_connection_pool_size())
139 }
140
141 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
144 self.batch
145 .developer
146 .exchange_connection_pool_size
147 .unwrap_or_else(|| self.default_connection_pool_size())
148 }
149}
150
/// Default-value providers that are referenced by path from `#[serde(default = "...")]`
/// attributes (see `RpcClientConfig::connect_timeout_secs` for an example).
pub mod default {

    /// Defaults for developer-level settings.
    ///
    /// Each function takes no arguments and returns a fixed value. Units, where given, are
    /// the ones spelled out in the function name (`_bytes`, `_ms`, `_sec`, `_secs`).
    pub mod developer {
        // ---- meta tracing ----

        /// 256 cached traces.
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        /// 128 MiB (`1 << 27` bytes).
        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            128 * 1024 * 1024
        }

        // ---- batch ----

        /// Output channel size: 64.
        pub fn batch_output_channel_size() -> usize {
            64
        }

        /// Receiver channel size: 1000.
        pub fn batch_receiver_channel_size() -> usize {
            1_000
        }

        /// Root-stage channel size: 100.
        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        /// Batch chunk size: 1024.
        pub fn batch_chunk_size() -> usize {
            1024
        }

        /// Local-execution buffer size: 64.
        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// No dedicated pool size by default.
        ///
        /// NOTE(review): presumably feeds `batch.developer.exchange_connection_pool_size`, in
        /// which case `None` makes `RwConfig::batch_exchange_connection_pool_size` fall back
        /// to `server.connection_pool_size` — confirm in the batch config.
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        // ---- streaming ----

        /// Disabled by default.
        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        /// Connector message buffer size: 16.
        pub fn connector_message_buffer_size() -> usize {
            16
        }

        /// Extreme cache size: 10.
        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        /// Minimum top-n cache capacity: 10.
        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        /// Stream chunk size: 256.
        pub fn stream_chunk_size() -> usize {
            256
        }

        /// Initial exchange permits: 2048.
        pub fn stream_exchange_initial_permits() -> usize {
            2048
        }

        /// Batched exchange permits: 256.
        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        /// Concurrent barriers in exchange: 1.
        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        /// Concurrent dispatchers in exchange: 0.
        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        /// Initial DML channel permits: 32768.
        pub fn stream_dml_channel_initial_permits() -> usize {
            32 * 1024
        }

        /// Maximum barrier batch size: 1024.
        pub fn stream_max_barrier_batch_size() -> u32 {
            1024
        }

        /// `64 << 20` (presumably bytes, i.e. 64 MiB).
        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 * 1024 * 1024
        }

        // ---- compaction scheduling ----

        /// Enabled by default.
        pub fn enable_trivial_move() -> bool {
            true
        }

        /// Disabled by default.
        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        /// At most 256 trivial-move tasks per loop.
        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        /// At most 5 probes when getting a task.
        pub fn max_get_task_probe_times() -> usize {
            5
        }

        // ---- actor count limits ----

        /// Soft limit: 100.
        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        /// Hard limit: 400.
        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        // ---- hummock time travel / GC ----

        /// Fetch batch size: 10000.
        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        /// Insert batch size: 100.
        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        /// Vacuum interval: 30 seconds.
        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        /// Insert batch size: 1000.
        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1_000
        }

        /// Insert batch size: 1000.
        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1_000
        }

        /// Batch size: 1000.
        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1_000
        }

        /// Disabled by default.
        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        /// Version-listing batch size: 10.
        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        /// Delta-listing batch size: 1000.
        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1_000
        }

        // ---- memory controller ----

        /// Aggressive threshold: 0.9.
        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        /// Graceful threshold: 0.81.
        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        /// Stable threshold: 0.72.
        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        /// Aggressive eviction factor: 2.0.
        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        /// Graceful eviction factor: 1.5.
        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        /// Stable eviction factor: 1.0.
        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        /// Update interval: 100 ms.
        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        /// Sequence TLS step: 128.
        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        /// Sequence TLS lag: 32.
        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        // ---- streaming features ----

        /// Enabled by default.
        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        /// Enabled by default.
        pub fn enable_shared_source() -> bool {
            true
        }

        /// Threshold: 2048.
        pub fn stream_high_join_amplification_threshold() -> usize {
            2048
        }

        /// A pool size of 1 by default.
        ///
        /// NOTE(review): presumably feeds `streaming.developer.exchange_connection_pool_size`,
        /// so streaming exchange does not fall back to `server.connection_pool_size` unless
        /// explicitly unset — confirm in the streaming config.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        /// Enabled by default.
        pub fn enable_actor_tokio_metrics() -> bool {
            true
        }

        /// Enabled by default.
        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        /// Disabled by default.
        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        /// At most 30000 rows.
        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            30_000
        }

        /// Unset by default.
        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        /// Enabled by default.
        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        // ---- RPC client ----

        /// Connect timeout: 5 seconds (default of `RpcClientConfig::connect_timeout_secs`).
        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        // ---- iceberg ----

        /// List interval: 10 seconds.
        pub fn iceberg_list_interval_sec() -> u64 {
            10
        }

        /// Fetch batch size: 1024.
        pub fn iceberg_fetch_batch_size() -> u64 {
            1024
        }

        /// Positional-delete cache size: 1024.
        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1024
        }

        /// At most 100000 rows per Parquet row group.
        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }

        // ---- refresh scheduler / sync log store ----

        /// Scheduler interval: 60 seconds.
        pub fn refresh_scheduler_interval_sec() -> u64 {
            60
        }

        /// Pause duration: 64 ms.
        pub fn sync_log_store_pause_duration_ms() -> usize {
            64
        }

        /// Buffer size: 2048.
        pub fn sync_log_store_buffer_size() -> usize {
            2048
        }
    }
}
406
/// Upper bound (4) on the shard-bits value of the meta cache.
///
/// NOTE(review): presumably the shard count is `1 << bits`, i.e. at most 16 shards — confirm
/// at the use site.
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// Lower bound (256) on the buffer size given to a single shard.
///
/// NOTE(review): the unit is not visible here — confirm at the use site.
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
409pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
412pub mod tests {
413 use expect_test::expect;
414 use risingwave_license::LicenseKey;
415
416 use super::*;
417
418 fn default_config_for_docs() -> RwConfig {
419 let mut config = RwConfig::default();
420 config.system.license_key = Some(LicenseKey::empty());
422 config
423 }
424
425 #[test]
429 fn test_example_up_to_date() {
430 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
431# Check detailed comments in src/common/src/config.rs";
432
433 let actual = expect_test::expect_file!["../../../config/example.toml"];
434 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
435
436 let expected = format!("{HEADER}\n\n{default}");
437 actual.assert_eq(&expected);
438
439 let expected = rw_config_to_markdown();
440 let actual = expect_test::expect_file!["../../../config/docs.md"];
441 actual.assert_eq(&expected);
442 }
443
444 #[derive(Debug)]
445 struct ConfigItemDoc {
446 desc: String,
447 default: String,
448 }
449
450 fn rw_config_to_markdown() -> String {
451 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
452 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
453
454 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
456 .into_iter()
457 .map(|(k, v)| {
458 let docs: BTreeMap<String, ConfigItemDoc> = v
459 .into_iter()
460 .map(|(name, desc)| {
461 (
462 name,
463 ConfigItemDoc {
464 desc,
465 default: "".to_owned(), },
467 )
468 })
469 .collect();
470 (k, docs)
471 })
472 .collect();
473
474 let toml_doc: BTreeMap<String, toml::Value> =
475 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
476 toml_doc.into_iter().for_each(|(name, value)| {
477 set_default_values("".to_owned(), name, value, &mut configs);
478 });
479
480 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
481 + "This page is automatically generated by `./risedev generate-example-config`\n";
482 for (section, configs) in configs {
483 if configs.is_empty() {
484 continue;
485 }
486 markdown.push_str(&format!("\n## {}\n\n", section));
487 markdown.push_str("| Config | Description | Default |\n");
488 markdown.push_str("|--------|-------------|---------|\n");
489 for (config, doc) in configs {
490 markdown.push_str(&format!(
491 "| {} | {} | {} |\n",
492 config, doc.desc, doc.default
493 ));
494 }
495 }
496 markdown
497 }
498
499 fn set_default_values(
500 section: String,
501 name: String,
502 value: toml::Value,
503 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
504 ) {
505 if let toml::Value::Table(table) = value {
507 let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
508 let sub_section = if section.is_empty() {
509 name
510 } else {
511 format!("{}.{}", section, name)
512 };
513 section_configs
514 .into_iter()
515 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
516 } else if let Some(t) = configs.get_mut(§ion)
517 && let Some(item_doc) = t.get_mut(&name)
518 {
519 item_doc.default = format!("{}", value);
520 }
521 }
522
523 #[test]
524 fn test_object_store_configs_backward_compatibility() {
525 {
527 let config: RwConfig = toml::from_str(
528 r#"
529 [storage.object_store]
530 object_store_set_atomic_write_dir = true
531
532 [storage.object_store.s3]
533 object_store_keepalive_ms = 1
534 object_store_send_buffer_size = 1
535 object_store_recv_buffer_size = 1
536 object_store_nodelay = false
537
538 [storage.object_store.s3.developer]
539 object_store_retry_unknown_service_error = true
540 object_store_retryable_service_error_codes = ['dummy']
541
542
543 "#,
544 )
545 .unwrap();
546
547 assert!(config.storage.object_store.set_atomic_write_dir);
548 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
549 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
550 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
551 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
552 assert!(
553 config
554 .storage
555 .object_store
556 .s3
557 .developer
558 .retry_unknown_service_error
559 );
560 assert_eq!(
561 config
562 .storage
563 .object_store
564 .s3
565 .developer
566 .retryable_service_error_codes,
567 vec!["dummy".to_owned()]
568 );
569 }
570
571 {
573 let config: RwConfig = toml::from_str(
574 r#"
575 [storage.object_store]
576 set_atomic_write_dir = true
577
578 [storage.object_store.s3]
579 keepalive_ms = 1
580 send_buffer_size = 1
581 recv_buffer_size = 1
582 nodelay = false
583
584 [storage.object_store.s3.developer]
585 retry_unknown_service_error = true
586 retryable_service_error_codes = ['dummy']
587
588
589 "#,
590 )
591 .unwrap();
592
593 assert!(config.storage.object_store.set_atomic_write_dir);
594 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
595 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
596 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
597 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
598 assert!(
599 config
600 .storage
601 .object_store
602 .s3
603 .developer
604 .retry_unknown_service_error
605 );
606 assert_eq!(
607 config
608 .storage
609 .object_store
610 .s3
611 .developer
612 .retryable_service_error_codes,
613 vec!["dummy".to_owned()]
614 );
615 }
616 }
617
618 #[test]
619 fn test_meta_configs_backward_compatibility() {
620 {
622 let config: RwConfig = toml::from_str(
623 r#"
624 [meta]
625 periodic_split_compact_group_interval_sec = 1
626 table_write_throughput_threshold = 10
627 min_table_split_write_throughput = 5
628 "#,
629 )
630 .unwrap();
631
632 assert_eq!(
633 config
634 .meta
635 .periodic_scheduling_compaction_group_split_interval_sec,
636 1
637 );
638 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
639 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
640 }
641 }
642
643 #[test]
646 fn test_prefix_alias() {
647 let config: RwConfig = toml::from_str(
648 "
649 [streaming.developer]
650 stream_chunk_size = 114514
651
652 [streaming.developer.stream_compute_client_config]
653 connect_timeout_secs = 42
654 ",
655 )
656 .unwrap();
657
658 assert_eq!(config.streaming.developer.chunk_size, 114514);
659 assert_eq!(
660 config
661 .streaming
662 .developer
663 .compute_client_config
664 .connect_timeout_secs,
665 42
666 );
667 }
668
669 #[test]
670 fn test_prefix_alias_duplicate() {
671 let config = toml::from_str::<RwConfig>(
672 "
673 [streaming.developer]
674 stream_chunk_size = 114514
675 chunk_size = 1919810
676 ",
677 )
678 .unwrap_err();
679
680 expect![[r#"
681 TOML parse error at line 2, column 13
682 |
683 2 | [streaming.developer]
684 | ^^^^^^^^^^^^^^^^^^^^^
685 duplicate field `chunk_size`
686 "#]]
687 .assert_eq(&config.to_string());
688
689 let config = toml::from_str::<RwConfig>(
690 "
691 [streaming.developer.stream_compute_client_config]
692 connect_timeout_secs = 5
693
694 [streaming.developer.compute_client_config]
695 connect_timeout_secs = 10
696 ",
697 )
698 .unwrap_err();
699
700 expect![[r#"
701 TOML parse error at line 2, column 24
702 |
703 2 | [streaming.developer.stream_compute_client_config]
704 | ^^^^^^^^^
705 duplicate field `compute_client_config`
706 "#]]
707 .assert_eq(&config.to_string());
708 }
709}