risingwave_common/config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
28pub mod streaming;
29pub use streaming::{AsyncStackTraceOption, StreamingConfig};
30pub mod server;
31pub use server::{HeapProfilingConfig, ServerConfig};
32pub mod udf;
33pub use udf::UdfConfig;
34pub mod storage;
35pub use storage::{
36    CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
37    extract_storage_memory_config,
38};
39pub mod system;
40pub mod utils;
41use std::collections::BTreeMap;
42use std::fs;
43use std::num::NonZeroUsize;
44
45use anyhow::Context;
46use clap::ValueEnum;
47use educe::Educe;
48use risingwave_common_proc_macro::ConfigDoc;
49pub use risingwave_common_proc_macro::OverrideConfig;
50use risingwave_pb::meta::SystemParams;
51use serde::{Deserialize, Serialize, Serializer};
52use serde_default::DefaultFromSerde;
53use serde_json::Value;
54pub use system::SystemConfig;
55pub use utils::*;
56
57use crate::for_all_params;
58
/// HTTP/2 connection window size. Set to the largest permitted value (2^31 - 1) so that
/// multiplexed streams sharing one connection cannot deadlock each other.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = u32::MAX >> 1;
/// HTTP/2 stream window size. Deliberately large to speed up remote exchange; back-pressure is
/// not derived from this window.
pub const STREAM_WINDOW_SIZE: u32 = 32 << 20; // 32 MiB
65
/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
/// section.
// NOTE(review): field notes below are plain `//` comments on purpose. `ConfigDoc` appears to
// collect `///` docs for the generated config docs, so converting them may change that output —
// confirm before doing so.
#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
#[educe(Debug)]
pub struct RwConfig {
    // `[server]` section. `#[serde(default)]` (here and on the sections below) lets the whole
    // section be left out of the file.
    #[serde(default)]
    #[config_doc(nested)]
    pub server: ServerConfig,

    // `[meta]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub meta: MetaConfig,

    // `[batch]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub batch: BatchConfig,

    // `[frontend]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub frontend: FrontendConfig,

    // `[streaming]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub streaming: StreamingConfig,

    // `[storage]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub storage: StorageConfig,

    // `[system]` section. Excluded from the `Debug` output via `educe(Debug(ignore))`.
    #[serde(default)]
    #[educe(Debug(ignore))]
    #[config_doc(nested)]
    pub system: SystemConfig,

    // `[udf]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub udf: UdfConfig,

    // Flattened into the top level of the file rather than living under its own key.
    // NOTE(review): presumably collects keys that match none of the sections above;
    // `Unrecognized` is defined outside this view — confirm.
    #[serde(flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
108
/// `[meta.developer.meta_compute_client_config]`
/// `[meta.developer.meta_stream_client_config]`
/// `[meta.developer.meta_frontend_client_config]`
/// `[batch.developer.batch_compute_client_config]`
/// `[batch.developer.batch_frontend_client_config]`
/// `[streaming.developer.stream_compute_client_config]`
// The list above names the config sections that share this struct's shape.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct RpcClientConfig {
    // Connect timeout in seconds (per the field name); falls back to
    // `default::developer::rpc_client_connect_timeout_secs()` when the key is absent.
    // NOTE(review): kept as a plain comment because `ConfigDoc` appears to collect `///` docs
    // on fields for the generated config docs — confirm before converting.
    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
    pub connect_timeout_secs: u64,
}
120
121pub use risingwave_common_metrics::MetricLevel;
122
123impl RwConfig {
124    pub const fn default_connection_pool_size(&self) -> u16 {
125        self.server.connection_pool_size
126    }
127
128    /// Returns [`streaming::StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
129    /// otherwise [`ServerConfig::connection_pool_size`].
130    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
131        self.streaming
132            .developer
133            .exchange_connection_pool_size
134            .unwrap_or_else(|| self.default_connection_pool_size())
135    }
136
137    /// Returns [`batch::BatchDeveloperConfig::exchange_connection_pool_size`] if set,
138    /// otherwise [`ServerConfig::connection_pool_size`].
139    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
140        self.batch
141            .developer
142            .exchange_connection_pool_size
143            .unwrap_or_else(|| self.default_connection_pool_size())
144    }
145}
146
/// Functions that supply default values for config fields; they are referenced by path from
/// `#[serde(default = "...")]` attributes.
pub mod default {

