risingwave_common/config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This pub module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod meta;
25pub use meta::{CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig, MetaStoreConfig};
26pub mod streaming;
27pub use streaming::{AsyncStackTraceOption, StreamingConfig};
28pub mod server;
29pub use server::{HeapProfilingConfig, ServerConfig};
30pub mod udf;
31pub use udf::UdfConfig;
32pub mod storage;
33pub use storage::{
34    CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
35    extract_storage_memory_config,
36};
37pub mod system;
38pub mod utils;
39use std::collections::BTreeMap;
40use std::fs;
41use std::num::NonZeroUsize;
42
43use anyhow::Context;
44use clap::ValueEnum;
45use educe::Educe;
46use risingwave_common_proc_macro::ConfigDoc;
47pub use risingwave_common_proc_macro::OverrideConfig;
48use risingwave_pb::meta::SystemParams;
49use serde::{Deserialize, Serialize, Serializer};
50use serde_default::DefaultFromSerde;
51use serde_json::Value;
52pub use system::SystemConfig;
53pub use utils::*;
54
55use crate::for_all_params;
56
57/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
58/// streams on the same connection.
59pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
60/// Use a large value for HTTP/2 stream window size to improve the performance of remote exchange,
61/// as we don't rely on this for back-pressure.
62pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; // 32 MB
63
64/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
65/// section.
66#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
67#[educe(Debug)]
68pub struct RwConfig {
69    #[serde(default)]
70    #[config_doc(nested)]
71    pub server: ServerConfig,
72
73    #[serde(default)]
74    #[config_doc(nested)]
75    pub meta: MetaConfig,
76
77    #[serde(default)]
78    #[config_doc(nested)]
79    pub batch: BatchConfig,
80
81    #[serde(default)]
82    #[config_doc(nested)]
83    pub frontend: FrontendConfig,
84
85    #[serde(default)]
86    #[config_doc(nested)]
87    pub streaming: StreamingConfig,
88
89    #[serde(default)]
90    #[config_doc(nested)]
91    pub storage: StorageConfig,
92
93    #[serde(default)]
94    #[educe(Debug(ignore))]
95    #[config_doc(nested)]
96    pub system: SystemConfig,
97
98    #[serde(default)]
99    #[config_doc(nested)]
100    pub udf: UdfConfig,
101
102    #[serde(flatten)]
103    #[config_doc(omitted)]
104    pub unrecognized: Unrecognized<Self>,
105}
106
107/// `[meta.developer.meta_compute_client_config]`
108/// `[meta.developer.meta_stream_client_config]`
109/// `[meta.developer.meta_frontend_client_config]`
110/// `[batch.developer.batch_compute_client_config]`
111/// `[batch.developer.batch_frontend_client_config]`
112/// `[streaming.developer.stream_compute_client_config]`
113#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
114pub struct RpcClientConfig {
115    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
116    pub connect_timeout_secs: u64,
117}
118
119pub use risingwave_common_metrics::MetricLevel;
120
121impl RwConfig {
122    pub const fn default_connection_pool_size(&self) -> u16 {
123        self.server.connection_pool_size
124    }
125
126    /// Returns [`streaming::StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
127    /// otherwise [`ServerConfig::connection_pool_size`].
128    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
129        self.streaming
130            .developer
131            .exchange_connection_pool_size
132            .unwrap_or_else(|| self.default_connection_pool_size())
133    }
134
135    /// Returns [`batch::BatchDeveloperConfig::exchange_connection_pool_size`] if set,
136    /// otherwise [`ServerConfig::connection_pool_size`].
137    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
138        self.batch
139            .developer
140            .exchange_connection_pool_size
141            .unwrap_or_else(|| self.default_connection_pool_size())
142    }
143}
144
145pub mod default {
146
147    pub mod developer {
148        pub fn meta_cached_traces_num() -> u32 {
149            256
150        }
151
152        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
153            1 << 27 // 128 MiB
154        }
155
156        pub fn batch_output_channel_size() -> usize {
157            64
158        }
159
160        pub fn batch_receiver_channel_size() -> usize {
161            1000
162        }
163
164        pub fn batch_root_stage_channel_size() -> usize {
165            100
166        }
167
168        pub fn batch_chunk_size() -> usize {
169            1024
170        }
171
172        pub fn batch_local_execute_buffer_size() -> usize {
173            64
174        }
175
176        /// Default to unset to be compatible with the behavior before this config is introduced,
177        /// that is, follow the value of `server.connection_pool_size`.
178        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
179            None
180        }
181
182        pub fn stream_enable_executor_row_count() -> bool {
183            false
184        }
185
186        pub fn connector_message_buffer_size() -> usize {
187            16
188        }
189
190        pub fn unsafe_stream_extreme_cache_size() -> usize {
191            10
192        }
193
194        pub fn stream_topn_cache_min_capacity() -> usize {
195            10
196        }
197
198        pub fn stream_chunk_size() -> usize {
199            256
200        }
201
202        pub fn stream_exchange_initial_permits() -> usize {
203            2048
204        }
205
206        pub fn stream_exchange_batched_permits() -> usize {
207            256
208        }
209
210        pub fn stream_exchange_concurrent_barriers() -> usize {
211            1
212        }
213
214        pub fn stream_exchange_concurrent_dispatchers() -> usize {
215            0
216        }
217
218        pub fn stream_dml_channel_initial_permits() -> usize {
219            32768
220        }
221
222        pub fn stream_max_barrier_batch_size() -> u32 {
223            1024
224        }
225
226        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
227            64 << 20 // 64MB
228        }
229
230        pub fn enable_trivial_move() -> bool {
231            true
232        }
233
234        pub fn enable_check_task_level_overlap() -> bool {
235            false
236        }
237
238        pub fn max_trivial_move_task_count_per_loop() -> usize {
239            256
240        }
241
242        pub fn max_get_task_probe_times() -> usize {
243            5
244        }
245
246        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
247            100
248        }
249
250        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
251            400
252        }
253
254        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
255            10_000
256        }
257
258        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
259            100
260        }
261
262        pub fn time_travel_vacuum_interval_sec() -> u64 {
263            30
264        }
265        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
266            1000
267        }
268
269        pub fn hummock_gc_history_insert_batch_size() -> usize {
270            1000
271        }
272
273        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
274            1000
275        }
276
277        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
278            false
279        }
280
281        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
282            10
283        }
284
285        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
286            1000
287        }
288
289        pub fn memory_controller_threshold_aggressive() -> f64 {
290            0.9
291        }
292
293        pub fn memory_controller_threshold_graceful() -> f64 {
294            0.81
295        }
296
297        pub fn memory_controller_threshold_stable() -> f64 {
298            0.72
299        }
300
301        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
302            2.0
303        }
304
305        pub fn memory_controller_eviction_factor_graceful() -> f64 {
306            1.5
307        }
308
309        pub fn memory_controller_eviction_factor_stable() -> f64 {
310            1.0
311        }
312
313        pub fn memory_controller_update_interval_ms() -> usize {
314            100
315        }
316
317        pub fn memory_controller_sequence_tls_step() -> u64 {
318            128
319        }
320
321        pub fn memory_controller_sequence_tls_lag() -> u64 {
322            32
323        }
324
325        pub fn stream_enable_arrangement_backfill() -> bool {
326            true
327        }
328
329        pub fn enable_shared_source() -> bool {
330            true
331        }
332
333        pub fn stream_high_join_amplification_threshold() -> usize {
334            2048
335        }
336
337        /// Default to 1 to be compatible with the behavior before this config is introduced.
338        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
339            Some(1)
340        }
341
342        pub fn enable_actor_tokio_metrics() -> bool {
343            false
344        }
345
346        pub fn stream_enable_auto_schema_change() -> bool {
347            true
348        }
349
350        pub fn switch_jdbc_pg_to_native() -> bool {
351            false
352        }
353
354        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
355            // NOTE(kwannoel): This is just an arbitrary number.
356            30000
357        }
358
359        pub fn streaming_now_progress_ratio() -> Option<f32> {
360            None
361        }
362
363        pub fn enable_explain_analyze_stats() -> bool {
364            true
365        }
366
367        pub fn rpc_client_connect_timeout_secs() -> u64 {
368            5
369        }
370
371        pub fn iceberg_list_interval_sec() -> u64 {
372            1
373        }
374
375        pub fn iceberg_fetch_batch_size() -> u64 {
376            1024
377        }
378
379        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
380            1024
381        }
382
383        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
384            100_000
385        }
386    }
387}
388
389pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
390pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
391pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict.
392
393#[cfg(test)]
394pub mod tests {
395    use risingwave_license::LicenseKey;
396
397    use super::*;
398
399    fn default_config_for_docs() -> RwConfig {
400        let mut config = RwConfig::default();
401        // Set `license_key` to empty in the docs to avoid any confusion.
402        config.system.license_key = Some(LicenseKey::empty());
403        config
404    }
405
406    /// This test ensures that `config/example.toml` is up-to-date with the default values specified
407    /// in this file. Developer should run `./risedev generate-example-config` to update it if this
408    /// test fails.
409    #[test]
410    fn test_example_up_to_date() {
411        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
412# Check detailed comments in src/common/src/config.rs";
413
414        let actual = expect_test::expect_file!["../../../config/example.toml"];
415        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
416
417        let expected = format!("{HEADER}\n\n{default}");
418        actual.assert_eq(&expected);
419
420        let expected = rw_config_to_markdown();
421        let actual = expect_test::expect_file!["../../../config/docs.md"];
422        actual.assert_eq(&expected);
423    }
424
425    #[derive(Debug)]
426    struct ConfigItemDoc {
427        desc: String,
428        default: String,
429    }
430
431    fn rw_config_to_markdown() -> String {
432        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
433        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
434
435        // Section -> Config Name -> ConfigItemDoc
436        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
437            .into_iter()
438            .map(|(k, v)| {
439                let docs: BTreeMap<String, ConfigItemDoc> = v
440                    .into_iter()
441                    .map(|(name, desc)| {
442                        (
443                            name,
444                            ConfigItemDoc {
445                                desc,
446                                default: "".to_owned(), // unset
447                            },
448                        )
449                    })
450                    .collect();
451                (k, docs)
452            })
453            .collect();
454
455        let toml_doc: BTreeMap<String, toml::Value> =
456            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
457        toml_doc.into_iter().for_each(|(name, value)| {
458            set_default_values("".to_owned(), name, value, &mut configs);
459        });
460
461        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
462            + "This page is automatically generated by `./risedev generate-example-config`\n";
463        for (section, configs) in configs {
464            if configs.is_empty() {
465                continue;
466            }
467            markdown.push_str(&format!("\n## {}\n\n", section));
468            markdown.push_str("| Config | Description | Default |\n");
469            markdown.push_str("|--------|-------------|---------|\n");
470            for (config, doc) in configs {
471                markdown.push_str(&format!(
472                    "| {} | {} | {} |\n",
473                    config, doc.desc, doc.default
474                ));
475            }
476        }
477        markdown
478    }
479
480    fn set_default_values(
481        section: String,
482        name: String,
483        value: toml::Value,
484        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
485    ) {
486        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
487        if let toml::Value::Table(table) = value {
488            let section_configs: BTreeMap<String, toml::Value> =
489                table.clone().into_iter().collect();
490            let sub_section = if section.is_empty() {
491                name
492            } else {
493                format!("{}.{}", section, name)
494            };
495            section_configs
496                .into_iter()
497                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
498        } else if let Some(t) = configs.get_mut(&section)
499            && let Some(item_doc) = t.get_mut(&name)
500        {
501            item_doc.default = format!("{}", value);
502        }
503    }
504
505    #[test]
506    fn test_object_store_configs_backward_compatibility() {
507        // Define configs with the old name and make sure it still works
508        {
509            let config: RwConfig = toml::from_str(
510                r#"
511            [storage.object_store]
512            object_store_set_atomic_write_dir = true
513
514            [storage.object_store.s3]
515            object_store_keepalive_ms = 1
516            object_store_send_buffer_size = 1
517            object_store_recv_buffer_size = 1
518            object_store_nodelay = false
519
520            [storage.object_store.s3.developer]
521            object_store_retry_unknown_service_error = true
522            object_store_retryable_service_error_codes = ['dummy']
523
524
525            "#,
526            )
527            .unwrap();
528
529            assert!(config.storage.object_store.set_atomic_write_dir);
530            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
531            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
532            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
533            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
534            assert!(
535                config
536                    .storage
537                    .object_store
538                    .s3
539                    .developer
540                    .retry_unknown_service_error
541            );
542            assert_eq!(
543                config
544                    .storage
545                    .object_store
546                    .s3
547                    .developer
548                    .retryable_service_error_codes,
549                vec!["dummy".to_owned()]
550            );
551        }
552
553        // Define configs with the new name and make sure it works
554        {
555            let config: RwConfig = toml::from_str(
556                r#"
557            [storage.object_store]
558            set_atomic_write_dir = true
559
560            [storage.object_store.s3]
561            keepalive_ms = 1
562            send_buffer_size = 1
563            recv_buffer_size = 1
564            nodelay = false
565
566            [storage.object_store.s3.developer]
567            retry_unknown_service_error = true
568            retryable_service_error_codes = ['dummy']
569
570
571            "#,
572            )
573            .unwrap();
574
575            assert!(config.storage.object_store.set_atomic_write_dir);
576            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
577            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
578            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
579            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
580            assert!(
581                config
582                    .storage
583                    .object_store
584                    .s3
585                    .developer
586                    .retry_unknown_service_error
587            );
588            assert_eq!(
589                config
590                    .storage
591                    .object_store
592                    .s3
593                    .developer
594                    .retryable_service_error_codes,
595                vec!["dummy".to_owned()]
596            );
597        }
598    }
599
600    #[test]
601    fn test_meta_configs_backward_compatibility() {
602        // Test periodic_space_reclaim_compaction_interval_sec
603        {
604            let config: RwConfig = toml::from_str(
605                r#"
606            [meta]
607            periodic_split_compact_group_interval_sec = 1
608            table_write_throughput_threshold = 10
609            min_table_split_write_throughput = 5
610            "#,
611            )
612            .unwrap();
613
614            assert_eq!(
615                config
616                    .meta
617                    .periodic_scheduling_compaction_group_split_interval_sec,
618                1
619            );
620            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
621            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
622        }
623    }
624}