Skip to main content

risingwave_common/config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This pub module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{
28    CheckpointCompression, CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig,
29    MetaStoreConfig,
30};
31pub mod streaming;
32pub use streaming::{AsyncStackTraceOption, StreamingConfig};
33pub mod server;
34pub use server::{HeapProfilingConfig, ServerConfig};
35
36pub use crate::session_config::SessionInitConfig;
37pub mod udf;
38pub use udf::UdfConfig;
39pub mod storage;
40pub use storage::{
41    CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
42    extract_storage_memory_config,
43};
44pub mod merge;
45pub mod mutate;
46pub mod none_as_empty_string;
47pub mod system;
48pub mod utils;
49
50use std::collections::BTreeMap;
51use std::fs;
52use std::num::NonZeroUsize;
53
54use anyhow::Context;
55use clap::ValueEnum;
56use educe::Educe;
57pub use merge::*;
58use risingwave_common_proc_macro::ConfigDoc;
59pub use risingwave_common_proc_macro::OverrideConfig;
60use risingwave_pb::meta::SystemParams;
61use serde::{Deserialize, Serialize, Serializer};
62use serde_default::DefaultFromSerde;
63use serde_json::Value;
64pub use system::SystemConfig;
65pub use utils::*;
66
67use crate::for_all_params;
68
69/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
70/// streams on the same connection.
71pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
72/// Use a large value for HTTP/2 stream window size to improve the performance of remote exchange,
73/// as we don't rely on this for back-pressure.
74pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; // 32 MB
75
76/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
77/// section.
78#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
79#[educe(Debug)]
80pub struct RwConfig {
81    #[serde(default)]
82    #[config_doc(nested)]
83    pub server: ServerConfig,
84
85    #[serde(default)]
86    #[config_doc(nested)]
87    pub meta: MetaConfig,
88
89    #[serde(default)]
90    #[config_doc(nested)]
91    pub batch: BatchConfig,
92
93    #[serde(default)]
94    #[config_doc(nested)]
95    pub frontend: FrontendConfig,
96
97    #[serde(default)]
98    #[config_doc(nested)]
99    pub streaming: StreamingConfig,
100
101    #[serde(default)]
102    #[config_doc(nested)]
103    pub storage: StorageConfig,
104
105    #[serde(default)]
106    #[educe(Debug(ignore))]
107    #[config_doc(nested)]
108    pub system: SystemConfig,
109
110    #[serde(default)]
111    #[config_doc(nested)]
112    pub udf: UdfConfig,
113
114    #[serde(default)]
115    #[config_doc(nested)]
116    pub session_init: SessionInitConfig,
117
118    #[serde(flatten)]
119    #[config_doc(omitted)]
120    pub unrecognized: Unrecognized<Self>,
121}
122
123/// `[meta.developer.meta_compute_client_config]`
124/// `[meta.developer.meta_stream_client_config]`
125/// `[meta.developer.meta_frontend_client_config]`
126/// `[batch.developer.batch_compute_client_config]`
127/// `[batch.developer.batch_frontend_client_config]`
128/// `[streaming.developer.stream_compute_client_config]`
129#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
130#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
131pub struct RpcClientConfig {
132    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
133    pub connect_timeout_secs: u64,
134    /// Maximum concurrency when setting up an RPC client pool.
135    /// Set to 0 to keep the previous unlimited behavior.
136    #[serde(default = "default::developer::rpc_client_pool_setup_concurrency")]
137    pub pool_setup_concurrency: usize,
138}
139
140pub use risingwave_common_metrics::MetricLevel;
141
142impl RwConfig {
143    pub const fn default_connection_pool_size(&self) -> u16 {
144        self.server.connection_pool_size
145    }
146
147    /// Returns [`streaming::StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
148    /// otherwise [`ServerConfig::connection_pool_size`].
149    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
150        self.streaming
151            .developer
152            .exchange_connection_pool_size
153            .unwrap_or_else(|| self.default_connection_pool_size())
154    }
155
156    /// Returns [`batch::BatchDeveloperConfig::exchange_connection_pool_size`] if set,
157    /// otherwise [`ServerConfig::connection_pool_size`].
158    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
159        self.batch
160            .developer
161            .exchange_connection_pool_size
162            .unwrap_or_else(|| self.default_connection_pool_size())
163    }
164}
165
166pub mod default {
167
168    pub mod developer {
169        pub fn meta_cached_traces_num() -> u32 {
170            256
171        }
172
173        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
174            1 << 27 // 128 MiB
175        }
176
177        pub fn batch_output_channel_size() -> usize {
178            64
179        }
180
181        pub fn batch_receiver_channel_size() -> usize {
182            1000
183        }
184
185        pub fn batch_root_stage_channel_size() -> usize {
186            100
187        }
188
189        pub fn batch_chunk_size() -> usize {
190            1024
191        }
192
193        pub fn batch_local_execute_buffer_size() -> usize {
194            64
195        }
196
197        /// Default to unset to be compatible with the behavior before this config is introduced,
198        /// that is, follow the value of `server.connection_pool_size`.
199        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
200            None
201        }
202
203        pub fn stream_enable_executor_row_count() -> bool {
204            false
205        }
206
207        pub fn connector_message_buffer_size() -> usize {
208            16
209        }
210
211        pub fn unsafe_stream_extreme_cache_size() -> usize {
212            10
213        }
214
215        pub fn stream_topn_cache_min_capacity() -> usize {
216            10
217        }
218
219        pub fn stream_chunk_size() -> usize {
220            256
221        }
222
223        pub fn stream_exchange_initial_permits() -> usize {
224            2048
225        }
226
227        pub fn stream_exchange_batched_permits() -> usize {
228            256
229        }
230
231        pub fn stream_exchange_concurrent_barriers() -> usize {
232            1
233        }
234
235        pub fn stream_exchange_concurrent_dispatchers() -> usize {
236            0
237        }
238
239        pub fn stream_dml_channel_initial_permits() -> usize {
240            32768
241        }
242
243        pub fn stream_max_barrier_batch_size() -> u32 {
244            1024
245        }
246
247        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
248            64 << 20 // 64MB
249        }
250
251        pub fn enable_trivial_move() -> bool {
252            true
253        }
254
255        pub fn enable_check_task_level_overlap() -> bool {
256            false
257        }
258
259        pub fn max_trivial_move_task_count_per_loop() -> usize {
260            256
261        }
262
263        pub fn max_get_task_probe_times() -> usize {
264            5
265        }
266
267        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
268            100
269        }
270
271        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
272            400
273        }
274
275        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
276            10_000
277        }
278
279        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
280            100
281        }
282
283        pub fn time_travel_vacuum_interval_sec() -> u64 {
284            30
285        }
286
287        pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
288            Some(10000)
289        }
290
291        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
292            1000
293        }
294
295        pub fn hummock_time_travel_delta_fetch_batch_size() -> usize {
296            100
297        }
298
299        pub fn hummock_gc_history_insert_batch_size() -> usize {
300            1000
301        }
302
303        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
304            1000
305        }
306
307        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
308            false
309        }
310
311        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
312            10
313        }
314
315        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
316            1000
317        }
318
319        pub fn memory_controller_threshold_aggressive() -> f64 {
320            0.9
321        }
322
323        pub fn memory_controller_threshold_graceful() -> f64 {
324            0.81
325        }
326
327        pub fn memory_controller_threshold_stable() -> f64 {
328            0.72
329        }
330
331        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
332            2.0
333        }
334
335        pub fn memory_controller_eviction_factor_graceful() -> f64 {
336            1.5
337        }
338
339        pub fn memory_controller_eviction_factor_stable() -> f64 {
340            1.0
341        }
342
343        pub fn memory_controller_update_interval_ms() -> usize {
344            100
345        }
346
347        pub fn memory_controller_sequence_tls_step() -> u64 {
348            128
349        }
350
351        pub fn memory_controller_sequence_tls_lag() -> u64 {
352            32
353        }
354
355        pub fn stream_enable_arrangement_backfill() -> bool {
356            true
357        }
358
359        pub fn stream_enable_snapshot_backfill() -> bool {
360            true
361        }
362
363        pub fn enable_shared_source() -> bool {
364            true
365        }
366
367        pub fn stream_high_join_amplification_threshold() -> usize {
368            2048
369        }
370
371        pub fn stream_high_gap_fill_amplification_threshold() -> usize {
372            2048
373        }
374
375        /// Default to 1 to be compatible with the behavior before this config is introduced.
376        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
377            Some(1)
378        }
379
380        pub fn enable_actor_tokio_metrics() -> bool {
381            true
382        }
383
384        pub fn stream_enable_auto_schema_change() -> bool {
385            true
386        }
387
388        pub fn switch_jdbc_pg_to_native() -> bool {
389            false
390        }
391
392        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
393            // NOTE(kwannoel): This is just an arbitrary number.
394            30000
395        }
396
397        pub fn streaming_join_hash_map_evict_interval_rows() -> u32 {
398            16
399        }
400
401        pub fn streaming_now_progress_ratio() -> Option<f32> {
402            None
403        }
404
405        pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
406            10 * 60
407        }
408
409        pub fn enable_explain_analyze_stats() -> bool {
410            true
411        }
412
413        pub fn rpc_client_connect_timeout_secs() -> u64 {
414            5
415        }
416
417        pub fn rpc_client_pool_setup_concurrency() -> usize {
418            0
419        }
420
421        pub fn iceberg_list_interval_sec() -> u64 {
422            10
423        }
424
425        pub fn iceberg_fetch_batch_size() -> u64 {
426            1024
427        }
428
429        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
430            1024
431        }
432
433        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
434            100_000
435        }
436
437        pub fn materialize_force_overwrite_on_no_check() -> bool {
438            false
439        }
440
441        pub fn refresh_scheduler_interval_sec() -> u64 {
442            60
443        }
444
445        pub fn sync_log_store_pause_duration_ms() -> usize {
446            64
447        }
448
449        pub fn sync_log_store_buffer_size() -> usize {
450            2048
451        }
452
453        pub fn disable_sync_log_store_dispatcher() -> bool {
454            false
455        }
456
457        pub fn table_change_log_insert_batch_size() -> u64 {
458            1000
459        }
460
461        pub fn table_change_log_delete_batch_size() -> u64 {
462            1000
463        }
464
465        pub fn enable_state_table_vnode_stats_pruning() -> bool {
466            false
467        }
468
469        pub fn enable_vnode_key_stats_for_materialize() -> bool {
470            false
471        }
472
473        pub fn max_concurrent_kv_log_store_historical_read() -> usize {
474            0
475        }
476    }
477}
478
479pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
480pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
481pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict.
482
483#[cfg(test)]
484pub mod tests {
485    use expect_test::expect;
486    use risingwave_license::LicenseKey;
487
488    use super::*;
489
490    fn default_config_for_docs() -> RwConfig {
491        let mut config = RwConfig::default();
492        // Set `license_key` to empty in the docs to avoid any confusion.
493        config.system.license_key = Some(LicenseKey::empty());
494        // Keep generated docs and example config aligned with the production-safe default.
495        config.frontend.unsafe_enable_local_fs_connector = false;
496        config
497    }
498
499    /// This test ensures that `config/example.toml` is up-to-date with the default values specified
500    /// in this file. Developer should run `./risedev generate-example-config` to update it if this
501    /// test fails.
502    #[test]
503    fn test_example_up_to_date() {
504        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
505# Check detailed comments in src/common/src/config.rs";
506
507        let actual = expect_test::expect_file!["../../../config/example.toml"];
508        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
509
510        let expected = format!("{HEADER}\n\n{default}");
511        actual.assert_eq(&expected);
512
513        let expected = rw_config_to_markdown();
514        let actual = expect_test::expect_file!["../../../config/docs.md"];
515        actual.assert_eq(&expected);
516    }
517
518    #[test]
519    fn test_session_init_entries_distinguishes_omitted_from_default() {
520        let config: RwConfig = toml::from_str(
521            r#"
522            [session_init]
523            streaming_parallelism = "bounded(8)"
524            streaming_parallelism_for_table = "default"
525            "#,
526        )
527        .unwrap();
528
529        // Omitted fields are `None`; an explicit `default` is `Some("default")`.
530        assert_eq!(
531            config.session_init.streaming_parallelism.as_deref(),
532            Some("bounded(8)")
533        );
534        assert_eq!(
535            config
536                .session_init
537                .streaming_parallelism_for_table
538                .as_deref(),
539            Some("default")
540        );
541        assert_eq!(config.session_init.streaming_parallelism_for_sink, None);
542
543        // Only explicitly-configured parameters are reported, by their session parameter name.
544        assert_eq!(
545            config.session_init.entries(),
546            vec![
547                ("streaming_parallelism", "bounded(8)"),
548                ("streaming_parallelism_for_table", "default"),
549            ]
550        );
551    }
552
553    #[test]
554    fn test_session_init_default_is_empty() {
555        assert!(SessionInitConfig::default().entries().is_empty());
556    }
557
558    #[test]
559    fn test_session_init_rejects_unrecognized_key() {
560        let err = toml::from_str::<RwConfig>(
561            r#"
562            [session_init]
563            streaming_parallelism = "bounded(8)"
564            not_a_real_param = "oops"
565            "#,
566        )
567        .unwrap_err();
568
569        assert!(err.to_string().contains("unknown field `not_a_real_param`"));
570    }
571
572    #[derive(Debug)]
573    struct ConfigItemDoc {
574        desc: String,
575        default: String,
576    }
577
578    fn rw_config_to_markdown() -> String {
579        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
580        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
581
582        // Section -> Config Name -> ConfigItemDoc
583        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
584            .into_iter()
585            .map(|(k, v)| {
586                let docs: BTreeMap<String, ConfigItemDoc> = v
587                    .into_iter()
588                    .map(|(name, desc)| {
589                        (
590                            name,
591                            ConfigItemDoc {
592                                desc,
593                                default: "".to_owned(), // unset
594                            },
595                        )
596                    })
597                    .collect();
598                (k, docs)
599            })
600            .collect();
601
602        let toml_doc: BTreeMap<String, toml::Value> =
603            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
604        toml_doc.into_iter().for_each(|(name, value)| {
605            set_default_values("".to_owned(), name, value, &mut configs);
606        });
607
608        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
609            + "This page is automatically generated by `./risedev generate-example-config`\n";
610        for (section, configs) in configs {
611            if configs.is_empty() {
612                continue;
613            }
614            markdown.push_str(&format!("\n## {}\n\n", section));
615            markdown.push_str("| Config | Description | Default |\n");
616            markdown.push_str("|--------|-------------|---------|\n");
617            for (config, doc) in configs {
618                markdown.push_str(&format!(
619                    "| {} | {} | {} |\n",
620                    config, doc.desc, doc.default
621                ));
622            }
623        }
624        markdown
625    }
626
627    fn set_default_values(
628        section: String,
629        name: String,
630        value: toml::Value,
631        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
632    ) {
633        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
634        if let toml::Value::Table(table) = value {
635            let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
636            let sub_section = if section.is_empty() {
637                name
638            } else {
639                format!("{}.{}", section, name)
640            };
641            section_configs
642                .into_iter()
643                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
644        } else if let Some(t) = configs.get_mut(&section)
645            && let Some(item_doc) = t.get_mut(&name)
646        {
647            item_doc.default = format!("{}", value);
648        }
649    }
650
651    #[test]
652    fn test_object_store_configs_backward_compatibility() {
653        // Define configs with the old name and make sure it still works
654        {
655            let config: RwConfig = toml::from_str(
656                r#"
657            [storage.object_store]
658            object_store_set_atomic_write_dir = true
659
660            [storage.object_store.s3]
661            object_store_keepalive_ms = 1
662            object_store_send_buffer_size = 1
663            object_store_recv_buffer_size = 1
664            object_store_nodelay = false
665
666            [storage.object_store.s3.developer]
667            object_store_retry_unknown_service_error = true
668            object_store_retryable_service_error_codes = ['dummy']
669
670
671            "#,
672            )
673            .unwrap();
674
675            assert!(config.storage.object_store.set_atomic_write_dir);
676            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
677            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
678            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
679            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
680            assert!(
681                config
682                    .storage
683                    .object_store
684                    .s3
685                    .developer
686                    .retry_unknown_service_error
687            );
688            assert_eq!(
689                config
690                    .storage
691                    .object_store
692                    .s3
693                    .developer
694                    .retryable_service_error_codes,
695                vec!["dummy".to_owned()]
696            );
697        }
698
699        // Define configs with the new name and make sure it works
700        {
701            let config: RwConfig = toml::from_str(
702                r#"
703            [storage.object_store]
704            set_atomic_write_dir = true
705
706            [storage.object_store.s3]
707            keepalive_ms = 1
708            send_buffer_size = 1
709            recv_buffer_size = 1
710            nodelay = false
711
712            [storage.object_store.s3.developer]
713            retry_unknown_service_error = true
714            retryable_service_error_codes = ['dummy']
715
716
717            "#,
718            )
719            .unwrap();
720
721            assert!(config.storage.object_store.set_atomic_write_dir);
722            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
723            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
724            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
725            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
726            assert!(
727                config
728                    .storage
729                    .object_store
730                    .s3
731                    .developer
732                    .retry_unknown_service_error
733            );
734            assert_eq!(
735                config
736                    .storage
737                    .object_store
738                    .s3
739                    .developer
740                    .retryable_service_error_codes,
741                vec!["dummy".to_owned()]
742            );
743        }
744    }
745
746    #[test]
747    fn test_meta_configs_backward_compatibility() {
748        // Test periodic_space_reclaim_compaction_interval_sec
749        {
750            let config: RwConfig = toml::from_str(
751                r#"
752            [meta]
753            periodic_split_compact_group_interval_sec = 1
754            table_write_throughput_threshold = 10
755            min_table_split_write_throughput = 5
756            "#,
757            )
758            .unwrap();
759
760            assert_eq!(
761                config
762                    .meta
763                    .periodic_scheduling_compaction_group_split_interval_sec,
764                1
765            );
766            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
767            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
768        }
769    }
770
771    #[test]
772    fn test_meta_max_normalize_splits_per_round_must_be_positive() {
773        let config = toml::from_str::<RwConfig>(
774            r#"
775            [meta]
776            max_normalize_splits_per_round = 0
777            "#,
778        )
779        .unwrap_err();
780
781        expect![[r#"
782            TOML parse error at line 3, column 46
783              |
784            3 |             max_normalize_splits_per_round = 0
785              |                                              ^
786            meta.max_normalize_splits_per_round must be greater than 0
787        "#]]
788        .assert_eq(&config.to_string());
789    }
790
791    // Previously, we have prefixes like `stream_` for all configs under `streaming.developer`.
792    // Later we removed the prefixes, but we still want to guarantee the backward compatibility.
793    #[test]
794    fn test_prefix_alias() {
795        let config: RwConfig = toml::from_str(
796            "
797            [streaming.developer]
798            stream_chunk_size = 114514
799
800            [streaming.developer.stream_compute_client_config]
801            connect_timeout_secs = 42
802            pool_setup_concurrency = 10
803            ",
804        )
805        .unwrap();
806
807        assert_eq!(config.streaming.developer.chunk_size, 114514);
808        assert_eq!(
809            config
810                .streaming
811                .developer
812                .compute_client_config
813                .connect_timeout_secs,
814            42
815        );
816        assert_eq!(
817            config
818                .streaming
819                .developer
820                .compute_client_config
821                .pool_setup_concurrency,
822            10
823        );
824    }
825
826    #[test]
827    fn test_prefix_alias_duplicate() {
828        let config = toml::from_str::<RwConfig>(
829            "
830            [streaming.developer]
831            stream_chunk_size = 114514
832            chunk_size = 1919810
833            ",
834        )
835        .unwrap_err();
836
837        expect![[r#"
838            TOML parse error at line 2, column 13
839              |
840            2 |             [streaming.developer]
841              |             ^^^^^^^^^^^^^^^^^^^^^
842            duplicate field `chunk_size`
843        "#]]
844        .assert_eq(&config.to_string());
845
846        let config = toml::from_str::<RwConfig>(
847            "
848            [streaming.developer.stream_compute_client_config]
849            connect_timeout_secs = 5
850
851            [streaming.developer.compute_client_config]
852            connect_timeout_secs = 10
853            ",
854        )
855        .unwrap_err();
856
857        expect![[r#"
858            TOML parse error at line 2, column 24
859              |
860            2 |             [streaming.developer.stream_compute_client_config]
861              |                        ^^^^^^^^^
862            duplicate field `compute_client_config`
863        "#]]
864        .assert_eq(&config.to_string());
865    }
866
867    #[test]
868    fn test_storage_max_prefetch_block_number_must_be_positive() {
869        let config = toml::from_str::<RwConfig>(
870            r#"
871            [storage]
872            max_prefetch_block_number = 0
873            "#,
874        )
875        .unwrap_err();
876
877        expect![[r#"
878            TOML parse error at line 3, column 41
879              |
880            3 |             max_prefetch_block_number = 0
881              |                                         ^
882            storage.max_prefetch_block_number must be greater than 0
883        "#]]
884        .assert_eq(&config.to_string());
885    }
886
887    #[test]
888    fn test_iceberg_compaction_enable_prefetch_default_is_false() {
889        let config = StorageConfig::default();
890        assert!(
891            !config.iceberg_compaction_enable_prefetch,
892            "enable_prefetch must default to false to avoid unexpected memory usage in existing deployments"
893        );
894    }
895
896    #[test]
897    fn test_storage_iceberg_compaction_pull_interval_ms_must_be_positive() {
898        let config = toml::from_str::<RwConfig>(
899            r#"
900            [storage]
901            iceberg_compaction_pull_interval_ms = 0
902            "#,
903        )
904        .unwrap_err();
905
906        expect![[r#"
907            TOML parse error at line 3, column 51
908              |
909            3 |             iceberg_compaction_pull_interval_ms = 0
910              |                                                   ^
911            storage.iceberg_compaction_pull_interval_ms must be greater than 0
912        "#]]
913        .assert_eq(&config.to_string());
914    }
915}