risingwave_common/config/
mod.rs1pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod meta;
25pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
26pub mod streaming;
27pub use streaming::{AsyncStackTraceOption, StreamingConfig};
28pub mod server;
29pub use server::{HeapProfilingConfig, ServerConfig};
30pub mod udf;
31pub use udf::UdfConfig;
32pub mod storage;
33pub use storage::{
34 CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
35 extract_storage_memory_config,
36};
37pub mod system;
38pub mod utils;
39use std::collections::BTreeMap;
40use std::fs;
41use std::num::NonZeroUsize;
42
43use anyhow::Context;
44use clap::ValueEnum;
45use educe::Educe;
46use risingwave_common_proc_macro::ConfigDoc;
47pub use risingwave_common_proc_macro::OverrideConfig;
48use risingwave_pb::meta::SystemParams;
49use serde::{Deserialize, Serialize, Serializer};
50use serde_default::DefaultFromSerde;
51use serde_json::Value;
52pub use system::SystemConfig;
53pub use utils::*;
54
55use crate::for_all_params;
56
/// Largest connection window size: `2^31 - 1` (2_147_483_647), i.e. the maximum value of a
/// signed 32-bit integer stored in a `u32`.
///
/// NOTE(review): presumably the HTTP/2 connection-level flow-control window — confirm at the
/// use sites, which are outside this file.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
60pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; #[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
67#[educe(Debug)]
68pub struct RwConfig {
69 #[serde(default)]
70 #[config_doc(nested)]
71 pub server: ServerConfig,
72
73 #[serde(default)]
74 #[config_doc(nested)]
75 pub meta: MetaConfig,
76
77 #[serde(default)]
78 #[config_doc(nested)]
79 pub batch: BatchConfig,
80
81 #[serde(default)]
82 #[config_doc(nested)]
83 pub frontend: FrontendConfig,
84
85 #[serde(default)]
86 #[config_doc(nested)]
87 pub streaming: StreamingConfig,
88
89 #[serde(default)]
90 #[config_doc(nested)]
91 pub storage: StorageConfig,
92
93 #[serde(default)]
94 #[educe(Debug(ignore))]
95 #[config_doc(nested)]
96 pub system: SystemConfig,
97
98 #[serde(default)]
99 #[config_doc(nested)]
100 pub udf: UdfConfig,
101
102 #[serde(flatten)]
103 #[config_doc(omitted)]
104 pub unrecognized: Unrecognized<Self>,
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
114pub struct RpcClientConfig {
115 #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
116 pub connect_timeout_secs: u64,
117}
118
119pub use risingwave_common_metrics::MetricLevel;
120
121impl RwConfig {
122 pub const fn default_connection_pool_size(&self) -> u16 {
123 self.server.connection_pool_size
124 }
125
126 pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
129 self.streaming
130 .developer
131 .exchange_connection_pool_size
132 .unwrap_or_else(|| self.default_connection_pool_size())
133 }
134
135 pub fn batch_exchange_connection_pool_size(&self) -> u16 {
138 self.batch
139 .developer
140 .exchange_connection_pool_size
141 .unwrap_or_else(|| self.default_connection_pool_size())
142 }
143}
144
/// Default-value providers for configuration fields.
///
/// Each function returns a single constant. They are referenced by path from serde attributes,
/// e.g. `#[serde(default = "default::developer::rpc_client_connect_timeout_secs")]`, so the
/// function names and return types are part of the contract with the config structs.
pub mod default {

    /// Defaults for "developer" settings.
    pub mod developer {
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        /// `1 << 27` = 134_217_728 bytes (128 MiB).
        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            1 << 27
        }

        pub fn batch_output_channel_size() -> usize {
            64
        }

        pub fn batch_receiver_channel_size() -> usize {
            1000
        }

        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        pub fn batch_chunk_size() -> usize {
            1024
        }

        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// No explicit pool size by default (`None`).
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        pub fn connector_message_buffer_size() -> usize {
            16
        }

        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        pub fn stream_chunk_size() -> usize {
            256
        }

        pub fn stream_exchange_initial_permits() -> usize {
            2048
        }

        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        pub fn stream_dml_channel_initial_permits() -> usize {
            32768
        }

        pub fn stream_max_barrier_batch_size() -> u32 {
            1024
        }

