risingwave_common/config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This pub module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
28pub mod streaming;
29pub use streaming::{AsyncStackTraceOption, StreamingConfig};
30pub mod server;
31pub use server::{HeapProfilingConfig, ServerConfig};
32pub mod udf;
33pub use udf::UdfConfig;
34pub mod storage;
35pub use storage::{
36    CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
37    extract_storage_memory_config,
38};
39pub mod merge;
40pub mod mutate;
41pub mod system;
42pub mod utils;
43
44use std::collections::BTreeMap;
45use std::fs;
46use std::num::NonZeroUsize;
47
48use anyhow::Context;
49use clap::ValueEnum;
50use educe::Educe;
51pub use merge::*;
52use risingwave_common_proc_macro::ConfigDoc;
53pub use risingwave_common_proc_macro::OverrideConfig;
54use risingwave_pb::meta::SystemParams;
55use serde::{Deserialize, Serialize, Serializer};
56use serde_default::DefaultFromSerde;
57use serde_json::Value;
58pub use system::SystemConfig;
59pub use utils::*;
60
61use crate::for_all_params;
62
63/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
64/// streams on the same connection.
65pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
66/// Use a large value for HTTP/2 stream window size to improve the performance of remote exchange,
67/// as we don't rely on this for back-pressure.
68pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; // 32 MB
69
70/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
71/// section.
72#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
73#[educe(Debug)]
74pub struct RwConfig {
75    #[serde(default)]
76    #[config_doc(nested)]
77    pub server: ServerConfig,
78
79    #[serde(default)]
80    #[config_doc(nested)]
81    pub meta: MetaConfig,
82
83    #[serde(default)]
84    #[config_doc(nested)]
85    pub batch: BatchConfig,
86
87    #[serde(default)]
88    #[config_doc(nested)]
89    pub frontend: FrontendConfig,
90
91    #[serde(default)]
92    #[config_doc(nested)]
93    pub streaming: StreamingConfig,
94
95    #[serde(default)]
96    #[config_doc(nested)]
97    pub storage: StorageConfig,
98
99    #[serde(default)]
100    #[educe(Debug(ignore))]
101    #[config_doc(nested)]
102    pub system: SystemConfig,
103
104    #[serde(default)]
105    #[config_doc(nested)]
106    pub udf: UdfConfig,
107
108    #[serde(flatten)]
109    #[config_doc(omitted)]
110    pub unrecognized: Unrecognized<Self>,
111}
112
113/// `[meta.developer.meta_compute_client_config]`
114/// `[meta.developer.meta_stream_client_config]`
115/// `[meta.developer.meta_frontend_client_config]`
116/// `[batch.developer.batch_compute_client_config]`
117/// `[batch.developer.batch_frontend_client_config]`
118/// `[streaming.developer.stream_compute_client_config]`
119#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
120pub struct RpcClientConfig {
121    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
122    pub connect_timeout_secs: u64,
123}
124
125pub use risingwave_common_metrics::MetricLevel;
126
127impl RwConfig {
128    pub const fn default_connection_pool_size(&self) -> u16 {
129        self.server.connection_pool_size
130    }
131
132    /// Returns [`streaming::StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
133    /// otherwise [`ServerConfig::connection_pool_size`].
134    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
135        self.streaming
136            .developer
137            .exchange_connection_pool_size
138            .unwrap_or_else(|| self.default_connection_pool_size())
139    }
140
141    /// Returns [`batch::BatchDeveloperConfig::exchange_connection_pool_size`] if set,
142    /// otherwise [`ServerConfig::connection_pool_size`].
143    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
144        self.batch
145            .developer
146            .exchange_connection_pool_size
147            .unwrap_or_else(|| self.default_connection_pool_size())
148    }
149}
150
151pub mod default {
152
153    pub mod developer {
154        pub fn meta_cached_traces_num() -> u32 {
155            256
156        }
157
158        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
159            1 << 27 // 128 MiB
160        }
161
162        pub fn batch_output_channel_size() -> usize {
163            64
164        }
165
166        pub fn batch_receiver_channel_size() -> usize {
167            1000
168        }
169
170        pub fn batch_root_stage_channel_size() -> usize {
171            100
172        }
173
174        pub fn batch_chunk_size() -> usize {
175            1024
176        }
177
178        pub fn batch_local_execute_buffer_size() -> usize {
179            64
180        }
181
182        /// Default to unset to be compatible with the behavior before this config is introduced,
183        /// that is, follow the value of `server.connection_pool_size`.
184        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
185            None
186        }
187
188        pub fn stream_enable_executor_row_count() -> bool {
189            false
190        }
191
192        pub fn connector_message_buffer_size() -> usize {
193            16
194        }
195
196        pub fn unsafe_stream_extreme_cache_size() -> usize {
197            10
198        }
199
200        pub fn stream_topn_cache_min_capacity() -> usize {
201            10
202        }
203
204        pub fn stream_chunk_size() -> usize {
205            256
206        }
207
208        pub fn stream_exchange_initial_permits() -> usize {
209            2048
210        }
211
212        pub fn stream_exchange_batched_permits() -> usize {
213            256
214        }
215
216        pub fn stream_exchange_concurrent_barriers() -> usize {
217            1
218        }
219
220        pub fn stream_exchange_concurrent_dispatchers() -> usize {
221            0
222        }
223
224        pub fn stream_dml_channel_initial_permits() -> usize {
225            32768
226        }
227
228        pub fn stream_max_barrier_batch_size() -> u32 {
229            1024
230        }
231
232        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
233            64 << 20 // 64MB
234        }
235
236        pub fn enable_trivial_move() -> bool {
237            true
238        }
239
240        pub fn enable_check_task_level_overlap() -> bool {
241            false
242        }
243
244        pub fn max_trivial_move_task_count_per_loop() -> usize {
245            256
246        }
247
248        pub fn max_get_task_probe_times() -> usize {
249            5
250        }
251
252        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
253            100
254        }
255
256        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
257            400
258        }
259
260        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
261            10_000
262        }
263
264        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
265            100
266        }
267
268        pub fn time_travel_vacuum_interval_sec() -> u64 {
269            30
270        }
271        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
272            1000
273        }
274
275        pub fn hummock_gc_history_insert_batch_size() -> usize {
276            1000
277        }
278
279        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
280            1000
281        }
282
283        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
284            false
285        }
286
287        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
288            10
289        }
290
291        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
292            1000
293        }
294
295        pub fn memory_controller_threshold_aggressive() -> f64 {
296            0.9
297        }
298
299        pub fn memory_controller_threshold_graceful() -> f64 {
300            0.81
301        }
302
303        pub fn memory_controller_threshold_stable() -> f64 {
304            0.72
305        }
306
307        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
308            2.0
309        }
310
311        pub fn memory_controller_eviction_factor_graceful() -> f64 {
312            1.5
313        }
314
315        pub fn memory_controller_eviction_factor_stable() -> f64 {
316            1.0
317        }
318
319        pub fn memory_controller_update_interval_ms() -> usize {
320            100
321        }
322
323        pub fn memory_controller_sequence_tls_step() -> u64 {
324            128
325        }
326
327        pub fn memory_controller_sequence_tls_lag() -> u64 {
328            32
329        }
330
331        pub fn stream_enable_arrangement_backfill() -> bool {
332            true
333        }
334
335        pub fn enable_shared_source() -> bool {
336            true
337        }
338
339        pub fn stream_high_join_amplification_threshold() -> usize {
340            2048
341        }
342
343        /// Default to 1 to be compatible with the behavior before this config is introduced.
344        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
345            Some(1)
346        }
347
348        pub fn enable_actor_tokio_metrics() -> bool {
349            true
350        }
351
352        pub fn stream_enable_auto_schema_change() -> bool {
353            true
354        }
355
356        pub fn switch_jdbc_pg_to_native() -> bool {
357            false
358        }
359
360        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
361            // NOTE(kwannoel): This is just an arbitrary number.
362            30000
363        }
364
365        pub fn streaming_now_progress_ratio() -> Option<f32> {
366            None
367        }
368
369        pub fn enable_explain_analyze_stats() -> bool {
370            true
371        }
372
373        pub fn rpc_client_connect_timeout_secs() -> u64 {
374            5
375        }
376
377        pub fn iceberg_list_interval_sec() -> u64 {
378            10
379        }
380
381        pub fn iceberg_fetch_batch_size() -> u64 {
382            1024
383        }
384
385        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
386            1024
387        }
388
389        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
390            100_000
391        }
392
393        pub fn materialize_force_overwrite_on_no_check() -> bool {
394            false
395        }
396
397        pub fn refresh_scheduler_interval_sec() -> u64 {
398            60
399        }
400
401        pub fn sync_log_store_pause_duration_ms() -> usize {
402            64
403        }
404
405        pub fn sync_log_store_buffer_size() -> usize {
406            2048
407        }
408    }
409}
410
411pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
412pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
413pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict.
414
415#[cfg(test)]
416pub mod tests {
417    use expect_test::expect;
418    use risingwave_license::LicenseKey;
419
420    use super::*;
421
422    fn default_config_for_docs() -> RwConfig {
423        let mut config = RwConfig::default();
424        // Set `license_key` to empty in the docs to avoid any confusion.
425        config.system.license_key = Some(LicenseKey::empty());
426        config
427    }
428
429    /// This test ensures that `config/example.toml` is up-to-date with the default values specified
430    /// in this file. Developer should run `./risedev generate-example-config` to update it if this
431    /// test fails.
432    #[test]
433    fn test_example_up_to_date() {
434        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
435# Check detailed comments in src/common/src/config.rs";
436
437        let actual = expect_test::expect_file!["../../../config/example.toml"];
438        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
439
440        let expected = format!("{HEADER}\n\n{default}");
441        actual.assert_eq(&expected);
442
443        let expected = rw_config_to_markdown();
444        let actual = expect_test::expect_file!["../../../config/docs.md"];
445        actual.assert_eq(&expected);
446    }
447
448    #[derive(Debug)]
449    struct ConfigItemDoc {
450        desc: String,
451        default: String,
452    }
453
454    fn rw_config_to_markdown() -> String {
455        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
456        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
457
458        // Section -> Config Name -> ConfigItemDoc
459        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
460            .into_iter()
461            .map(|(k, v)| {
462                let docs: BTreeMap<String, ConfigItemDoc> = v
463                    .into_iter()
464                    .map(|(name, desc)| {
465                        (
466                            name,
467                            ConfigItemDoc {
468                                desc,
469                                default: "".to_owned(), // unset
470                            },
471                        )
472                    })
473                    .collect();
474                (k, docs)
475            })
476            .collect();
477
478        let toml_doc: BTreeMap<String, toml::Value> =
479            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
480        toml_doc.into_iter().for_each(|(name, value)| {
481            set_default_values("".to_owned(), name, value, &mut configs);
482        });
483
484        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
485            + "This page is automatically generated by `./risedev generate-example-config`\n";
486        for (section, configs) in configs {
487            if configs.is_empty() {
488                continue;
489            }
490            markdown.push_str(&format!("\n## {}\n\n", section));
491            markdown.push_str("| Config | Description | Default |\n");
492            markdown.push_str("|--------|-------------|---------|\n");
493            for (config, doc) in configs {
494                markdown.push_str(&format!(
495                    "| {} | {} | {} |\n",
496                    config, doc.desc, doc.default
497                ));
498            }
499        }
500        markdown
501    }
502
503    fn set_default_values(
504        section: String,
505        name: String,
506        value: toml::Value,
507        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
508    ) {
509        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
510        if let toml::Value::Table(table) = value {
511            let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
512            let sub_section = if section.is_empty() {
513                name
514            } else {
515                format!("{}.{}", section, name)
516            };
517            section_configs
518                .into_iter()
519                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
520        } else if let Some(t) = configs.get_mut(&section)
521            && let Some(item_doc) = t.get_mut(&name)
522        {
523            item_doc.default = format!("{}", value);
524        }
525    }
526
527    #[test]
528    fn test_object_store_configs_backward_compatibility() {
529        // Define configs with the old name and make sure it still works
530        {
531            let config: RwConfig = toml::from_str(
532                r#"
533            [storage.object_store]
534            object_store_set_atomic_write_dir = true
535
536            [storage.object_store.s3]
537            object_store_keepalive_ms = 1
538            object_store_send_buffer_size = 1
539            object_store_recv_buffer_size = 1
540            object_store_nodelay = false
541
542            [storage.object_store.s3.developer]
543            object_store_retry_unknown_service_error = true
544            object_store_retryable_service_error_codes = ['dummy']
545
546
547            "#,
548            )
549            .unwrap();
550
551            assert!(config.storage.object_store.set_atomic_write_dir);
552            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
553            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
554            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
555            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
556            assert!(
557                config
558                    .storage
559                    .object_store
560                    .s3
561                    .developer
562                    .retry_unknown_service_error
563            );
564            assert_eq!(
565                config
566                    .storage
567                    .object_store
568                    .s3
569                    .developer
570                    .retryable_service_error_codes,
571                vec!["dummy".to_owned()]
572            );
573        }
574
575        // Define configs with the new name and make sure it works
576        {
577            let config: RwConfig = toml::from_str(
578                r#"
579            [storage.object_store]
580            set_atomic_write_dir = true
581
582            [storage.object_store.s3]
583            keepalive_ms = 1
584            send_buffer_size = 1
585            recv_buffer_size = 1
586            nodelay = false
587
588            [storage.object_store.s3.developer]
589            retry_unknown_service_error = true
590            retryable_service_error_codes = ['dummy']
591
592
593            "#,
594            )
595            .unwrap();
596
597            assert!(config.storage.object_store.set_atomic_write_dir);
598            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
599            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
600            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
601            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
602            assert!(
603                config
604                    .storage
605                    .object_store
606                    .s3
607                    .developer
608                    .retry_unknown_service_error
609            );
610            assert_eq!(
611                config
612                    .storage
613                    .object_store
614                    .s3
615                    .developer
616                    .retryable_service_error_codes,
617                vec!["dummy".to_owned()]
618            );
619        }
620    }
621
622    #[test]
623    fn test_meta_configs_backward_compatibility() {
624        // Test periodic_space_reclaim_compaction_interval_sec
625        {
626            let config: RwConfig = toml::from_str(
627                r#"
628            [meta]
629            periodic_split_compact_group_interval_sec = 1
630            table_write_throughput_threshold = 10
631            min_table_split_write_throughput = 5
632            "#,
633            )
634            .unwrap();
635
636            assert_eq!(
637                config
638                    .meta
639                    .periodic_scheduling_compaction_group_split_interval_sec,
640                1
641            );
642            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
643            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
644        }
645    }
646
647    // Previously, we have prefixes like `stream_` for all configs under `streaming.developer`.
648    // Later we removed the prefixes, but we still want to guarantee the backward compatibility.
649    #[test]
650    fn test_prefix_alias() {
651        let config: RwConfig = toml::from_str(
652            "
653            [streaming.developer]
654            stream_chunk_size = 114514
655
656            [streaming.developer.stream_compute_client_config]
657            connect_timeout_secs = 42
658            ",
659        )
660        .unwrap();
661
662        assert_eq!(config.streaming.developer.chunk_size, 114514);
663        assert_eq!(
664            config
665                .streaming
666                .developer
667                .compute_client_config
668                .connect_timeout_secs,
669            42
670        );
671    }
672
673    #[test]
674    fn test_prefix_alias_duplicate() {
675        let config = toml::from_str::<RwConfig>(
676            "
677            [streaming.developer]
678            stream_chunk_size = 114514
679            chunk_size = 1919810
680            ",
681        )
682        .unwrap_err();
683
684        expect![[r#"
685            TOML parse error at line 2, column 13
686              |
687            2 |             [streaming.developer]
688              |             ^^^^^^^^^^^^^^^^^^^^^
689            duplicate field `chunk_size`
690        "#]]
691        .assert_eq(&config.to_string());
692
693        let config = toml::from_str::<RwConfig>(
694            "
695            [streaming.developer.stream_compute_client_config]
696            connect_timeout_secs = 5
697
698            [streaming.developer.compute_client_config]
699            connect_timeout_secs = 10
700            ",
701        )
702        .unwrap_err();
703
704        expect![[r#"
705            TOML parse error at line 2, column 24
706              |
707            2 |             [streaming.developer.stream_compute_client_config]
708              |                        ^^^^^^^^^
709            duplicate field `compute_client_config`
710        "#]]
711        .assert_eq(&config.to_string());
712    }
713}