risingwave_common/config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
28pub mod streaming;
29pub use streaming::{AsyncStackTraceOption, StreamingConfig};
30pub mod server;
31pub use server::{HeapProfilingConfig, ServerConfig};
32pub mod udf;
33pub use udf::UdfConfig;
34pub mod storage;
35pub use storage::{
36    CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
37    extract_storage_memory_config,
38};
39pub mod merge;
40pub mod mutate;
41pub mod none_as_empty_string;
42pub mod system;
43pub mod utils;
44
45use std::collections::BTreeMap;
46use std::fs;
47use std::num::NonZeroUsize;
48
49use anyhow::Context;
50use clap::ValueEnum;
51use educe::Educe;
52pub use merge::*;
53use risingwave_common_proc_macro::ConfigDoc;
54pub use risingwave_common_proc_macro::OverrideConfig;
55use risingwave_pb::meta::SystemParams;
56use serde::{Deserialize, Serialize, Serializer};
57use serde_default::DefaultFromSerde;
58use serde_json::Value;
59pub use system::SystemConfig;
60pub use utils::*;
61
62use crate::for_all_params;
63
/// HTTP/2 connection-level window size, set to the maximum value (`2^31 - 1`) so that
/// multiplexed streams sharing one connection cannot deadlock on the connection window.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = u32::MAX >> 1;
/// HTTP/2 per-stream window size. Kept large to improve remote-exchange performance; this
/// window is not what we rely on for back-pressure.
pub const STREAM_WINDOW_SIZE: u32 = 32 << 20; // 32 MB
70
/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
/// section.
//
// Every section field carries `#[serde(default)]`, so a section that is absent from the file is
// filled with that section type's `Default` value instead of failing deserialization.
//
// NOTE(review): `ConfigDoc` appears to harvest field rustdoc for the generated docs (see
// `rw_config_to_markdown` in the tests), so the field notes below are plain `//` comments on
// purpose — confirm before converting any of them to `///`.
#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
#[educe(Debug)]
pub struct RwConfig {
    // `[server]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub server: ServerConfig,

    // `[meta]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub meta: MetaConfig,

    // `[batch]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub batch: BatchConfig,

    // `[frontend]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub frontend: FrontendConfig,

    // `[streaming]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub streaming: StreamingConfig,

    // `[storage]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub storage: StorageConfig,

    // `[system]` section. Left out of the `Debug` output via `educe(Debug(ignore))`.
    #[serde(default)]
    #[educe(Debug(ignore))]
    #[config_doc(nested)]
    pub system: SystemConfig,

    // `[udf]` section.
    #[serde(default)]
    #[config_doc(nested)]
    pub udf: UdfConfig,

    // Flattened into the top level of the file and omitted from the generated config docs.
    // NOTE(review): presumably collects the top-level keys that match no field above — confirm
    // against the definition of `Unrecognized`.
    #[serde(flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
113
/// `[meta.developer.meta_compute_client_config]`
/// `[meta.developer.meta_stream_client_config]`
/// `[meta.developer.meta_frontend_client_config]`
/// `[batch.developer.batch_compute_client_config]`
/// `[batch.developer.batch_frontend_client_config]`
/// `[streaming.developer.stream_compute_client_config]`
//
// One shared shape for all of the RPC-client sub-sections listed above.
//
// The `serde_with::apply` attribute attaches `none_as_empty_string` (de)serialization to every
// `Option`-typed field. The struct has no such field at the moment, so the rule only takes
// effect once an optional setting is added.
#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct RpcClientConfig {
    // Connect timeout (seconds, going by the `_secs` suffix). When the key is absent, serde
    // falls back to `default::developer::rpc_client_connect_timeout_secs`, which returns 5.
    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
    pub connect_timeout_secs: u64,
}
126
127pub use risingwave_common_metrics::MetricLevel;
128
129impl RwConfig {
130    pub const fn default_connection_pool_size(&self) -> u16 {
131        self.server.connection_pool_size
132    }
133
134    /// Returns [`streaming::StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
135    /// otherwise [`ServerConfig::connection_pool_size`].
136    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
137        self.streaming
138            .developer
139            .exchange_connection_pool_size
140            .unwrap_or_else(|| self.default_connection_pool_size())
141    }
142
143    /// Returns [`batch::BatchDeveloperConfig::exchange_connection_pool_size`] if set,
144    /// otherwise [`ServerConfig::connection_pool_size`].
145    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
146        self.batch
147            .developer
148            .exchange_connection_pool_size
149            .unwrap_or_else(|| self.default_connection_pool_size())
150    }
151}
152
/// Default-value providers for config fields.
///
/// These functions are referenced by path from `#[serde(default = "...")]` attributes (for
/// example `RpcClientConfig::connect_timeout_secs`), so each must stay a plain zero-argument
/// function returning the field's type.
pub mod default {

