risingwave_common/config/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This pub module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20pub mod batch;
21pub use batch::BatchConfig;
22pub mod frontend;
23pub use frontend::FrontendConfig;
24pub mod hba;
25pub use hba::{AddressPattern, AuthMethod, ConnectionType, HbaConfig, HbaEntry};
26pub mod meta;
27pub use meta::{
28    CheckpointCompression, CompactionConfig, DefaultParallelism, MetaBackend, MetaConfig,
29    MetaStoreConfig,
30};
31pub mod streaming;
32pub use streaming::{AsyncStackTraceOption, StreamingConfig};
33pub mod server;
34pub use server::{HeapProfilingConfig, ServerConfig};
35
36pub use crate::session_config::SessionInitConfig;
37pub mod udf;
38pub use udf::UdfConfig;
39pub mod storage;
40pub use storage::{
41    CacheEvictionConfig, EvictionConfig, ObjectStoreConfig, StorageConfig, StorageMemoryConfig,
42    extract_storage_memory_config,
43};
44pub mod merge;
45pub mod mutate;
46pub mod none_as_empty_string;
47pub mod system;
48pub mod utils;
49
50use std::collections::BTreeMap;
51use std::fs;
52use std::num::NonZeroUsize;
53
54use anyhow::Context;
55use clap::ValueEnum;
56use educe::Educe;
57pub use merge::*;
58use risingwave_common_proc_macro::ConfigDoc;
59pub use risingwave_common_proc_macro::OverrideConfig;
60use risingwave_pb::meta::SystemParams;
61use serde::{Deserialize, Serialize, Serializer};
62use serde_default::DefaultFromSerde;
63use serde_json::Value;
64pub use system::SystemConfig;
65pub use utils::*;
66
67use crate::for_all_params;
68
/// HTTP/2 connection window size: `2^31 - 1` bytes.
///
/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
/// streams on the same connection.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = u32::MAX >> 1;
/// HTTP/2 stream window size: 32 MiB.
///
/// Use a large value for HTTP/2 stream window size to improve the performance of remote exchange,
/// as we don't rely on this for back-pressure.
pub const STREAM_WINDOW_SIZE: u32 = 32 << 20;
75
76/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
77/// section.
78#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
79#[educe(Debug)]
80pub struct RwConfig {
81    #[serde(default)]
82    #[config_doc(nested)]
83    pub server: ServerConfig,
84
85    #[serde(default)]
86    #[config_doc(nested)]
87    pub meta: MetaConfig,
88
89    #[serde(default)]
90    #[config_doc(nested)]
91    pub batch: BatchConfig,
92
93    #[serde(default)]
94    #[config_doc(nested)]
95    pub frontend: FrontendConfig,
96
97    #[serde(default)]
98    #[config_doc(nested)]
99    pub streaming: StreamingConfig,
100
101    #[serde(default)]
102    #[config_doc(nested)]
103    pub storage: StorageConfig,
104
105    #[serde(default)]
106    #[educe(Debug(ignore))]
107    #[config_doc(nested)]
108    pub system: SystemConfig,
109
110    #[serde(default)]
111    #[config_doc(nested)]
112    pub udf: UdfConfig,
113
114    #[serde(default)]
115    #[config_doc(nested)]
116    pub session_init: SessionInitConfig,
117
118    #[serde(flatten)]
119    #[config_doc(omitted)]
120    pub unrecognized: Unrecognized<Self>,
121}
122
123/// `[meta.developer.meta_compute_client_config]`
124/// `[meta.developer.meta_stream_client_config]`
125/// `[meta.developer.meta_frontend_client_config]`
126/// `[batch.developer.batch_compute_client_config]`
127/// `[batch.developer.batch_frontend_client_config]`
128/// `[streaming.developer.stream_compute_client_config]`
129#[serde_with::apply(Option => #[serde(with = "none_as_empty_string")])]
130#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
131pub struct RpcClientConfig {
132    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
133    pub connect_timeout_secs: u64,
134    /// Maximum concurrency when setting up an RPC client pool.
135    /// Set to 0 to keep the previous unlimited behavior.
136    #[serde(default = "default::developer::rpc_client_pool_setup_concurrency")]
137    pub pool_setup_concurrency: usize,
138}
139
140pub use risingwave_common_metrics::MetricLevel;
141
142impl RwConfig {
143    pub const fn default_connection_pool_size(&self) -> u16 {
144        self.server.connection_pool_size
145    }
146
147    /// Returns [`streaming::StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
148    /// otherwise [`ServerConfig::connection_pool_size`].
149    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
150        self.streaming
151            .developer
152            .exchange_connection_pool_size
153            .unwrap_or_else(|| self.default_connection_pool_size())
154    }
155
156    /// Returns [`batch::BatchDeveloperConfig::exchange_connection_pool_size`] if set,
157    /// otherwise [`ServerConfig::connection_pool_size`].
158    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
159        self.batch
160            .developer
161            .exchange_connection_pool_size
162            .unwrap_or_else(|| self.default_connection_pool_size())
163    }
164}
165
/// Default values for configuration items.
///
/// The functions are referenced by path from `#[serde(default = "...")]` attributes (for example
/// on `RpcClientConfig`), which is why every default is a standalone function.
pub mod default {