    /// Defaults for developer-level options.
    pub mod developer {
        /// Default for `meta_cached_traces_num`: 256.
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        /// Default for `meta_cached_traces_memory_limit_bytes`: 128 MiB.
        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            128 * 1024 * 1024
        }

        /// Default for `batch_output_channel_size`: 64.
        pub fn batch_output_channel_size() -> usize {
            64
        }

        /// Default for `batch_receiver_channel_size`: 1000.
        pub fn batch_receiver_channel_size() -> usize {
            1_000
        }

        /// Default for `batch_root_stage_channel_size`: 100.
        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        /// Default for `batch_chunk_size`: 1024.
        pub fn batch_chunk_size() -> usize {
            1024
        }

        /// Default for `batch_local_execute_buffer_size`: 64.
        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// Unset by default, which keeps the behavior from before this option existed: the
        /// value of `server.connection_pool_size` is used instead.
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        /// Default for `stream_enable_executor_row_count`: disabled.
        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        /// Default for `connector_message_buffer_size`: 16.
        pub fn connector_message_buffer_size() -> usize {
            16
        }

        /// Default for `unsafe_stream_extreme_cache_size`: 10.
        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        /// Default for `stream_topn_cache_min_capacity`: 10.
        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        /// Default for `stream_chunk_size`: 256.
        pub fn stream_chunk_size() -> usize {
            256
        }

        /// Default for `stream_exchange_initial_permits`: 2048.
        pub fn stream_exchange_initial_permits() -> usize {
            2048
        }

        /// Default for `stream_exchange_batched_permits`: 256.
        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        /// Default for `stream_exchange_concurrent_barriers`: 1.
        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        /// Default for `stream_exchange_concurrent_dispatchers`: 0.
        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        /// Default for `stream_dml_channel_initial_permits`: 32768.
        pub fn stream_dml_channel_initial_permits() -> usize {
            32 * 1024
        }

        /// Default for `stream_max_barrier_batch_size`: 1024.
        pub fn stream_max_barrier_batch_size() -> u32 {
            1024
        }