        /// `64 << 20` = 67_108_864 (presumably bytes, i.e. 64 MiB — confirm at the use site).
        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 << 20
        }

        pub fn enable_trivial_move() -> bool {
            true
        }

        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        pub fn max_get_task_probe_times() -> usize {
            5
        }

        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1000
        }

        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1000
        }

        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1000
        }

        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1000
        }

        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        pub fn enable_shared_source() -> bool {
            true
        }

        pub fn stream_high_join_amplification_threshold() -> usize {
            2048
        }

        /// Unlike the batch counterpart, this default is an explicit `Some(1)`.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        pub fn enable_actor_tokio_metrics() -> bool {
            false
        }

        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            30000
        }

        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        pub fn iceberg_list_interval_sec() -> u64 {
            1
        }

        pub fn iceberg_fetch_batch_size() -> u64 {
            1024
        }

        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1024
        }

        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }
    }
}
388
/// Upper bound on the number of shard bits for the meta cache.
///
/// NOTE(review): presumably the shard count is `1 << bits`, so at most 16 shards — confirm at
/// the use site.
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// Minimum buffer size per shard (256; the unit is not visible here — TODO confirm).
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
391pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; #[cfg(test)]
394pub mod tests {
395 use risingwave_license::LicenseKey;
396
397 use super::*;
398
399 fn default_config_for_docs() -> RwConfig {
400 let mut config = RwConfig::default();
401 config.system.license_key = Some(LicenseKey::empty());
403 config
404 }
405
406 #[test]
410 fn test_example_up_to_date() {
411 const HEADER: &str = "# This file is generated by ./risedev generate-example-config
412# Check detailed comments in src/common/src/config.rs";
413
414 let actual = expect_test::expect_file!["../../../config/example.toml"];
415 let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
416
417 let expected = format!("{HEADER}\n\n{default}");
418 actual.assert_eq(&expected);
419
420 let expected = rw_config_to_markdown();
421 let actual = expect_test::expect_file!["../../../config/docs.md"];
422 actual.assert_eq(&expected);
423 }
424
425 #[derive(Debug)]
426 struct ConfigItemDoc {
427 desc: String,
428 default: String,
429 }
430
431 fn rw_config_to_markdown() -> String {
432 let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
433 RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
434
435 let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
437 .into_iter()
438 .map(|(k, v)| {
439 let docs: BTreeMap<String, ConfigItemDoc> = v
440 .into_iter()
441 .map(|(name, desc)| {
442 (
443 name,
444 ConfigItemDoc {
445 desc,
446 default: "".to_owned(), },
448 )
449 })
450 .collect();
451 (k, docs)
452 })
453 .collect();
454
455 let toml_doc: BTreeMap<String, toml::Value> =
456 toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
457 toml_doc.into_iter().for_each(|(name, value)| {
458 set_default_values("".to_owned(), name, value, &mut configs);
459 });
460
461 let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
462 + "This page is automatically generated by `./risedev generate-example-config`\n";
463 for (section, configs) in configs {
464 if configs.is_empty() {
465 continue;
466 }
467 markdown.push_str(&format!("\n## {}\n\n", section));
468 markdown.push_str("| Config | Description | Default |\n");
469 markdown.push_str("|--------|-------------|---------|\n");
470 for (config, doc) in configs {
471 markdown.push_str(&format!(
472 "| {} | {} | {} |\n",
473 config, doc.desc, doc.default
474 ));
475 }
476 }
477 markdown
478 }
479
480 fn set_default_values(
481 section: String,
482 name: String,
483 value: toml::Value,
484 configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
485 ) {
486 if let toml::Value::Table(table) = value {
488 let section_configs: BTreeMap<String, toml::Value> =
489 table.clone().into_iter().collect();
490 let sub_section = if section.is_empty() {
491 name
492 } else {
493 format!("{}.{}", section, name)
494 };
495 section_configs
496 .into_iter()
497 .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
498 } else if let Some(t) = configs.get_mut(§ion)
499 && let Some(item_doc) = t.get_mut(&name)
500 {
501 item_doc.default = format!("{}", value);
502 }
503 }
504
505 #[test]
506 fn test_object_store_configs_backward_compatibility() {
507 {
509 let config: RwConfig = toml::from_str(
510 r#"
511 [storage.object_store]
512 object_store_set_atomic_write_dir = true
513
514 [storage.object_store.s3]
515 object_store_keepalive_ms = 1
516 object_store_send_buffer_size = 1
517 object_store_recv_buffer_size = 1
518 object_store_nodelay = false
519
520 [storage.object_store.s3.developer]
521 object_store_retry_unknown_service_error = true
522 object_store_retryable_service_error_codes = ['dummy']
523
524
525 "#,
526 )
527 .unwrap();
528
529 assert!(config.storage.object_store.set_atomic_write_dir);
530 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
531 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
532 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
533 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
534 assert!(
535 config
536 .storage
537 .object_store
538 .s3
539 .developer
540 .retry_unknown_service_error
541 );
542 assert_eq!(
543 config
544 .storage
545 .object_store
546 .s3
547 .developer
548 .retryable_service_error_codes,
549 vec!["dummy".to_owned()]
550 );
551 }
552
553 {
555 let config: RwConfig = toml::from_str(
556 r#"
557 [storage.object_store]
558 set_atomic_write_dir = true
559
560 [storage.object_store.s3]
561 keepalive_ms = 1
562 send_buffer_size = 1
563 recv_buffer_size = 1
564 nodelay = false
565
566 [storage.object_store.s3.developer]
567 retry_unknown_service_error = true
568 retryable_service_error_codes = ['dummy']
569
570
571 "#,
572 )
573 .unwrap();
574
575 assert!(config.storage.object_store.set_atomic_write_dir);
576 assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
577 assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
578 assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
579 assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
580 assert!(
581 config
582 .storage
583 .object_store
584 .s3
585 .developer
586 .retry_unknown_service_error
587 );
588 assert_eq!(
589 config
590 .storage
591 .object_store
592 .s3
593 .developer
594 .retryable_service_error_codes,
595 vec!["dummy".to_owned()]
596 );
597 }
598 }
599
600 #[test]
601 fn test_meta_configs_backward_compatibility() {
602 {
604 let config: RwConfig = toml::from_str(
605 r#"
606 [meta]
607 periodic_split_compact_group_interval_sec = 1
608 table_write_throughput_threshold = 10
609 min_table_split_write_throughput = 5
610 "#,
611 )
612 .unwrap();
613
614 assert_eq!(
615 config
616 .meta
617 .periodic_scheduling_compaction_group_split_interval_sec,
618 1
619 );
620 assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
621 assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
622 }
623 }
624}