    /// Defaults for the `developer` sub-sections.
    pub mod developer {
        /// Defaults to `256`.
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        /// Defaults to 128 MiB, expressed in bytes.
        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            128 * 1024 * 1024 // 128 MiB
        }

        /// Defaults to `64`.
        pub fn batch_output_channel_size() -> usize {
            64
        }

        /// Defaults to `1000`.
        pub fn batch_receiver_channel_size() -> usize {
            1_000
        }

        /// Defaults to `100`.
        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        /// Defaults to `1024`.
        pub fn batch_chunk_size() -> usize {
            1024
        }

        /// Defaults to `64`.
        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// Unset by default, which keeps the behavior from before this config existed: the
        /// value of `server.connection_pool_size` is used instead.
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        /// Disabled by default.
        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        /// Defaults to `16`.
        pub fn connector_message_buffer_size() -> usize {
            16
        }

        /// Defaults to `10`.
        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        /// Defaults to `10`.
        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        /// Defaults to `256`.
        pub fn stream_chunk_size() -> usize {
            256
        }

        /// Defaults to `2048`.
        pub fn stream_exchange_initial_permits() -> usize {
            2048
        }

        /// Defaults to `256`.
        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        /// Defaults to `1`.
        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        /// Defaults to `0`.
        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        /// Defaults to `32768`.
        pub fn stream_dml_channel_initial_permits() -> usize {
            32_768
        }

        /// Defaults to `1024`.
        pub fn stream_max_barrier_batch_size() -> u32 {
            1024
        }