        /// Default for `stream_hash_agg_max_dirty_groups_heap_size`: 64 MiB.
        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 * 1024 * 1024
        }

        /// Default for `enable_trivial_move`: enabled.
        pub fn enable_trivial_move() -> bool {
            true
        }

        /// Default for `enable_check_task_level_overlap`: disabled.
        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        /// Default for `max_trivial_move_task_count_per_loop`: 256.
        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        /// Default for `max_get_task_probe_times`: 5.
        pub fn max_get_task_probe_times() -> usize {
            5
        }

        /// Default for `actor_cnt_per_worker_parallelism_soft_limit`: 100.
        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        /// Default for `actor_cnt_per_worker_parallelism_hard_limit`: 400.
        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        /// Default for `hummock_time_travel_sst_info_fetch_batch_size`: 10000.
        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        /// Default for `hummock_time_travel_sst_info_insert_batch_size`: 100.
        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        /// Default for `time_travel_vacuum_interval_sec`: 30 (seconds, per the name).
        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        /// Default for `hummock_time_travel_epoch_version_insert_batch_size`: 1000.
        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1_000
        }

        /// Default for `hummock_gc_history_insert_batch_size`: 1000.
        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1_000
        }

        /// Default for `hummock_time_travel_filter_out_objects_batch_size`: 1000.
        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1_000
        }

        /// Default for `hummock_time_travel_filter_out_objects_v1`: disabled.
        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        /// Default for `hummock_time_travel_filter_out_objects_list_version_batch_size`: 10.
        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        /// Default for `hummock_time_travel_filter_out_objects_list_delta_batch_size`: 1000.
        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1_000
        }

        /// Default for `memory_controller_threshold_aggressive`: 0.9.
        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        /// Default for `memory_controller_threshold_graceful`: 0.81.
        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        /// Default for `memory_controller_threshold_stable`: 0.72.
        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        /// Default for `memory_controller_eviction_factor_aggressive`: 2.0.
        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        /// Default for `memory_controller_eviction_factor_graceful`: 1.5.
        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        /// Default for `memory_controller_eviction_factor_stable`: 1.0.
        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        /// Default for `memory_controller_update_interval_ms`: 100 (milliseconds, per the name).
        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        /// Default for `memory_controller_sequence_tls_step`: 128.
        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        /// Default for `memory_controller_sequence_tls_lag`: 32.
        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        /// Default for `stream_enable_arrangement_backfill`: enabled.
        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        /// Default for `enable_shared_source`: enabled.
        pub fn enable_shared_source() -> bool {
            true
        }

        /// Default for `stream_high_join_amplification_threshold`: 2048.
        pub fn stream_high_join_amplification_threshold() -> usize {
            2048
        }

        /// Defaults to a pool of 1, matching the behavior from before this option existed.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        /// Default for `enable_actor_tokio_metrics`: disabled.
        pub fn enable_actor_tokio_metrics() -> bool {
            false
        }

        /// Default for `stream_enable_auto_schema_change`: enabled.
        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        /// Default for `switch_jdbc_pg_to_native`: disabled.
        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        /// Default for `streaming_hash_join_entry_state_max_rows`: 30000.
        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            // NOTE(kwannoel): the number was picked arbitrarily.
            30_000
        }

        /// Default for `streaming_now_progress_ratio`: unset.
        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        /// Default for `enable_explain_analyze_stats`: enabled.
        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        /// Default connect timeout for RPC clients: 5 (seconds, per the name).
        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        /// Default for `iceberg_list_interval_sec`: 1 (second, per the name).
        pub fn iceberg_list_interval_sec() -> u64 {
            1
        }

        /// Default for `iceberg_fetch_batch_size`: 1024.
        pub fn iceberg_fetch_batch_size() -> u64 {
            1024
        }

        /// Default for `iceberg_sink_positional_delete_cache_size`: 1024.
        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1024
        }

        /// Default for `iceberg_sink_write_parquet_max_row_group_rows`: 100000.
        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }
    }
}
390
/// Upper bound on the number of shard bits for the meta cache.
/// NOTE(review): presumably caps the cache at 2^4 = 16 shards — confirm where this is consumed.
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// Minimum buffer size per shard.
/// NOTE(review): the unit is not visible in this file — confirm where this is consumed.
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
/// Upper bound on the number of shard bits for the block cache.
pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // 6 bits means the LRU cache is split into 64 shards, to avoid lock conflicts.
394
395#[cfg(test)]
396pub mod tests {
397    use risingwave_license::LicenseKey;
398
399    use super::*;
400
401    fn default_config_for_docs() -> RwConfig {
402        let mut config = RwConfig::default();
403        // Set `license_key` to empty in the docs to avoid any confusion.
404        config.system.license_key = Some(LicenseKey::empty());
405        config
406    }
407
408    /// This test ensures that `config/example.toml` is up-to-date with the default values specified
409    /// in this file. Developer should run `./risedev generate-example-config` to update it if this
410    /// test fails.
411    #[test]
412    fn test_example_up_to_date() {
413        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
414# Check detailed comments in src/common/src/config.rs";
415
416        let actual = expect_test::expect_file!["../../../config/example.toml"];
417        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
418
419        let expected = format!("{HEADER}\n\n{default}");
420        actual.assert_eq(&expected);
421
422        let expected = rw_config_to_markdown();
423        let actual = expect_test::expect_file!["../../../config/docs.md"];
424        actual.assert_eq(&expected);
425    }
426
    /// One row of the generated markdown table: the description and default of a config item.
    #[derive(Debug)]
    struct ConfigItemDoc {
        /// Description text, taken from the second element of each pair that
        /// `RwConfig::config_docs` produces.
        desc: String,
        /// Default value rendered as text; stays empty until `set_default_values` fills it in.
        default: String,
    }
432
433    fn rw_config_to_markdown() -> String {
434        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
435        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
436
437        // Section -> Config Name -> ConfigItemDoc
438        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
439            .into_iter()
440            .map(|(k, v)| {
441                let docs: BTreeMap<String, ConfigItemDoc> = v
442                    .into_iter()
443                    .map(|(name, desc)| {
444                        (
445                            name,
446                            ConfigItemDoc {
447                                desc,
448                                default: "".to_owned(), // unset
449                            },
450                        )
451                    })
452                    .collect();
453                (k, docs)
454            })
455            .collect();
456
457        let toml_doc: BTreeMap<String, toml::Value> =
458            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
459        toml_doc.into_iter().for_each(|(name, value)| {
460            set_default_values("".to_owned(), name, value, &mut configs);
461        });
462
463        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
464            + "This page is automatically generated by `./risedev generate-example-config`\n";
465        for (section, configs) in configs {
466            if configs.is_empty() {
467                continue;
468            }
469            markdown.push_str(&format!("\n## {}\n\n", section));
470            markdown.push_str("| Config | Description | Default |\n");
471            markdown.push_str("|--------|-------------|---------|\n");
472            for (config, doc) in configs {
473                markdown.push_str(&format!(
474                    "| {} | {} | {} |\n",
475                    config, doc.desc, doc.default
476                ));
477            }
478        }
479        markdown
480    }
481
482    fn set_default_values(
483        section: String,
484        name: String,
485        value: toml::Value,
486        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
487    ) {
488        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
489        if let toml::Value::Table(table) = value {
490            let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
491            let sub_section = if section.is_empty() {
492                name
493            } else {
494                format!("{}.{}", section, name)
495            };
496            section_configs
497                .into_iter()
498                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
499        } else if let Some(t) = configs.get_mut(&section)
500            && let Some(item_doc) = t.get_mut(&name)
501        {
502            item_doc.default = format!("{}", value);
503        }
504    }
505
506    #[test]
507    fn test_object_store_configs_backward_compatibility() {
508        // Define configs with the old name and make sure it still works
509        {
510            let config: RwConfig = toml::from_str(
511                r#"
512            [storage.object_store]
513            object_store_set_atomic_write_dir = true
514
515            [storage.object_store.s3]
516            object_store_keepalive_ms = 1
517            object_store_send_buffer_size = 1
518            object_store_recv_buffer_size = 1
519            object_store_nodelay = false
520
521            [storage.object_store.s3.developer]
522            object_store_retry_unknown_service_error = true
523            object_store_retryable_service_error_codes = ['dummy']
524
525
526            "#,
527            )
528            .unwrap();
529
530            assert!(config.storage.object_store.set_atomic_write_dir);
531            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
532            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
533            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
534            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
535            assert!(
536                config
537                    .storage
538                    .object_store
539                    .s3
540                    .developer
541                    .retry_unknown_service_error
542            );
543            assert_eq!(
544                config
545                    .storage
546                    .object_store
547                    .s3
548                    .developer
549                    .retryable_service_error_codes,
550                vec!["dummy".to_owned()]
551            );
552        }
553
554        // Define configs with the new name and make sure it works
555        {
556            let config: RwConfig = toml::from_str(
557                r#"
558            [storage.object_store]
559            set_atomic_write_dir = true
560
561            [storage.object_store.s3]
562            keepalive_ms = 1
563            send_buffer_size = 1
564            recv_buffer_size = 1
565            nodelay = false
566
567            [storage.object_store.s3.developer]
568            retry_unknown_service_error = true
569            retryable_service_error_codes = ['dummy']
570
571
572            "#,
573            )
574            .unwrap();
575
576            assert!(config.storage.object_store.set_atomic_write_dir);
577            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
578            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
579            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
580            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
581            assert!(
582                config
583                    .storage
584                    .object_store
585                    .s3
586                    .developer
587                    .retry_unknown_service_error
588            );
589            assert_eq!(
590                config
591                    .storage
592                    .object_store
593                    .s3
594                    .developer
595                    .retryable_service_error_codes,
596                vec!["dummy".to_owned()]
597            );
598        }
599    }
600
601    #[test]
602    fn test_meta_configs_backward_compatibility() {
603        // Test periodic_space_reclaim_compaction_interval_sec
604        {
605            let config: RwConfig = toml::from_str(
606                r#"
607            [meta]
608            periodic_split_compact_group_interval_sec = 1
609            table_write_throughput_threshold = 10
610            min_table_split_write_throughput = 5
611            "#,
612            )
613            .unwrap();
614
615            assert_eq!(
616                config
617                    .meta
618                    .periodic_scheduling_compaction_group_split_interval_sec,
619                1
620            );
621            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
622            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
623        }
624    }
625}