risingwave_common/config/
mod.rs1pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
28pub mod streaming;
29pub use streaming::{AsyncStackTraceOption, StreamingConfig};
30pub mod server;
31pub use server::{HeapProfilingConfig, ServerConfig};
32pub mod udf;
33pub use udf::UdfConfig;
34pub mod storage;
35pub use storage::{
36 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
37 extract_storage_memory_config,
38};
39pub mod system;
40pub mod utils;
41use std::collections::BTreeMap;
42use std::fs;
43use std::num::NonZeroUsize;
44
45use anyhow::Context;
46use clap::ValueEnum;
47use educe::Educe;
48use risingwave_common_proc_macro::ConfigDoc;
49pub use risingwave_common_proc_macro::OverrideConfig;
50use risingwave_pb::meta::SystemParams;
51use serde::{Deserialize, Serialize, Serializer};
52use serde_default::DefaultFromSerde;
53use serde_json::Value;
54pub use system::SystemConfig;
55pub use utils::*;
56
57use crate::for_all_params;
58
59pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
62pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; #[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
69#[educe(Debug)]
70pub struct RwConfig {
71 #[serde(default)]
72 #[config_doc(nested)]
73 pub server: ServerConfig,
74
75 #[serde(default)]
76 #[config_doc(nested)]
77 pub meta: MetaConfig,
78
79 #[serde(default)]
80 #[config_doc(nested)]
81 pub batch: BatchConfig,
82
83 #[serde(default)]
84 #[config_doc(nested)]
85 pub frontend: FrontendConfig,
86
87 #[serde(default)]
88 #[config_doc(nested)]
89 pub streaming: StreamingConfig,
90
91 #[serde(default)]
92 #[config_doc(nested)]
93 pub storage: StorageConfig,
94
95 #[serde(default)]
96 #[educe(Debug(ignore))]
97 #[config_doc(nested)]
98 pub system: SystemConfig,
99
100 #[serde(default)]
101 #[config_doc(nested)]
102 pub udf: UdfConfig,
103
104 #[serde(flatten)]
105 #[config_doc(omitted)]
106 pub unrecognized: Unrecognized<Self>,
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
116pub struct RpcClientConfig {
117 #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
118 pub connect_timeout_secs: u64,
119}
120
121pub use risingwave_common_metrics::MetricLevel;
122
123impl RwConfig {
124 pub const fn default_connection_pool_size(&self) -> u16 {
125 self.server.connection_pool_size
126 }
127
128 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
131 self.streaming
132 .developer
133 .exchange_connection_pool_size
134 .unwrap_or_else(|| self.default_connection_pool_size())
135 }
136
137 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
140 self.batch
141 .developer
142 .exchange_connection_pool_size
143 .unwrap_or_else(|| self.default_connection_pool_size())
144 }
145}
146
147pub mod default {
148
149 pub mod developer {
150 pub fn meta_cached_traces_num() -> u32 {
151 256
152 }
153
154 pub fn meta_cached_traces_memory_limit_bytes() -> usize {
155 1 << 27 }
157
158 pub fn batch_output_channel_size() -> usize {
159 64
160 }
161
162 pub fn batch_receiver_channel_size() -> usize {
163 1000
164 }
165
166 pub fn batch_root_stage_channel_size() -> usize {
167 100
168 }
169
170 pub fn batch_chunk_size() -> usize {
171 1024
172 }
173
174 pub fn batch_local_execute_buffer_size() -> usize {
175 64
176 }
177
178 pub fn batch_exchange_connection_pool_size() -> Option<u16> {
181 None
182 }
183
184 pub fn stream_enable_executor_row_count() -> bool {
185 false
186 }
187
188 pub fn connector_message_buffer_size() -> usize {
189 16
190 }
191
192 pub fn unsafe_stream_extreme_cache_size() -> usize {
193 10
194 }
195
196 pub fn stream_topn_cache_min_capacity() -> usize {
197 10
198 }
199
200 pub fn stream_chunk_size() -> usize {
201 256
202 }
203
204 pub fn stream_exchange_initial_permits() -> usize {
205 2048
206 }
207
208 pub fn stream_exchange_batched_permits() -> usize {
209 256
210 }
211
212 pub fn stream_exchange_concurrent_barriers() -> usize {
213 1
214 }
215
216 pub fn stream_exchange_concurrent_dispatchers() -> usize {
217 0
218 }
219
220 pub fn stream_dml_channel_initial_permits() -> usize {
221 32768
222 }
223
224 pub fn stream_max_barrier_batch_size() -> u32 {
225 1024
226 }
227
228 pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
229 64 << 20 }
231
232 pub fn enable_trivial_move() -> bool {
233 true
234 }
235
236 pub fn enable_check_task_level_overlap() -> bool {
237 false
238 }
239
240 pub fn max_trivial_move_task_count_per_loop() -> usize {
241 256
242 }
243
244 pub fn max_get_task_probe_times() -> usize {
245 5
246 }
247
248 pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
249 100
250 }
251
252 pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
253 400
254 }
255
256 pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
257 10_000
258 }
259
260 pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
261 100
262 }
263
264 pub fn time_travel_vacuum_interval_sec() -> u64 {
265 30
266 }
267 pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
268 1000
269 }
270
271 pub fn hummock_gc_history_insert_batch_size() -> usize {
272 1000
273 }
274
275 pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
276 1000
277 }
278
279 pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
280 false
281 }
282
283 pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
284 10
285 }
286
287 pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
288 1000
289 }
290
291 pub fn memory_controller_threshold_aggressive() -> f64 {
292 0.9
293 }
294
295 pub fn memory_controller_threshold_graceful() -> f64 {
296 0.81
297 }
298
299 pub fn memory_controller_threshold_stable() -> f64 {
300 0.72
301 }
302
303 pub fn memory_controller_eviction_factor_aggressive() -> f64 {
304 2.0
305 }
306
307 pub fn memory_controller_eviction_factor_graceful() -> f64 {
308 1.5
309 }
310
311 pub fn memory_controller_eviction_factor_stable() -> f64 {
312 1.0
313 }
314
315 pub fn memory_controller_update_interval_ms() -> usize {
316 100
317 }
318
319 pub fn memory_controller_sequence_tls_step() -> u64 {
320 128
321 }
322
323 pub fn memory_controller_sequence_tls_lag() -> u64 {
324 32
325 }
326
327 pub fn stream_enable_arrangement_backfill() -> bool {
328 true
329 }
330
331 pub fn enable_shared_source() -> bool {
332 true
333 }
334
335 pub fn stream_high_join_amplification_threshold() -> usize {
336 2048
337 }
338
339 pub fn stream_exchange_connection_pool_size() -> Option<u16> {
341 Some(1)
342 }
343
344 pub fn enable_actor_tokio_metrics() -> bool {
345 false
346 }
347
348 pub fn stream_enable_auto_schema_change() -> bool {
349 true
350 }
351
352 pub fn switch_jdbc_pg_to_native() -> bool {
353 false
354 }
355
356 pub fn streaming_hash_join_entry_state_max_rows() -> usize {
357 30000
359 }
360
361 pub fn streaming_now_progress_ratio() -> Option<f32> {
362 None
363 }
364
365 pub fn enable_explain_analyze_stats() -> bool {
366 true
367 }
368
369 pub fn rpc_client_connect_timeout_secs() -> u64 {
370 5
371 }
372
373 pub fn iceberg_list_interval_sec() -> u64 {
374 1
375 }
376
377 pub fn iceberg_fetch_batch_size() -> u64 {
378 1024
379 }
380
381 pub fn iceberg_sink_positional_delete_cache_size() -> usize {
382 1024
383 }
384
385 pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
386 100_000
387 }
388 }
389}
390
391pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
392pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
393pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
396pub mod tests {
397 use risingwave_license::LicenseKey;
398
399 use super::*;
400
401 fn default_config_for_docs() -> RwConfig {
402 let mut config = RwConfig::default();
403 config.system.license_key = Some(LicenseKey::empty());
405 config
406 }
407
408 #[test]
412 fn test_example_up_to_date() {
413 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
414# Check detailed comments in src/common/src/config.rs";
415
416 let actual = expect_test::expect_file!["../../../config/example.toml"];
417 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
418
419 let expected = format!("{HEADER}\n\n{default}");
420 actual.assert_eq(&expected);
421
422 let expected = rw_config_to_markdown();
423 let actual = expect_test::expect_file!["../../../config/docs.md"];
424 actual.assert_eq(&expected);
425 }
426
427 #[derive(Debug)]
428 struct ConfigItemDoc {
429 desc: String,
430 default: String,
431 }
432
433 fn rw_config_to_markdown() -> String {
434 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
435 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
436
437 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
439 .into_iter()
440 .map(|(k, v)| {
441 let docs: BTreeMap<String, ConfigItemDoc> = v
442 .into_iter()
443 .map(|(name, desc)| {
444 (
445 name,
446 ConfigItemDoc {
447 desc,
448 default: "".to_owned(), },
450 )
451 })
452 .collect();
453 (k, docs)
454 })
455 .collect();
456
457 let toml_doc: BTreeMap<String, toml::Value> =
458 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
459 toml_doc.into_iter().for_each(|(name, value)| {
460 set_default_values("".to_owned(), name, value, &mut configs);
461 });
462
463 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
464 + "This page is automatically generated by `./risedev generate-example-config`\n";
465 for (section, configs) in configs {
466 if configs.is_empty() {
467 continue;
468 }
469 markdown.push_str(&format!("\n## {}\n\n", section));
470 markdown.push_str("| Config | Description | Default |\n");
471 markdown.push_str("|--------|-------------|---------|\n");
472 for (config, doc) in configs {
473 markdown.push_str(&format!(
474 "| {} | {} | {} |\n",
475 config, doc.desc, doc.default
476 ));
477 }
478 }
479 markdown
480 }
481
482 fn set_default_values(
483 section: String,
484 name: String,
485 value: toml::Value,
486 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
487 ) {
488 if let toml::Value::Table(table) = value {
490 let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
491 let sub_section = if section.is_empty() {
492 name
493 } else {
494 format!("{}.{}", section, name)
495 };
496 section_configs
497 .into_iter()
498 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
499 } else if let Some(t) = configs.get_mut(§ion)
500 && let Some(item_doc) = t.get_mut(&name)
501 {
502 item_doc.default = format!("{}", value);
503 }
504 }
505
506 #[test]
507 fn test_object_store_configs_backward_compatibility() {
508 {
510 let config: RwConfig = toml::from_str(
511 r#"
512 [storage.object_store]
513 object_store_set_atomic_write_dir = true
514
515 [storage.object_store.s3]
516 object_store_keepalive_ms = 1
517 object_store_send_buffer_size = 1
518 object_store_recv_buffer_size = 1
519 object_store_nodelay = false
520
521 [storage.object_store.s3.developer]
522 object_store_retry_unknown_service_error = true
523 object_store_retryable_service_error_codes = ['dummy']
524
525
526 "#,
527 )
528 .unwrap();
529
530 assert!(config.storage.object_store.set_atomic_write_dir);
531 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
532 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
533 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
534 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
535 assert!(
536 config
537 .storage
538 .object_store
539 .s3
540 .developer
541 .retry_unknown_service_error
542 );
543 assert_eq!(
544 config
545 .storage
546 .object_store
547 .s3
548 .developer
549 .retryable_service_error_codes,
550 vec!["dummy".to_owned()]
551 );
552 }
553
554 {
556 let config: RwConfig = toml::from_str(
557 r#"
558 [storage.object_store]
559 set_atomic_write_dir = true
560
561 [storage.object_store.s3]
562 keepalive_ms = 1
563 send_buffer_size = 1
564 recv_buffer_size = 1
565 nodelay = false
566
567 [storage.object_store.s3.developer]
568 retry_unknown_service_error = true
569 retryable_service_error_codes = ['dummy']
570
571
572 "#,
573 )
574 .unwrap();
575
576 assert!(config.storage.object_store.set_atomic_write_dir);
577 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
578 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
579 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
580 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
581 assert!(
582 config
583 .storage
584 .object_store
585 .s3
586 .developer
587 .retry_unknown_service_error
588 );
589 assert_eq!(
590 config
591 .storage
592 .object_store
593 .s3
594 .developer
595 .retryable_service_error_codes,
596 vec!["dummy".to_owned()]
597 );
598 }
599 }
600
601 #[test]
602 fn test_meta_configs_backward_compatibility() {
603 {
605 let config: RwConfig = toml::from_str(
606 r#"
607 [meta]
608 periodic_split_compact_group_interval_sec = 1
609 table_write_throughput_threshold = 10
610 min_table_split_write_throughput = 5
611 "#,
612 )
613 .unwrap();
614
615 assert_eq!(
616 config
617 .meta
618 .periodic_scheduling_compaction_group_split_interval_sec,
619 1
620 );
621 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
622 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
623 }
624 }
625}