    /// Defaults addressed as `default::developer::<name>`.
    pub mod developer {
        /// Default: 256.
        pub fn meta_cached_traces_num() -> u32 {
            256
        }

        /// Default: 128 MiB.
        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
            128 * 1024 * 1024
        }

        /// Default: 64.
        pub fn batch_output_channel_size() -> usize {
            64
        }

        /// Default: 1000.
        pub fn batch_receiver_channel_size() -> usize {
            1_000
        }

        /// Default: 100.
        pub fn batch_root_stage_channel_size() -> usize {
            100
        }

        /// Default: 1024.
        pub fn batch_chunk_size() -> usize {
            1_024
        }

        /// Default: 64.
        pub fn batch_local_execute_buffer_size() -> usize {
            64
        }

        /// Default to unset to be compatible with the behavior before this config is introduced,
        /// that is, follow the value of `server.connection_pool_size`.
        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
            None
        }

        /// Default: disabled.
        pub fn stream_enable_executor_row_count() -> bool {
            false
        }

        /// Default: 16.
        pub fn connector_message_buffer_size() -> usize {
            16
        }

        /// Default: 10.
        pub fn unsafe_stream_extreme_cache_size() -> usize {
            10
        }

        /// Default: 10.
        pub fn stream_topn_cache_min_capacity() -> usize {
            10
        }

        /// Default: 256.
        pub fn stream_chunk_size() -> usize {
            256
        }

        /// Default: 2048.
        pub fn stream_exchange_initial_permits() -> usize {
            2_048
        }

        /// Default: 256.
        pub fn stream_exchange_batched_permits() -> usize {
            256
        }

        /// Default: 1.
        pub fn stream_exchange_concurrent_barriers() -> usize {
            1
        }

        /// Default: 0.
        pub fn stream_exchange_concurrent_dispatchers() -> usize {
            0
        }

        /// Default: 32768.
        pub fn stream_dml_channel_initial_permits() -> usize {
            32_768
        }

        /// Default: 1024.
        pub fn stream_max_barrier_batch_size() -> u32 {
            1_024
        }

        /// Default: 64 MiB.
        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
            64 * 1024 * 1024
        }

        /// Default: enabled.
        pub fn enable_trivial_move() -> bool {
            true
        }

        /// Default: disabled.
        pub fn enable_check_task_level_overlap() -> bool {
            false
        }

        /// Default: 256.
        pub fn max_trivial_move_task_count_per_loop() -> usize {
            256
        }

        /// Default: 5.
        pub fn max_get_task_probe_times() -> usize {
            5
        }

        /// Default: 100.
        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
            100
        }

        /// Default: 400.
        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
            400
        }

        /// Default: 10000.
        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
            10_000
        }

        /// Default: 100.
        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
            100
        }

        /// Default: 30.
        pub fn time_travel_vacuum_interval_sec() -> u64 {
            30
        }

        /// Default: `Some(10000)`.
        pub fn time_travel_vacuum_max_version_count() -> Option<u32> {
            Some(10_000)
        }

        /// Default: 1000.
        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
            1_000
        }

        /// Default: 100.
        pub fn hummock_time_travel_delta_fetch_batch_size() -> usize {
            100
        }

        /// Default: 1000.
        pub fn hummock_gc_history_insert_batch_size() -> usize {
            1_000
        }

        /// Default: 1000.
        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
            1_000
        }

        /// Default: disabled.
        pub fn hummock_time_travel_filter_out_objects_v1() -> bool {
            false
        }

        /// Default: 10.
        pub fn hummock_time_travel_filter_out_objects_list_version_batch_size() -> usize {
            10
        }

        /// Default: 1000.
        pub fn hummock_time_travel_filter_out_objects_list_delta_batch_size() -> usize {
            1_000
        }

        /// Default: 0.9.
        pub fn memory_controller_threshold_aggressive() -> f64 {
            0.9
        }

        /// Default: 0.81.
        pub fn memory_controller_threshold_graceful() -> f64 {
            0.81
        }

        /// Default: 0.72.
        pub fn memory_controller_threshold_stable() -> f64 {
            0.72
        }

        /// Default: 2.0.
        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
            2.0
        }

        /// Default: 1.5.
        pub fn memory_controller_eviction_factor_graceful() -> f64 {
            1.5
        }

        /// Default: 1.0.
        pub fn memory_controller_eviction_factor_stable() -> f64 {
            1.0
        }

        /// Default: 100.
        pub fn memory_controller_update_interval_ms() -> usize {
            100
        }

        /// Default: 128.
        pub fn memory_controller_sequence_tls_step() -> u64 {
            128
        }

        /// Default: 32.
        pub fn memory_controller_sequence_tls_lag() -> u64 {
            32
        }

        /// Default: enabled.
        pub fn stream_enable_arrangement_backfill() -> bool {
            true
        }

        /// Default: enabled.
        pub fn stream_enable_snapshot_backfill() -> bool {
            true
        }

        /// Default: enabled.
        pub fn enable_shared_source() -> bool {
            true
        }

        /// Default: 2048.
        pub fn stream_high_join_amplification_threshold() -> usize {
            2_048
        }

        /// Default to 1 to be compatible with the behavior before this config is introduced.
        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
            Some(1)
        }

        /// Default: enabled.
        pub fn enable_actor_tokio_metrics() -> bool {
            true
        }

        /// Default: enabled.
        pub fn stream_enable_auto_schema_change() -> bool {
            true
        }

        /// Default: disabled.
        pub fn switch_jdbc_pg_to_native() -> bool {
            false
        }

        /// Default: 30000.
        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
            // NOTE(kwannoel): This is just an arbitrary number.
            30_000
        }

        /// Default: 16.
        pub fn streaming_join_hash_map_evict_interval_rows() -> u32 {
            16
        }

        /// Default: unset.
        pub fn streaming_now_progress_ratio() -> Option<f32> {
            None
        }

        /// Default: 600, i.e. 10 minutes.
        pub fn stream_snapshot_iter_rebuild_interval_secs() -> u64 {
            600
        }

        /// Default: enabled.
        pub fn enable_explain_analyze_stats() -> bool {
            true
        }

        /// Default: 5.
        pub fn rpc_client_connect_timeout_secs() -> u64 {
            5
        }

        /// Default: 0.
        pub fn rpc_client_pool_setup_concurrency() -> usize {
            0
        }

        /// Default: 10.
        pub fn iceberg_list_interval_sec() -> u64 {
            10
        }

        /// Default: 1024.
        pub fn iceberg_fetch_batch_size() -> u64 {
            1_024
        }

        /// Default: 1024.
        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
            1_024
        }

        /// Default: 100000.
        pub fn iceberg_sink_write_parquet_max_row_group_rows() -> usize {
            100_000
        }

        /// Default: disabled.
        pub fn materialize_force_overwrite_on_no_check() -> bool {
            false
        }

        /// Default: 60.
        pub fn refresh_scheduler_interval_sec() -> u64 {
            60
        }

        /// Default: 64.
        pub fn sync_log_store_pause_duration_ms() -> usize {
            64
        }

        /// Default: 2048.
        pub fn sync_log_store_buffer_size() -> usize {
            2_048
        }

        /// Default: `false`.
        pub fn disable_sync_log_store_dispatcher() -> bool {
            false
        }

        /// Default: 1000.
        pub fn table_change_log_insert_batch_size() -> u64 {
            1_000
        }

        /// Default: 1000.
        pub fn table_change_log_delete_batch_size() -> u64 {
            1_000
        }

        /// Default: disabled.
        pub fn enable_state_table_vnode_stats_pruning() -> bool {
            false
        }

        /// Default: disabled.
        pub fn enable_vnode_key_stats_for_materialize() -> bool {
            false
        }

        /// Default: 0.
        pub fn max_concurrent_kv_log_store_historical_read() -> usize {
            0
        }
    }
}
474
/// Maximum shard bits for the meta cache.
// NOTE(review): presumably caps the meta cache at 2^4 = 16 shards — confirm at the use site.
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// Minimum buffer size per shard.
// NOTE(review): the unit is not visible in this file — confirm at the use site.
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
/// Maximum shard bits for the block cache: with 6 bits the LRU cache is split into 64 shards to
/// avoid lock conflict.
pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6;
478
479#[cfg(test)]
480pub mod tests {
481    use expect_test::expect;
482    use risingwave_license::LicenseKey;
483
484    use super::*;
485
486    fn default_config_for_docs() -> RwConfig {
487        let mut config = RwConfig::default();
488        // Set `license_key` to empty in the docs to avoid any confusion.
489        config.system.license_key = Some(LicenseKey::empty());
490        config
491    }
492
493    /// This test ensures that `config/example.toml` is up-to-date with the default values specified
494    /// in this file. Developer should run `./risedev generate-example-config` to update it if this
495    /// test fails.
496    #[test]
497    fn test_example_up_to_date() {
498        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
499# Check detailed comments in src/common/src/config.rs";
500
501        let actual = expect_test::expect_file!["../../../config/example.toml"];
502        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");
503
504        let expected = format!("{HEADER}\n\n{default}");
505        actual.assert_eq(&expected);
506
507        let expected = rw_config_to_markdown();
508        let actual = expect_test::expect_file!["../../../config/docs.md"];
509        actual.assert_eq(&expected);
510    }
511
512    #[test]
513    fn test_session_init_entries_distinguishes_omitted_from_default() {
514        let config: RwConfig = toml::from_str(
515            r#"
516            [session_init]
517            streaming_parallelism = "bounded(8)"
518            streaming_parallelism_for_table = "default"
519            "#,
520        )
521        .unwrap();
522
523        // Omitted fields are `None`; an explicit `default` is `Some("default")`.
524        assert_eq!(
525            config.session_init.streaming_parallelism.as_deref(),
526            Some("bounded(8)")
527        );
528        assert_eq!(
529            config
530                .session_init
531                .streaming_parallelism_for_table
532                .as_deref(),
533            Some("default")
534        );
535        assert_eq!(config.session_init.streaming_parallelism_for_sink, None);
536
537        // Only explicitly-configured parameters are reported, by their session parameter name.
538        assert_eq!(
539            config.session_init.entries(),
540            vec![
541                ("streaming_parallelism", "bounded(8)"),
542                ("streaming_parallelism_for_table", "default"),
543            ]
544        );
545    }
546
547    #[test]
548    fn test_session_init_default_is_empty() {
549        assert!(SessionInitConfig::default().entries().is_empty());
550    }
551
552    #[test]
553    fn test_session_init_rejects_unrecognized_key() {
554        let err = toml::from_str::<RwConfig>(
555            r#"
556            [session_init]
557            streaming_parallelism = "bounded(8)"
558            not_a_real_param = "oops"
559            "#,
560        )
561        .unwrap_err();
562
563        assert!(err.to_string().contains("unknown field `not_a_real_param`"));
564    }
565
566    #[derive(Debug)]
567    struct ConfigItemDoc {
568        desc: String,
569        default: String,
570    }
571
572    fn rw_config_to_markdown() -> String {
573        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
574        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);
575
576        // Section -> Config Name -> ConfigItemDoc
577        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
578            .into_iter()
579            .map(|(k, v)| {
580                let docs: BTreeMap<String, ConfigItemDoc> = v
581                    .into_iter()
582                    .map(|(name, desc)| {
583                        (
584                            name,
585                            ConfigItemDoc {
586                                desc,
587                                default: "".to_owned(), // unset
588                            },
589                        )
590                    })
591                    .collect();
592                (k, docs)
593            })
594            .collect();
595
596        let toml_doc: BTreeMap<String, toml::Value> =
597            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
598        toml_doc.into_iter().for_each(|(name, value)| {
599            set_default_values("".to_owned(), name, value, &mut configs);
600        });
601
602        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
603            + "This page is automatically generated by `./risedev generate-example-config`\n";
604        for (section, configs) in configs {
605            if configs.is_empty() {
606                continue;
607            }
608            markdown.push_str(&format!("\n## {}\n\n", section));
609            markdown.push_str("| Config | Description | Default |\n");
610            markdown.push_str("|--------|-------------|---------|\n");
611            for (config, doc) in configs {
612                markdown.push_str(&format!(
613                    "| {} | {} | {} |\n",
614                    config, doc.desc, doc.default
615                ));
616            }
617        }
618        markdown
619    }
620
621    fn set_default_values(
622        section: String,
623        name: String,
624        value: toml::Value,
625        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
626    ) {
627        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
628        if let toml::Value::Table(table) = value {
629            let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
630            let sub_section = if section.is_empty() {
631                name
632            } else {
633                format!("{}.{}", section, name)
634            };
635            section_configs
636                .into_iter()
637                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
638        } else if let Some(t) = configs.get_mut(&section)
639            && let Some(item_doc) = t.get_mut(&name)
640        {
641            item_doc.default = format!("{}", value);
642        }
643    }
644
645    #[test]
646    fn test_object_store_configs_backward_compatibility() {
647        // Define configs with the old name and make sure it still works
648        {
649            let config: RwConfig = toml::from_str(
650                r#"
651            [storage.object_store]
652            object_store_set_atomic_write_dir = true
653
654            [storage.object_store.s3]
655            object_store_keepalive_ms = 1
656            object_store_send_buffer_size = 1
657            object_store_recv_buffer_size = 1
658            object_store_nodelay = false
659
660            [storage.object_store.s3.developer]
661            object_store_retry_unknown_service_error = true
662            object_store_retryable_service_error_codes = ['dummy']
663
664
665            "#,
666            )
667            .unwrap();
668
669            assert!(config.storage.object_store.set_atomic_write_dir);
670            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
671            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
672            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
673            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
674            assert!(
675                config
676                    .storage
677                    .object_store
678                    .s3
679                    .developer
680                    .retry_unknown_service_error
681            );
682            assert_eq!(
683                config
684                    .storage
685                    .object_store
686                    .s3
687                    .developer
688                    .retryable_service_error_codes,
689                vec!["dummy".to_owned()]
690            );
691        }
692
693        // Define configs with the new name and make sure it works
694        {
695            let config: RwConfig = toml::from_str(
696                r#"
697            [storage.object_store]
698            set_atomic_write_dir = true
699
700            [storage.object_store.s3]
701            keepalive_ms = 1
702            send_buffer_size = 1
703            recv_buffer_size = 1
704            nodelay = false
705
706            [storage.object_store.s3.developer]
707            retry_unknown_service_error = true
708            retryable_service_error_codes = ['dummy']
709
710
711            "#,
712            )
713            .unwrap();
714
715            assert!(config.storage.object_store.set_atomic_write_dir);
716            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
717            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
718            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
719            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
720            assert!(
721                config
722                    .storage
723                    .object_store
724                    .s3
725                    .developer
726                    .retry_unknown_service_error
727            );
728            assert_eq!(
729                config
730                    .storage
731                    .object_store
732                    .s3
733                    .developer
734                    .retryable_service_error_codes,
735                vec!["dummy".to_owned()]
736            );
737        }
738    }
739
740    #[test]
741    fn test_meta_configs_backward_compatibility() {
742        // Test periodic_space_reclaim_compaction_interval_sec
743        {
744            let config: RwConfig = toml::from_str(
745                r#"
746            [meta]
747            periodic_split_compact_group_interval_sec = 1
748            table_write_throughput_threshold = 10
749            min_table_split_write_throughput = 5
750            "#,
751            )
752            .unwrap();
753
754            assert_eq!(
755                config
756                    .meta
757                    .periodic_scheduling_compaction_group_split_interval_sec,
758                1
759            );
760            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
761            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
762        }
763    }
764
765    #[test]
766    fn test_meta_max_normalize_splits_per_round_must_be_positive() {
767        let config = toml::from_str::<RwConfig>(
768            r#"
769            [meta]
770            max_normalize_splits_per_round = 0
771            "#,
772        )
773        .unwrap_err();
774
775        expect![[r#"
776            TOML parse error at line 3, column 46
777              |
778            3 |             max_normalize_splits_per_round = 0
779              |                                              ^
780            meta.max_normalize_splits_per_round must be greater than 0
781        "#]]
782        .assert_eq(&config.to_string());
783    }
784
785    // Previously, we have prefixes like `stream_` for all configs under `streaming.developer`.
786    // Later we removed the prefixes, but we still want to guarantee the backward compatibility.
787    #[test]
788    fn test_prefix_alias() {
789        let config: RwConfig = toml::from_str(
790            "
791            [streaming.developer]
792            stream_chunk_size = 114514
793
794            [streaming.developer.stream_compute_client_config]
795            connect_timeout_secs = 42
796            pool_setup_concurrency = 10
797            ",
798        )
799        .unwrap();
800
801        assert_eq!(config.streaming.developer.chunk_size, 114514);
802        assert_eq!(
803            config
804                .streaming
805                .developer
806                .compute_client_config
807                .connect_timeout_secs,
808            42
809        );
810        assert_eq!(
811            config
812                .streaming
813                .developer
814                .compute_client_config
815                .pool_setup_concurrency,
816            10
817        );
818    }
819
820    #[test]
821    fn test_prefix_alias_duplicate() {
822        let config = toml::from_str::<RwConfig>(
823            "
824            [streaming.developer]
825            stream_chunk_size = 114514
826            chunk_size = 1919810
827            ",
828        )
829        .unwrap_err();
830
831        expect![[r#"
832            TOML parse error at line 2, column 13
833              |
834            2 |             [streaming.developer]
835              |             ^^^^^^^^^^^^^^^^^^^^^
836            duplicate field `chunk_size`
837        "#]]
838        .assert_eq(&config.to_string());
839
840        let config = toml::from_str::<RwConfig>(
841            "
842            [streaming.developer.stream_compute_client_config]
843            connect_timeout_secs = 5
844
845            [streaming.developer.compute_client_config]
846            connect_timeout_secs = 10
847            ",
848        )
849        .unwrap_err();
850
851        expect![[r#"
852            TOML parse error at line 2, column 24
853              |
854            2 |             [streaming.developer.stream_compute_client_config]
855              |                        ^^^^^^^^^
856            duplicate field `compute_client_config`
857        "#]]
858        .assert_eq(&config.to_string());
859    }
860
861    #[test]
862    fn test_storage_max_prefetch_block_number_must_be_positive() {
863        let config = toml::from_str::<RwConfig>(
864            r#"
865            [storage]
866            max_prefetch_block_number = 0
867            "#,
868        )
869        .unwrap_err();
870
871        expect![[r#"
872            TOML parse error at line 3, column 41
873              |
874            3 |             max_prefetch_block_number = 0
875              |                                         ^
876            storage.max_prefetch_block_number must be greater than 0
877        "#]]
878        .assert_eq(&config.to_string());
879    }
880
881    #[test]
882    fn test_iceberg_compaction_enable_prefetch_default_is_false() {
883        let config = StorageConfig::default();
884        assert!(
885            !config.iceberg_compaction_enable_prefetch,
886            "enable_prefetch must default to false to avoid unexpected memory usage in existing deployments"
887        );
888    }
889
890    #[test]
891    fn test_storage_iceberg_compaction_pull_interval_ms_must_be_positive() {
892        let config = toml::from_str::<RwConfig>(
893            r#"
894            [storage]
895            iceberg_compaction_pull_interval_ms = 0
896            "#,
897        )
898        .unwrap_err();
899
900        expect![[r#"
901            TOML parse error at line 3, column 51
902              |
903            3 |             iceberg_compaction_pull_interval_ms = 0
904              |                                                   ^
905            storage.iceberg_compaction_pull_interval_ms must be greater than 0
906        "#]]
907        .assert_eq(&config.to_string());
908    }
909}