        /// Defaults to 64 MiB, expressed in bytes.
        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 * 1024 * 1024 // 64MB
        }

        /// Enabled by default.
        pub fn enable_trivial_move() -> bool {
            true
        }

        /// Disabled by default.
        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        /// Defaults to `256`.
        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        /// Defaults to `5`.
        pub fn max_get_task_probe_times() -> usize {
            5
        }

        /// Defaults to `100`.
        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        /// Defaults to `400`.
        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        /// Defaults to `10000`.
        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        /// Defaults to `100`.
        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        /// Defaults to `30`.
        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        /// Set by default, to `10000`.
        pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
            Some(10_000)
        }

        /// Defaults to `1000`.
        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1_000
        }

        /// Defaults to `1000`.
        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1_000
        }

        /// Defaults to `1000`.
        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1_000
        }

        /// Disabled by default.
        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        /// Defaults to `10`.
        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        /// Defaults to `1000`.
        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1_000
        }

        /// Defaults to `0.9`.
        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        /// Defaults to `0.81`.
        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        /// Defaults to `0.72`.
        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        /// Defaults to `2.0`.
        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        /// Defaults to `1.5`.
        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        /// Defaults to `1.0`.
        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        /// Defaults to `100`.
        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        /// Defaults to `128`.
        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        /// Defaults to `32`.
        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        /// Enabled by default.
        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        /// Enabled by default.
        pub fn stream_enable_snapshot_backfill() -> bool {
            true
        }

        /// Enabled by default.
        pub fn enable_shared_source() -> bool {
            true
        }

        /// Defaults to `2048`.
        pub fn stream_high_join_amplification_threshold() -> usize {
            2048
        }

        /// Set to `1` by default, which keeps the behavior from before this config existed.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        /// Enabled by default.
        pub fn enable_actor_tokio_metrics() -> bool {
            true
        }

        /// Enabled by default.
        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        /// Disabled by default.
        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        /// Defaults to `30000`.
        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            // NOTE(kwannoel): This is just an arbitrary number.
            30_000
        }

        /// Unset by default.
        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        /// Defaults to `600`, i.e. `10 * 60`.
        pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
            600 // 10 * 60
        }

        /// Enabled by default.
        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        /// Defaults to `5`. Used as the serde default of `RpcClientConfig::connect_timeout_secs`.
        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        /// Defaults to `10`.
        pub fn iceberg_list_interval_sec() -> u64 {
            10
        }

        /// Defaults to `1024`.
        pub fn iceberg_fetch_batch_size() -> u64 {
            1024
        }

        /// Defaults to `1024`.
        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1024
        }

        /// Defaults to `100000`.
        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }

        /// Disabled by default.
        pub fn materialize_force_overwrite_on_no_check() -> bool {
            false
        }

        /// Defaults to `60`.
        pub fn refresh_scheduler_interval_sec() -> u64 {
            60
        }

        /// Defaults to `64`.
        pub fn sync_log_store_pause_duration_ms() -> usize {
            64
        }

        /// Defaults to `2048`.
        pub fn sync_log_store_buffer_size() -> usize {
            2048
        }

        /// Disabled by default.
        pub fn enable_state_table_vnode_stats_pruning() -> bool {
            false
        }
    }
}
429
/// Upper bound on the number of shard bits for the meta cache.
/// NOTE(review): by analogy with `MAX_BLOCK_CACHE_SHARD_BITS`, 4 bits presumably means at most
/// 16 shards — confirm against the code that consumes this constant.
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// NOTE(review): the unit and the consumer are not visible in this file; presumably the smallest
/// buffer size a single cache shard may be given — confirm.
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
/// Upper bound on the number of shard bits for the block cache: 6 bits means the LRU cache is
/// split into 64 shards, which avoids lock conflicts.
pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6;
433
434#[cfg(test)]
435pub mod tests {
436    use expect_test::expect;
437    use risingwave_license::LicenseKey;
438
439    use super::*;
440
441    fn default_config_for_docs() -> RwConfig {
442        let mut config = RwConfig::default();
443        // Set `license_key` to empty in the docs to avoid any confusion.
444        config.system.license_key = Some(LicenseKey::empty());
445        config
446    }
447
448    /// This test ensures that `config/example.toml` is up-to-date with the default values specified
449    /// in this file. Developer should run `./risedev generate-example-config` to update it if this
450    /// test fails.
451    #[test]
452    fn test_example_up_to_date() {
453        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
454# Check detailed comments in src/common/src/config.rs";
455
456        let actual = expect_test::expect_file!["../../../config/example.toml"];
457        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
458
459        let expected = format!("{HEADER}\n\n{default}");
460        actual.assert_eq(&expected);
461
462        let expected = rw_config_to_markdown();
463        let actual = expect_test::expect_file!["../../../config/docs.md"];
464        actual.assert_eq(&expected);
465    }
466
    /// One row of a generated markdown table: the description and default value of a single
    /// config item.
    #[derive(Debug)]
    struct ConfigItemDoc {
        /// Description text, taken from the second element of each pair produced by
        /// `RwConfig::config_docs`.
        desc: String,
        /// Rendered default value. Starts out empty and is filled in by `set_default_values`
        /// when the serialized default config contains a value for this item.
        default: String,
    }
472
473    fn rw_config_to_markdown() -> String {
474        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
475        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
476
477        // Section -> Config Name -> ConfigItemDoc
478        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
479            .into_iter()
480            .map(|(k, v)| {
481                let docs: BTreeMap<String, ConfigItemDoc> = v
482                    .into_iter()
483                    .map(|(name, desc)| {
484                        (
485                            name,
486                            ConfigItemDoc {
487                                desc,
488                                default: "".to_owned(), // unset
489                            },
490                        )
491                    })
492                    .collect();
493                (k, docs)
494            })
495            .collect();
496
497        let toml_doc: BTreeMap<String, toml::Value> =
498            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
499        toml_doc.into_iter().for_each(|(name, value)| {
500            set_default_values("".to_owned(), name, value, &mut configs);
501        });
502
503        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
504            + "This page is automatically generated by `./risedev generate-example-config`\n";
505        for (section, configs) in configs {
506            if configs.is_empty() {
507                continue;
508            }
509            markdown.push_str(&format!("\n## {}\n\n", section));
510            markdown.push_str("| Config | Description | Default |\n");
511            markdown.push_str("|--------|-------------|---------|\n");
512            for (config, doc) in configs {
513                markdown.push_str(&format!(
514                    "| {} | {} | {} |\n",
515                    config, doc.desc, doc.default
516                ));
517            }
518        }
519        markdown
520    }
521
522    fn set_default_values(
523        section: String,
524        name: String,
525        value: toml::Value,
526        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
527    ) {
528        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
529        if let toml::Value::Table(table) = value {
530            let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
531            let sub_section = if section.is_empty() {
532                name
533            } else {
534                format!("{}.{}", section, name)
535            };
536            section_configs
537                .into_iter()
538                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
539        } else if let Some(t) = configs.get_mut(&section)
540            && let Some(item_doc) = t.get_mut(&name)
541        {
542            item_doc.default = format!("{}", value);
543        }
544    }
545
546    #[test]
547    fn test_object_store_configs_backward_compatibility() {
548        // Define configs with the old name and make sure it still works
549        {
550            let config: RwConfig = toml::from_str(
551                r#"
552            [storage.object_store]
553            object_store_set_atomic_write_dir = true
554
555            [storage.object_store.s3]
556            object_store_keepalive_ms = 1
557            object_store_send_buffer_size = 1
558            object_store_recv_buffer_size = 1
559            object_store_nodelay = false
560
561            [storage.object_store.s3.developer]
562            object_store_retry_unknown_service_error = true
563            object_store_retryable_service_error_codes = ['dummy']
564
565
566            "#,
567            )
568            .unwrap();
569
570            assert!(config.storage.object_store.set_atomic_write_dir);
571            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
572            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
573            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
574            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
575            assert!(
576                config
577                    .storage
578                    .object_store
579                    .s3
580                    .developer
581                    .retry_unknown_service_error
582            );
583            assert_eq!(
584                config
585                    .storage
586                    .object_store
587                    .s3
588                    .developer
589                    .retryable_service_error_codes,
590                vec!["dummy".to_owned()]
591            );
592        }
593
594        // Define configs with the new name and make sure it works
595        {
596            let config: RwConfig = toml::from_str(
597                r#"
598            [storage.object_store]
599            set_atomic_write_dir = true
600
601            [storage.object_store.s3]
602            keepalive_ms = 1
603            send_buffer_size = 1
604            recv_buffer_size = 1
605            nodelay = false
606
607            [storage.object_store.s3.developer]
608            retry_unknown_service_error = true
609            retryable_service_error_codes = ['dummy']
610
611
612            "#,
613            )
614            .unwrap();
615
616            assert!(config.storage.object_store.set_atomic_write_dir);
617            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
618            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
619            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
620            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
621            assert!(
622                config
623                    .storage
624                    .object_store
625                    .s3
626                    .developer
627                    .retry_unknown_service_error
628            );
629            assert_eq!(
630                config
631                    .storage
632                    .object_store
633                    .s3
634                    .developer
635                    .retryable_service_error_codes,
636                vec!["dummy".to_owned()]
637            );
638        }
639    }
640
641    #[test]
642    fn test_meta_configs_backward_compatibility() {
643        // Test periodic_space_reclaim_compaction_interval_sec
644        {
645            let config: RwConfig = toml::from_str(
646                r#"
647            [meta]
648            periodic_split_compact_group_interval_sec = 1
649            table_write_throughput_threshold = 10
650            min_table_split_write_throughput = 5
651            "#,
652            )
653            .unwrap();
654
655            assert_eq!(
656                config
657                    .meta
658                    .periodic_scheduling_compaction_group_split_interval_sec,
659                1
660            );
661            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
662            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
663        }
664    }
665
666    // Previously, we have prefixes like `stream_` for all configs under `streaming.developer`.
667    // Later we removed the prefixes, but we still want to guarantee the backward compatibility.
668    #[test]
669    fn test_prefix_alias() {
670        let config: RwConfig = toml::from_str(
671            "
672            [streaming.developer]
673            stream_chunk_size = 114514
674
675            [streaming.developer.stream_compute_client_config]
676            connect_timeout_secs = 42
677            ",
678        )
679        .unwrap();
680
681        assert_eq!(config.streaming.developer.chunk_size, 114514);
682        assert_eq!(
683            config
684                .streaming
685                .developer
686                .compute_client_config
687                .connect_timeout_secs,
688            42
689        );
690    }
691
692    #[test]
693    fn test_prefix_alias_duplicate() {
694        let config = toml::from_str::<RwConfig>(
695            "
696            [streaming.developer]
697            stream_chunk_size = 114514
698            chunk_size = 1919810
699            ",
700        )
701        .unwrap_err();
702
703        expect![[r#"
704            TOML parse error at line 2, column 13
705              |
706            2 |             [streaming.developer]
707              |             ^^^^^^^^^^^^^^^^^^^^^
708            duplicate field `chunk_size`
709        "#]]
710        .assert_eq(&config.to_string());
711
712        let config = toml::from_str::<RwConfig>(
713            "
714            [streaming.developer.stream_compute_client_config]
715            connect_timeout_secs = 5
716
717            [streaming.developer.compute_client_config]
718            connect_timeout_secs = 10
719            ",
720        )
721        .unwrap_err();
722
723        expect![[r#"
724            TOML parse error at line 2, column 24
725              |
726            2 |             [streaming.developer.stream_compute_client_config]
727              |                        ^^^^^^^^^
728            duplicate field `compute_client_config`
729        "#]]
730        .assert_eq(&config.to_string());
731    }
732}