risingwave_common/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module defines the structure of the configuration file `risingwave.toml`.
16//!
17//! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a
18//! section in `risingwave.toml`.
19
20use std::collections::BTreeMap;
21use std::fs;
22use std::num::NonZeroUsize;
23
24use anyhow::Context;
25use clap::ValueEnum;
26use educe::Educe;
27use foyer::{Compression, LfuConfig, LruConfig, RecoverMode, RuntimeOptions, S3FifoConfig};
28use risingwave_common_proc_macro::ConfigDoc;
29pub use risingwave_common_proc_macro::OverrideConfig;
30use risingwave_pb::meta::SystemParams;
31use serde::{Deserialize, Serialize, Serializer};
32use serde_default::DefaultFromSerde;
33use serde_json::Value;
34
35use crate::for_all_params;
36
/// HTTP/2 connection window size, set to the protocol maximum of `2^31 - 1` so that multiplexed
/// streams sharing one connection cannot deadlock each other.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = u32::MAX >> 1;
/// HTTP/2 stream window size. It is deliberately large to speed up remote exchange; back-pressure
/// does not rely on this window.
pub const STREAM_WINDOW_SIZE: u32 = 32 << 20; // 32 MB
43
/// Unrecognized fields in a config section. Generic over the config section type to provide better
/// error messages.
///
/// The current implementation will log warnings if there are unrecognized fields. The warning
/// names the section through `std::any::type_name::<T>()`.
#[derive(Educe)]
#[educe(Clone, Default)]
pub struct Unrecognized<T: 'static> {
    /// Raw key-value pairs that did not match any known field of the section.
    inner: BTreeMap<String, Value>,
    /// Ties this value to the section type `T` without storing a `T`.
    _marker: std::marker::PhantomData<&'static T>,
}
54
55impl<T> std::fmt::Debug for Unrecognized<T> {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        self.inner.fmt(f)
58    }
59}
60
61impl<T> Unrecognized<T> {
62    /// Returns all unrecognized fields as a map.
63    pub fn into_inner(self) -> BTreeMap<String, Value> {
64        self.inner
65    }
66}
67
68impl<'de, T> Deserialize<'de> for Unrecognized<T> {
69    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
70    where
71        D: serde::Deserializer<'de>,
72    {
73        let inner = BTreeMap::deserialize(deserializer)?;
74        if !inner.is_empty() {
75            tracing::warn!(
76                "unrecognized fields in `{}`: {:?}",
77                std::any::type_name::<T>(),
78                inner.keys()
79            );
80        }
81        Ok(Unrecognized {
82            inner,
83            _marker: std::marker::PhantomData,
84        })
85    }
86}
87
88impl<T> Serialize for Unrecognized<T> {
89    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
90    where
91        S: serde::Serializer,
92    {
93        self.inner.serialize(serializer)
94    }
95}
96
97pub fn load_config(path: &str, cli_override: impl OverrideConfig) -> RwConfig
98where
99{
100    let mut config = if path.is_empty() {
101        tracing::warn!("risingwave.toml not found, using default config.");
102        RwConfig::default()
103    } else {
104        let config_str = fs::read_to_string(path)
105            .with_context(|| format!("failed to open config file at `{path}`"))
106            .unwrap();
107        toml::from_str(config_str.as_str())
108            .context("failed to parse config file")
109            .unwrap()
110    };
111    cli_override.r#override(&mut config);
112    config
113}
114
115pub trait OverrideConfig {
116    fn r#override(&self, config: &mut RwConfig);
117}
118
119impl<T: OverrideConfig> OverrideConfig for &T {
120    fn r#override(&self, config: &mut RwConfig) {
121        T::r#override(self, config)
122    }
123}
124
125/// For non-user-facing components where the CLI arguments do not override the config file.
126#[derive(Clone, Copy)]
127pub struct NoOverride;
128
129impl OverrideConfig for NoOverride {
130    fn r#override(&self, _config: &mut RwConfig) {}
131}
132
/// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a
/// section.
///
/// Every section is marked `#[serde(default)]`, so a section missing from the file falls back to
/// its `Default` value. Top-level keys that match no section are collected into `unrecognized`,
/// and a warning is logged for them.
#[derive(Educe, Clone, Serialize, Deserialize, Default, ConfigDoc)]
#[educe(Debug)]
pub struct RwConfig {
    #[serde(default)]
    #[config_doc(nested)]
    pub server: ServerConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub meta: MetaConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub batch: BatchConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub frontend: FrontendConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub streaming: StreamingConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub storage: StorageConfig,

    #[serde(default)]
    // Excluded from the derived `Debug` output.
    #[educe(Debug(ignore))]
    #[config_doc(nested)]
    pub system: SystemConfig,

    #[serde(default)]
    #[config_doc(nested)]
    pub udf: UdfConfig,

    // Catch-all for top-level keys that match none of the sections above.
    #[serde(flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
175
// Helpers for `#[serde(with = "...")]` that add a fixed prefix to every key of the wrapped struct:
// `meta_` for `MetaConfig::developer`, `stream_` for `StreamingConfig::developer`, and `batch_`
// for `BatchConfig::developer`.
serde_with::with_prefix!(meta_prefix "meta_");
serde_with::with_prefix!(streaming_prefix "stream_");
serde_with::with_prefix!(batch_prefix "batch_");
179
/// Backend used by the meta node, configured through [`MetaConfig::backend`].
///
/// Derives `clap::ValueEnum`, so it can also be passed as a command-line value.
#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
pub enum MetaBackend {
    /// The default backend.
    #[default]
    Mem,
    Sql, // any database url
    Sqlite,
    Postgres,
    Mysql,
}
189
/// The section `[meta]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct MetaConfig {
    /// Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even if they
    /// are dangling.
    #[serde(default = "default::meta::min_sst_retention_time_sec")]
    pub min_sst_retention_time_sec: u64,

    /// Interval of automatic hummock full GC.
    #[serde(default = "default::meta::full_gc_interval_sec")]
    pub full_gc_interval_sec: u64,

    /// Max number of objects that a full GC job can fetch.
    #[serde(default = "default::meta::full_gc_object_limit")]
    pub full_gc_object_limit: u64,

    /// Duration in seconds to retain garbage collection history data.
    #[serde(default = "default::meta::gc_history_retention_time_sec")]
    pub gc_history_retention_time_sec: u64,

    /// Max number of inflight time travel queries.
    #[serde(default = "default::meta::max_inflight_time_travel_query")]
    pub max_inflight_time_travel_query: u64,

    /// Schedule compaction for all compaction groups with this interval.
    #[serde(default = "default::meta::periodic_compaction_interval_sec")]
    pub periodic_compaction_interval_sec: u64,

    /// Interval of invoking a vacuum job, to remove stale metadata from meta store and objects
    /// from object store.
    #[serde(default = "default::meta::vacuum_interval_sec")]
    pub vacuum_interval_sec: u64,

    /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
    /// meta node.
    #[serde(default = "default::meta::vacuum_spin_interval_ms")]
    pub vacuum_spin_interval_ms: u64,

    /// Interval of hummock version checkpoint.
    #[serde(default = "default::meta::hummock_version_checkpoint_interval_sec")]
    pub hummock_version_checkpoint_interval_sec: u64,

    /// If enabled, `SSTable` object files and version deltas will be retained.
    ///
    /// `SSTable` object files need to be deleted via full GC.
    ///
    /// Version deltas need to be deleted manually.
    #[serde(default = "default::meta::enable_hummock_data_archive")]
    pub enable_hummock_data_archive: bool,

    /// The interval at which a Hummock version snapshot is taken for time travel.
    ///
    /// Larger value indicates less storage overhead but worse query performance.
    #[serde(default = "default::meta::hummock_time_travel_snapshot_interval")]
    pub hummock_time_travel_snapshot_interval: u64,

    /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
    /// attempt is rejected.
    #[serde(default = "default::meta::min_delta_log_num_for_hummock_version_checkpoint")]
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,

    /// Maximum allowed heartbeat interval in seconds.
    #[serde(default = "default::meta::max_heartbeat_interval_sec")]
    pub max_heartbeat_interval_secs: u32,

    /// Whether to enable fail-on-recovery. Should only be used in e2e tests.
    #[serde(default)]
    pub disable_recovery: bool,

    /// Whether to disable adaptive-scaling feature.
    #[serde(default)]
    pub disable_automatic_parallelism_control: bool,

    /// The number of streaming jobs per scaling operation.
    #[serde(default = "default::meta::parallelism_control_batch_size")]
    pub parallelism_control_batch_size: usize,

    /// The period of parallelism control trigger.
    #[serde(default = "default::meta::parallelism_control_trigger_period_sec")]
    pub parallelism_control_trigger_period_sec: u64,

    /// The first delay of parallelism control.
    #[serde(default = "default::meta::parallelism_control_trigger_first_delay_sec")]
    pub parallelism_control_trigger_first_delay_sec: u64,

    #[serde(default = "default::meta::meta_leader_lease_secs")]
    pub meta_leader_lease_secs: u64,

    /// After specified seconds of idle (no mview or flush), the process will be exited.
    /// It is mainly useful for playgrounds.
    #[serde(default)]
    pub dangerous_max_idle_secs: Option<u64>,

    /// The default global parallelism for all streaming jobs, if user doesn't specify the
    /// parallelism, this value will be used. `FULL` means use all available parallelism units,
    /// otherwise it's a number.
    #[serde(default = "default::meta::default_parallelism")]
    pub default_parallelism: DefaultParallelism,

    /// Whether to enable deterministic compaction scheduling, which
    /// will disable all auto scheduling of compaction tasks.
    /// Should only be used in e2e tests.
    #[serde(default)]
    pub enable_compaction_deterministic: bool,

    /// Enable sanity check when SSTs are committed.
    #[serde(default)]
    pub enable_committed_sst_sanity_check: bool,

    #[serde(default = "default::meta::node_num_monitor_interval_sec")]
    pub node_num_monitor_interval_sec: u64,

    #[serde(default = "default::meta::backend")]
    pub backend: MetaBackend,

    /// Schedule `space_reclaim` compaction for all compaction groups with this interval.
    #[serde(default = "default::meta::periodic_space_reclaim_compaction_interval_sec")]
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    /// Schedule `ttl_reclaim` compaction for all compaction groups with this interval.
    #[serde(default = "default::meta::periodic_ttl_reclaim_compaction_interval_sec")]
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    #[serde(default = "default::meta::periodic_tombstone_reclaim_compaction_interval_sec")]
    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    // The next three size limits are `#[deprecated]`. They are still deserialized, presumably so
    // that existing config files setting them keep loading. TODO confirm before removing them.
    #[serde(default = "default::meta::move_table_size_limit")]
    #[deprecated]
    pub move_table_size_limit: u64,

    #[serde(default = "default::meta::split_group_size_limit")]
    #[deprecated]
    pub split_group_size_limit: u64,

    #[serde(default = "default::meta::cut_table_size_limit")]
    #[deprecated]
    pub cut_table_size_limit: u64,

    // Catch-all for keys under `[meta]` that match no field of this struct.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,

    /// If true, the object storage bucket lifecycle (used to purge stale data) is not configured.
    #[serde(default)]
    pub do_not_config_object_storage_lifecycle: bool,

    /// Count of partitions in a split group. Meta will assign this value to every new group when it is split from the default group automatically.
    /// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table.
    #[serde(default = "default::meta::partition_vnode_count")]
    pub partition_vnode_count: u32,

    /// The threshold of write throughput to trigger a group split.
    #[serde(
        default = "default::meta::table_high_write_throughput_threshold",
        alias = "table_write_throughput_threshold"
    )]
    pub table_high_write_throughput_threshold: u64,

    #[serde(
        default = "default::meta::table_low_write_throughput_threshold",
        alias = "min_table_split_write_throughput"
    )]
    /// The threshold of write throughput to trigger a group merge.
    pub table_low_write_throughput_threshold: u64,

    // If the compaction task does not report heartbeat beyond the
    // `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task.
    #[serde(default = "default::meta::compaction_task_max_heartbeat_interval_secs")]
    pub compaction_task_max_heartbeat_interval_secs: u64,

    // If the compaction task does not make progress beyond the
    // `compaction_task_max_progress_interval_secs` interval, we will cancel the task.
    #[serde(default = "default::meta::compaction_task_max_progress_interval_secs")]
    pub compaction_task_max_progress_interval_secs: u64,

    #[serde(default)]
    #[config_doc(nested)]
    pub compaction_config: CompactionConfig,

    /// Count of partitions of tables in default group and materialized view group.
    /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment.
    /// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
    /// Set it zero to disable this feature.
    #[serde(default = "default::meta::hybrid_partition_vnode_count")]
    pub hybrid_partition_vnode_count: u32,

    #[serde(default = "default::meta::event_log_enabled")]
    pub event_log_enabled: bool,
    /// Keeps the latest N events per channel.
    #[serde(default = "default::meta::event_log_channel_max_size")]
    pub event_log_channel_max_size: u32,

    // Serialized through `meta_prefix`, so every key of `[meta.developer]` carries a `meta_`
    // prefix (e.g. `meta_cached_traces_num`).
    #[serde(default, with = "meta_prefix")]
    #[config_doc(omitted)]
    pub developer: MetaDeveloperConfig,
    /// Whether compactor should rewrite row to remove dropped column.
    #[serde(default = "default::meta::enable_dropped_column_reclaim")]
    pub enable_dropped_column_reclaim: bool,

    /// Whether to split the compaction group when the size of the group exceeds the `compaction_group_config.max_estimated_group_size() * split_group_size_ratio`.
    #[serde(default = "default::meta::split_group_size_ratio")]
    pub split_group_size_ratio: f64,

    // During group scheduling, the configured `*_throughput_ratio` is used to determine if the sample exceeds the threshold.
    // Use `table_stat_throuput_window_seconds_for_*` to check if the split and merge conditions are met.
    /// To split the compaction group when the high throughput statistics of the group exceeds the threshold.
    #[serde(default = "default::meta::table_stat_high_write_throughput_ratio_for_split")]
    pub table_stat_high_write_throughput_ratio_for_split: f64,

    /// To merge the compaction group when the low throughput statistics of the group exceeds the threshold.
    #[serde(default = "default::meta::table_stat_low_write_throughput_ratio_for_merge")]
    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    // Hummock also controls the size of samples to be judged during group scheduling by `table_stat_sample_size_for_split` and `table_stat_sample_size_for_merge`.
    // Will use max(table_stat_throuput_window_seconds_for_split /ckpt, table_stat_throuput_window_seconds_for_merge/ckpt) as the global sample size.
    // For example, if `table_stat_throuput_window_seconds_for_merge` = 240 and `table_stat_throuput_window_seconds_for_split` = 60, and `ckpt_sec = 1`,
    //  global sample size will be max(240/1, 60/1), then only the last 60 samples will be considered for split, and so on.
    /// The window seconds of table throughput statistic history for split compaction group.
    #[serde(default = "default::meta::table_stat_throuput_window_seconds_for_split")]
    pub table_stat_throuput_window_seconds_for_split: usize,

    /// The window seconds of table throughput statistic history for merge compaction group.
    #[serde(default = "default::meta::table_stat_throuput_window_seconds_for_merge")]
    pub table_stat_throuput_window_seconds_for_merge: usize,

    /// The threshold of table size in one compact task to decide whether to partition one table into `hybrid_partition_vnode_count` parts, which belongs to default group and materialized view group.
    /// Set it max value of 64-bit number to disable this feature.
    #[serde(default = "default::meta::compact_task_table_size_partition_threshold_low")]
    pub compact_task_table_size_partition_threshold_low: u64,

    /// The threshold of table size in one compact task to decide whether to partition one table into `partition_vnode_count` parts, which belongs to default group and materialized view group.
    /// Set it max value of 64-bit number to disable this feature.
    #[serde(default = "default::meta::compact_task_table_size_partition_threshold_high")]
    pub compact_task_table_size_partition_threshold_high: u64,

    #[serde(
        default = "default::meta::periodic_scheduling_compaction_group_split_interval_sec",
        alias = "periodic_split_compact_group_interval_sec"
    )]
    pub periodic_scheduling_compaction_group_split_interval_sec: u64,

    #[serde(default = "default::meta::periodic_scheduling_compaction_group_merge_interval_sec")]
    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    #[serde(default)]
    #[config_doc(nested)]
    pub meta_store_config: MetaStoreConfig,
}
438
/// Default parallelism for streaming jobs that don't specify one; see
/// [`MetaConfig::default_parallelism`].
///
/// In `risingwave.toml` it is written either as the string `"Full"` or as a positive integer.
/// When reading `"Full"`, case is ignored and surrounding whitespace is trimmed.
#[derive(Copy, Clone, Debug, Default)]
pub enum DefaultParallelism {
    /// Use all available parallelism units. This is the default.
    #[default]
    Full,
    /// A fixed, non-zero parallelism.
    Default(NonZeroUsize),
}
445
446impl Serialize for DefaultParallelism {
447    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
448    where
449        S: Serializer,
450    {
451        #[derive(Debug, Serialize, Deserialize)]
452        #[serde(untagged)]
453        enum Parallelism {
454            Str(String),
455            Int(usize),
456        }
457        match self {
458            DefaultParallelism::Full => Parallelism::Str("Full".to_owned()).serialize(serializer),
459            DefaultParallelism::Default(val) => {
460                Parallelism::Int(val.get() as _).serialize(serializer)
461            }
462        }
463    }
464}
465
466impl<'de> Deserialize<'de> for DefaultParallelism {
467    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
468    where
469        D: serde::Deserializer<'de>,
470    {
471        #[derive(Debug, Deserialize)]
472        #[serde(untagged)]
473        enum Parallelism {
474            Str(String),
475            Int(usize),
476        }
477        let p = Parallelism::deserialize(deserializer)?;
478        match p {
479            Parallelism::Str(s) => {
480                if s.trim().eq_ignore_ascii_case("full") {
481                    Ok(DefaultParallelism::Full)
482                } else {
483                    Err(serde::de::Error::custom(format!(
484                        "invalid default parallelism: {}",
485                        s
486                    )))
487                }
488            }
489            Parallelism::Int(i) => Ok(DefaultParallelism::Default(
490                // Note: we won't check whether this exceeds the maximum parallelism (i.e., vnode count)
491                // here because it requires extra context. The check will be done when scheduling jobs.
492                NonZeroUsize::new(i).ok_or_else(|| {
493                    serde::de::Error::custom("default parallelism should not be 0")
494                })?,
495            )),
496        }
497    }
498}
499
/// The subsection `[meta.developer]`.
///
/// It is put at [`MetaConfig::developer`]. That field is (de)serialized with `meta_prefix`, so
/// every key of this subsection carries a `meta_` prefix in `risingwave.toml` (e.g.
/// `meta_cached_traces_num`).
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct MetaDeveloperConfig {
    /// The number of traces to be cached in-memory by the tracing collector
    /// embedded in the meta node.
    #[serde(default = "default::developer::meta_cached_traces_num")]
    pub cached_traces_num: u32,

    /// The maximum memory usage in bytes for the tracing collector embedded
    /// in the meta node.
    #[serde(default = "default::developer::meta_cached_traces_memory_limit_bytes")]
    pub cached_traces_memory_limit_bytes: usize,

    /// Compaction picker config
    #[serde(default = "default::developer::enable_trivial_move")]
    pub enable_trivial_move: bool,
    #[serde(default = "default::developer::enable_check_task_level_overlap")]
    pub enable_check_task_level_overlap: bool,
    #[serde(default = "default::developer::max_trivial_move_task_count_per_loop")]
    pub max_trivial_move_task_count_per_loop: usize,

    #[serde(default = "default::developer::max_get_task_probe_times")]
    pub max_get_task_probe_times: usize,

    /// Max number of actors allowed per parallelism (default = 100).
    /// A notice is issued for CREATE MV/Table when the number of actors exceeds this limit.
    #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")]
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    /// Max number of actors allowed per parallelism (default = 400).
    /// CREATE MV/Table will be rejected when the number of actors exceeds this limit.
    #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,

    /// Max number of SSTs fetched from meta store per SELECT, during time travel Hummock version replay.
    #[serde(default = "default::developer::hummock_time_travel_sst_info_fetch_batch_size")]
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,

    /// Max number of SSTs inserted into meta store per INSERT, during time travel metadata writing.
    #[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")]
    pub hummock_time_travel_sst_info_insert_batch_size: usize,

    #[serde(default = "default::developer::time_travel_vacuum_interval_sec")]
    pub time_travel_vacuum_interval_sec: u64,

    /// Max number of epoch-to-version inserted into meta store per INSERT, during time travel metadata writing.
    #[serde(default = "default::developer::hummock_time_travel_epoch_version_insert_batch_size")]
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,

    #[serde(default = "default::developer::hummock_gc_history_insert_batch_size")]
    pub hummock_gc_history_insert_batch_size: usize,

    #[serde(default = "default::developer::hummock_time_travel_filter_out_objects_batch_size")]
    pub hummock_time_travel_filter_out_objects_batch_size: usize,

    #[serde(default)]
    pub compute_client_config: RpcClientConfig,

    #[serde(default)]
    pub stream_client_config: RpcClientConfig,

    #[serde(default)]
    pub frontend_client_config: RpcClientConfig,
}
566
/// RPC client settings, used by the `compute_client_config`, `stream_client_config` and
/// `frontend_client_config` fields of [`MetaDeveloperConfig`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct RpcClientConfig {
    /// Connect timeout in seconds.
    #[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
    pub connect_timeout_secs: u64,
}
572
/// The section `[server]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct ServerConfig {
    /// The interval for periodic heartbeat from worker to the meta service.
    #[serde(default = "default::server::heartbeat_interval_ms")]
    pub heartbeat_interval_ms: u32,

    /// The default number of the connections when connecting to a gRPC server.
    ///
    /// For the connections used in streaming or batch exchange, please refer to the entries in
    /// `[streaming.developer]` and `[batch.developer]` sections. This value will be used if they
    /// are not specified.
    #[serde(default = "default::server::connection_pool_size")]
    // Intentionally made private to avoid abuse. Check the related methods on `RwConfig`.
    connection_pool_size: u16,

    /// Controls the metrics level, similar to a log level.
    #[serde(default = "default::server::metrics_level")]
    pub metrics_level: MetricLevel,

    #[serde(default = "default::server::telemetry_enabled")]
    pub telemetry_enabled: bool,

    /// Enable heap profile dump when memory usage is high.
    #[serde(default)]
    pub heap_profiling: HeapProfilingConfig,

    // Max number of pending reset streams for the gRPC server.
    #[serde(default = "default::server::grpc_max_reset_stream_size")]
    pub grpc_max_reset_stream: u32,

    // Catch-all for keys under `[server]` that match no field of this struct.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
608
/// The section `[batch]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct BatchConfig {
    /// The thread number of the batch task runtime in the compute node. The default value is
    /// decided by `tokio`.
    #[serde(default)]
    pub worker_threads_num: Option<usize>,

    // Serialized through `batch_prefix`, so every key of `[batch.developer]` carries a `batch_`
    // prefix.
    #[serde(default, with = "batch_prefix")]
    #[config_doc(omitted)]
    pub developer: BatchDeveloperConfig,

    /// This is the max number of queries per sql session.
    #[serde(default)]
    pub distributed_query_limit: Option<u64>,

    /// This is the max number of batch queries per frontend node.
    #[serde(default)]
    pub max_batch_queries_per_frontend_node: Option<u64>,

    #[serde(default = "default::batch::enable_barrier_read")]
    pub enable_barrier_read: bool,

    /// Timeout for a batch query in seconds.
    #[serde(default = "default::batch::statement_timeout_in_sec")]
    pub statement_timeout_in_sec: u32,

    // Catch-all for keys under `[batch]` that match no field of this struct.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,

    #[serde(default = "default::batch::frontend_compute_runtime_worker_threads")]
    /// Number of worker threads of the frontend compute runtime.
    pub frontend_compute_runtime_worker_threads: usize,

    /// Duration in seconds for which a worker is temporarily masked as unavailable.
    #[serde(default = "default::batch::mask_worker_temporary_secs")]
    pub mask_worker_temporary_secs: usize,

    /// Keywords on which SQL option redaction is based in the query log.
    /// A SQL option with a name containing any of these keywords will be redacted.
    #[serde(default = "default::batch::redact_sql_option_keywords")]
    pub redact_sql_option_keywords: Vec<String>,

    /// Enable the spill out to disk feature for batch queries.
    #[serde(default = "default::batch::enable_spill")]
    pub enable_spill: bool,
}
657
/// The section `[frontend]` in `risingwave.toml`.
// Unlike most other sections, this struct has no `unrecognized` catch-all. Unknown keys under
// `[frontend]` are therefore silently ignored by serde instead of being reported with a warning.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct FrontendConfig {
    /// Total memory constraints for running queries.
    #[serde(default = "default::frontend::max_total_query_size_bytes")]
    pub max_total_query_size_bytes: u64,

    /// A query of size under this threshold will never be rejected due to memory constraints.
    #[serde(default = "default::frontend::min_single_query_size_bytes")]
    pub min_single_query_size_bytes: u64,

    /// A query of size exceeding this threshold will always be rejected due to memory constraints.
    #[serde(default = "default::frontend::max_single_query_size_bytes")]
    pub max_single_query_size_bytes: u64,
}
672
/// The section `[udf]` in `risingwave.toml`.
// Unlike most other sections, this struct has no `unrecognized` catch-all. Unknown keys under
// `[udf]` are therefore silently ignored by serde instead of being reported with a warning.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct UdfConfig {
    /// Allow embedded Python UDFs to be created.
    #[serde(default = "default::udf::enable_embedded_python_udf")]
    pub enable_embedded_python_udf: bool,

    /// Allow embedded JS UDFs to be created.
    #[serde(default = "default::udf::enable_embedded_javascript_udf")]
    pub enable_embedded_javascript_udf: bool,

    /// Allow embedded WASM UDFs to be created.
    #[serde(default = "default::udf::enable_embedded_wasm_udf")]
    pub enable_embedded_wasm_udf: bool,
}
687
/// The section `[streaming]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StreamingConfig {
    /// The maximum number of barriers in-flight in the compute nodes.
    #[serde(default = "default::streaming::in_flight_barrier_nums")]
    pub in_flight_barrier_nums: usize,

    /// The thread number of the streaming actor runtime in the compute node. The default value is
    /// decided by `tokio`.
    #[serde(default)]
    pub actor_runtime_worker_threads_num: Option<usize>,

    /// Enable async stack tracing through `await-tree` for risectl.
    #[serde(default = "default::streaming::async_stack_trace")]
    pub async_stack_trace: AsyncStackTraceOption,

    // Serialized through `streaming_prefix`, so every key of `[streaming.developer]` carries a
    // `stream_` prefix.
    #[serde(default, with = "streaming_prefix")]
    #[config_doc(omitted)]
    pub developer: StreamingDeveloperConfig,

    /// Max unique user stream errors per actor
    #[serde(default = "default::streaming::unique_user_stream_errors")]
    pub unique_user_stream_errors: usize,

    /// Control the strictness of stream consistency.
    #[serde(default = "default::streaming::unsafe_enable_strict_consistency")]
    pub unsafe_enable_strict_consistency: bool,

    // Catch-all for keys under `[streaming]` that match no field of this struct.
    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
720
721pub use risingwave_common_metrics::MetricLevel;
722
/// The section `[storage.cache]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CacheConfig {
    /// Configure the capacity of the block cache in MB explicitly.
    /// The overridden value will only be effective if:
    /// 1. `meta_cache_capacity_mb` and `shared_buffer_capacity_mb` are also configured explicitly.
    /// 2. `block_cache_capacity_mb` + `meta_cache_capacity_mb` + `shared_buffer_capacity_mb` doesn't exceed 0.3 * non-reserved memory.
    #[serde(default)]
    pub block_cache_capacity_mb: Option<usize>,

    /// Configure the number of shards in the block cache explicitly.
    /// If not set, the shard number will be determined automatically based on cache capacity.
    #[serde(default)]
    pub block_cache_shard_num: Option<usize>,

    // Eviction policy of the block cache; see `CacheEvictionConfig`.
    #[serde(default)]
    #[config_doc(omitted)]
    pub block_cache_eviction: CacheEvictionConfig,

    /// Configure the capacity of the meta cache in MB explicitly.
    /// The overridden value will only be effective if:
    /// 1. `block_cache_capacity_mb` and `shared_buffer_capacity_mb` are also configured explicitly.
    /// 2. `block_cache_capacity_mb` + `meta_cache_capacity_mb` + `shared_buffer_capacity_mb` doesn't exceed 0.3 * non-reserved memory.
    #[serde(default)]
    pub meta_cache_capacity_mb: Option<usize>,

    /// Configure the number of shards in the meta cache explicitly.
    /// If not set, the shard number will be determined automatically based on cache capacity.
    #[serde(default)]
    pub meta_cache_shard_num: Option<usize>,

    // Eviction policy of the meta cache; see `CacheEvictionConfig`.
    #[serde(default)]
    #[config_doc(omitted)]
    pub meta_cache_eviction: CacheEvictionConfig,
}
758
759/// the section `[storage.cache.eviction]` in `risingwave.toml`.
760#[derive(Clone, Debug, Serialize, Deserialize)]
761#[serde(tag = "algorithm")]
762pub enum CacheEvictionConfig {
763    Lru {
764        high_priority_ratio_in_percent: Option<usize>,
765    },
766    Lfu {
767        window_capacity_ratio_in_percent: Option<usize>,
768        protected_capacity_ratio_in_percent: Option<usize>,
769        cmsketch_eps: Option<f64>,
770        cmsketch_confidence: Option<f64>,
771    },
772    S3Fifo {
773        small_queue_capacity_ratio_in_percent: Option<usize>,
774        ghost_queue_capacity_ratio_in_percent: Option<usize>,
775        small_to_main_freq_threshold: Option<u8>,
776    },
777}
778
779impl Default for CacheEvictionConfig {
780    fn default() -> Self {
781        Self::Lru {
782            high_priority_ratio_in_percent: None,
783        }
784    }
785}
786
787/// The section `[storage]` in `risingwave.toml`.
788#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
789pub struct StorageConfig {
790    /// parallelism while syncing share buffers into L0 SST. Should NOT be 0.
791    #[serde(default = "default::storage::share_buffers_sync_parallelism")]
792    pub share_buffers_sync_parallelism: u32,
793
794    /// Worker threads number of dedicated tokio runtime for share buffer compaction. 0 means use
795    /// tokio's default value (number of CPU core).
796    #[serde(default = "default::storage::share_buffer_compaction_worker_threads_number")]
797    pub share_buffer_compaction_worker_threads_number: u32,
798
799    /// Configure the maximum shared buffer size in MB explicitly. Writes attempting to exceed the capacity
800    /// will stall until there is enough space. The overridden value will only be effective if:
801    /// 1. `block_cache_capacity_mb` and `meta_cache_capacity_mb` are also configured explicitly.
802    /// 2. `block_cache_capacity_mb` + `meta_cache_capacity_mb` + `meta_cache_capacity_mb` doesn't exceed 0.3 * non-reserved memory.
803    #[serde(default)]
804    pub shared_buffer_capacity_mb: Option<usize>,
805
806    /// The shared buffer will start flushing data to object when the ratio of memory usage to the
807    /// shared buffer capacity exceed such ratio.
808    #[serde(default = "default::storage::shared_buffer_flush_ratio")]
809    pub shared_buffer_flush_ratio: f32,
810
811    /// The minimum total flush size of shared buffer spill. When a shared buffer spilled is trigger,
812    /// the total flush size across multiple epochs should be at least higher than this size.
813    #[serde(default = "default::storage::shared_buffer_min_batch_flush_size_mb")]
814    pub shared_buffer_min_batch_flush_size_mb: usize,
815
816    /// The threshold for the number of immutable memtables to merge to a new imm.
817    #[serde(default = "default::storage::imm_merge_threshold")]
818    #[deprecated]
819    pub imm_merge_threshold: usize,
820
821    /// Whether to enable write conflict detection
822    #[serde(default = "default::storage::write_conflict_detection_enabled")]
823    pub write_conflict_detection_enabled: bool,
824
825    #[serde(default)]
826    #[config_doc(nested)]
827    pub cache: CacheConfig,
828
829    /// DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_capacity_mb` instead.
830    #[serde(default)]
831    pub block_cache_capacity_mb: Option<usize>,
832
833    /// DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead.
834    #[serde(default)]
835    pub meta_cache_capacity_mb: Option<usize>,
836
837    /// DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_eviction.high_priority_ratio_in_percent` with `storage.cache.block_cache_eviction.algorithm = "Lru"` instead.
838    #[serde(default)]
839    pub high_priority_ratio_in_percent: Option<usize>,
840
841    /// max memory usage for large query
842    #[serde(default)]
843    pub prefetch_buffer_capacity_mb: Option<usize>,
844
845    #[serde(default = "default::storage::max_cached_recent_versions_number")]
846    pub max_cached_recent_versions_number: usize,
847
848    /// max prefetch block number
849    #[serde(default = "default::storage::max_prefetch_block_number")]
850    pub max_prefetch_block_number: usize,
851
852    #[serde(default = "default::storage::disable_remote_compactor")]
853    pub disable_remote_compactor: bool,
854
855    /// Number of tasks shared buffer can upload in parallel.
856    #[serde(default = "default::storage::share_buffer_upload_concurrency")]
857    pub share_buffer_upload_concurrency: usize,
858
859    #[serde(default)]
860    pub compactor_memory_limit_mb: Option<usize>,
861
862    /// Compactor calculates the maximum number of tasks that can be executed on the node based on
863    /// `worker_num` and `compactor_max_task_multiplier`.
864    /// `max_pull_task_count` = `worker_num` * `compactor_max_task_multiplier`
865    #[serde(default = "default::storage::compactor_max_task_multiplier")]
866    pub compactor_max_task_multiplier: f32,
867
868    /// The percentage of memory available when compactor is deployed separately.
869    /// `non_reserved_memory_bytes` = `system_memory_available_bytes` * `compactor_memory_available_proportion`
870    #[serde(default = "default::storage::compactor_memory_available_proportion")]
871    pub compactor_memory_available_proportion: f64,
872
873    /// Number of SST ids fetched from meta per RPC
874    #[serde(default = "default::storage::sstable_id_remote_fetch_number")]
875    pub sstable_id_remote_fetch_number: u32,
876
877    #[serde(default = "default::storage::min_sstable_size_mb")]
878    pub min_sstable_size_mb: u32,
879
880    #[serde(default)]
881    #[config_doc(nested)]
882    pub data_file_cache: FileCacheConfig,
883
884    #[serde(default)]
885    #[config_doc(nested)]
886    pub meta_file_cache: FileCacheConfig,
887
888    #[serde(default)]
889    #[config_doc(nested)]
890    pub cache_refill: CacheRefillConfig,
891
892    /// Whether to enable streaming upload for sstable.
893    #[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
894    pub min_sst_size_for_streaming_upload: u64,
895
896    #[serde(default = "default::storage::max_concurrent_compaction_task_number")]
897    pub max_concurrent_compaction_task_number: u64,
898
899    #[serde(default = "default::storage::max_preload_wait_time_mill")]
900    pub max_preload_wait_time_mill: u64,
901
902    #[serde(default = "default::storage::max_version_pinning_duration_sec")]
903    pub max_version_pinning_duration_sec: u64,
904
905    #[serde(default = "default::storage::compactor_max_sst_key_count")]
906    pub compactor_max_sst_key_count: u64,
907    // DEPRECATED: This config will be deprecated in the future version, use `storage.compactor_iter_max_io_retry_times` instead.
908    #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")]
909    pub compact_iter_recreate_timeout_ms: u64,
910    #[serde(default = "default::storage::compactor_max_sst_size")]
911    pub compactor_max_sst_size: u64,
912    #[serde(default = "default::storage::enable_fast_compaction")]
913    pub enable_fast_compaction: bool,
914    #[serde(default = "default::storage::check_compaction_result")]
915    pub check_compaction_result: bool,
916    #[serde(default = "default::storage::max_preload_io_retry_times")]
917    pub max_preload_io_retry_times: usize,
918    #[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")]
919    pub compactor_fast_max_compact_delete_ratio: u32,
920    #[serde(default = "default::storage::compactor_fast_max_compact_task_size")]
921    pub compactor_fast_max_compact_task_size: u64,
922    #[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
923    pub compactor_iter_max_io_retry_times: usize,
924
925    /// Deprecated: The window size of table info statistic history.
926    #[serde(default = "default::storage::table_info_statistic_history_times")]
927    #[deprecated]
928    pub table_info_statistic_history_times: usize,
929
930    #[serde(default, flatten)]
931    #[config_doc(omitted)]
932    pub unrecognized: Unrecognized<Self>,
933
934    /// The spill threshold for mem table.
935    #[serde(default = "default::storage::mem_table_spill_threshold")]
936    pub mem_table_spill_threshold: usize,
937
938    /// The concurrent uploading number of `SSTables` of builder
939    #[serde(default = "default::storage::compactor_concurrent_uploading_sst_count")]
940    pub compactor_concurrent_uploading_sst_count: Option<usize>,
941
942    #[serde(default = "default::storage::compactor_max_overlap_sst_count")]
943    pub compactor_max_overlap_sst_count: usize,
944
945    /// The maximum number of meta files that can be preloaded.
946    /// If the number of meta files exceeds this value, the compactor will try to compute parallelism only through `SstableInfo`, no longer preloading `SstableMeta`.
947    /// This is to prevent the compactor from consuming too much memory, but it may cause the compactor to be less efficient.
948    #[serde(default = "default::storage::compactor_max_preload_meta_file_count")]
949    pub compactor_max_preload_meta_file_count: usize,
950
951    /// Object storage configuration
952    /// 1. General configuration
953    /// 2. Some special configuration of Backend
954    /// 3. Retry and timeout configuration
955    #[serde(default)]
956    pub object_store: ObjectStoreConfig,
957
958    #[serde(default = "default::storage::time_travel_version_cache_capacity")]
959    pub time_travel_version_cache_capacity: u64,
960}
961
/// The subsection `[storage.cache_refill]` in `risingwave.toml`.
///
/// It's put at [`StorageConfig::cache_refill`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CacheRefillConfig {
    /// `SSTable` levels to refill.
    #[serde(default = "default::cache_refill::data_refill_levels")]
    pub data_refill_levels: Vec<u32>,

    /// Cache refill maximum timeout to apply version delta, in milliseconds.
    #[serde(default = "default::cache_refill::timeout_ms")]
    pub timeout_ms: u64,

    /// Inflight data cache refill tasks.
    #[serde(default = "default::cache_refill::concurrency")]
    pub concurrency: usize,

    /// Block count that a data cache refill request fetches.
    #[serde(default = "default::cache_refill::unit")]
    pub unit: usize,

    /// Data cache refill unit admission ratio.
    ///
    /// Only units whose blocks are admitted above the ratio will be refilled.
    #[serde(default = "default::cache_refill::threshold")]
    pub threshold: f64,

    /// Recent filter layer count.
    #[serde(default = "default::cache_refill::recent_filter_layers")]
    pub recent_filter_layers: usize,

    /// Recent filter layer rotate interval, in milliseconds.
    #[serde(default = "default::cache_refill::recent_filter_rotate_interval_ms")]
    pub recent_filter_rotate_interval_ms: usize,

    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
998
/// The subsections `[storage.data_file_cache]` and `[storage.meta_file_cache]` in `risingwave.toml`.
///
/// It's put at [`StorageConfig::data_file_cache`] and [`StorageConfig::meta_file_cache`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct FileCacheConfig {
    #[serde(default = "default::file_cache::dir")]
    pub dir: String,

    #[serde(default = "default::file_cache::capacity_mb")]
    pub capacity_mb: usize,

    #[serde(default = "default::file_cache::file_capacity_mb")]
    pub file_capacity_mb: usize,

    #[serde(default = "default::file_cache::flushers")]
    pub flushers: usize,

    #[serde(default = "default::file_cache::reclaimers")]
    pub reclaimers: usize,

    #[serde(default = "default::file_cache::recover_concurrency")]
    pub recover_concurrency: usize,

    #[serde(default = "default::file_cache::insert_rate_limit_mb")]
    pub insert_rate_limit_mb: usize,

    #[serde(default = "default::file_cache::indexer_shards")]
    pub indexer_shards: usize,

    #[serde(default = "default::file_cache::compression")]
    pub compression: Compression,

    #[serde(default = "default::file_cache::flush_buffer_threshold_mb")]
    pub flush_buffer_threshold_mb: Option<usize>,

    /// Recover mode.
    ///
    /// Options:
    ///
    /// - "None": Do not recover disk cache.
    /// - "Quiet": Recover disk cache and skip errors.
    /// - "Strict": Recover disk cache and panic on errors.
    ///
    /// More details, see [`RecoverMode::None`], [`RecoverMode::Quiet`] and [`RecoverMode::Strict`].
    #[serde(default = "default::file_cache::recover_mode")]
    pub recover_mode: RecoverMode,

    #[serde(default = "default::file_cache::runtime_config")]
    pub runtime_config: RuntimeOptions,

    #[serde(default, flatten)]
    #[config_doc(omitted)]
    pub unrecognized: Unrecognized<Self>,
}
1053
1054#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
1055pub enum AsyncStackTraceOption {
1056    /// Disabled.
1057    Off,
1058    /// Enabled with basic instruments.
1059    On,
1060    /// Enabled with extra verbose instruments in release build.
1061    /// Behaves the same as `on` in debug build due to performance concern.
1062    #[default]
1063    #[clap(alias = "verbose")]
1064    ReleaseVerbose,
1065}
1066
1067impl AsyncStackTraceOption {
1068    pub fn is_verbose(self) -> Option<bool> {
1069        match self {
1070            Self::Off => None,
1071            Self::On => Some(false),
1072            Self::ReleaseVerbose => Some(!cfg!(debug_assertions)),
1073        }
1074    }
1075}
1076
/// Compactor mode, selectable from the command line via `clap` (`dedicated` or `shared`).
///
/// Defaults to [`CompactorMode::Dedicated`].
#[derive(Debug, Default, Clone, Copy, ValueEnum)]
pub enum CompactorMode {
    // Plain `//` comments are used on the variants on purpose: `///` docs on `ValueEnum`
    // variants would become CLI help text.
    #[default]
    #[clap(alias = "dedicated")]
    Dedicated,

    #[clap(alias = "shared")]
    Shared,
}
1086
/// Heap profiling configuration.
// NOTE(review): the TOML section this is mounted at is not visible here; confirm against the parent
// config struct before documenting it.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct HeapProfilingConfig {
    /// Enable to auto dump heap profile when memory usage is high.
    #[serde(default = "default::heap_profiling::enable_auto")]
    pub enable_auto: bool,

    /// The proportion (number between 0 and 1) of memory usage to trigger heap profile dump.
    #[serde(default = "default::heap_profiling::threshold_auto")]
    pub threshold_auto: f32,

    /// The directory to dump heap profile. If empty, the prefix in `MALLOC_CONF` will be used.
    #[serde(default = "default::heap_profiling::dir")]
    pub dir: String,
}
1101
/// The subsection `[streaming.developer]` in `risingwave.toml`.
///
/// It is put at [`StreamingConfig::developer`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StreamingDeveloperConfig {
    /// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
    /// and might affect the prometheus performance. If you only need actor input and output
    /// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
    #[serde(default = "default::developer::stream_enable_executor_row_count")]
    pub enable_executor_row_count: bool,

    /// The capacity of the chunks in the channel that connects `ConnectorSource` and
    /// `SourceExecutor`.
    #[serde(default = "default::developer::connector_message_buffer_size")]
    pub connector_message_buffer_size: usize,

    /// Limit number of the cached entries in an extreme aggregation call.
    #[serde(default = "default::developer::unsafe_stream_extreme_cache_size")]
    pub unsafe_extreme_cache_size: usize,

    /// The maximum size of the chunk produced by executor at a time.
    #[serde(default = "default::developer::stream_chunk_size")]
    pub chunk_size: usize,

    /// The initial permits that a channel holds, i.e., the maximum row count can be buffered in
    /// the channel.
    #[serde(default = "default::developer::stream_exchange_initial_permits")]
    pub exchange_initial_permits: usize,

    /// The permits that are batched to add back, for reducing the backward `AddPermits` messages
    /// in remote exchange.
    #[serde(default = "default::developer::stream_exchange_batched_permits")]
    pub exchange_batched_permits: usize,

    /// The maximum number of concurrent barriers in an exchange channel.
    #[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
    pub exchange_concurrent_barriers: usize,

    /// The concurrency for dispatching messages to different downstream jobs.
    ///
    /// - `1` means no concurrency, i.e., dispatch messages to downstream jobs one by one.
    /// - `0` means unlimited concurrency.
    #[serde(default = "default::developer::stream_exchange_concurrent_dispatchers")]
    pub exchange_concurrent_dispatchers: usize,

    /// The initial permits for a dml channel, i.e., the maximum row count can be buffered in
    /// the channel.
    #[serde(default = "default::developer::stream_dml_channel_initial_permits")]
    pub dml_channel_initial_permits: usize,

    /// The max heap size of dirty groups of `HashAggExecutor`.
    #[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
    pub hash_agg_max_dirty_groups_heap_size: usize,

    #[serde(default = "default::developer::memory_controller_threshold_aggressive")]
    pub memory_controller_threshold_aggressive: f64,

    #[serde(default = "default::developer::memory_controller_threshold_graceful")]
    pub memory_controller_threshold_graceful: f64,

    #[serde(default = "default::developer::memory_controller_threshold_stable")]
    pub memory_controller_threshold_stable: f64,

    #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
    pub memory_controller_eviction_factor_aggressive: f64,

    #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
    pub memory_controller_eviction_factor_graceful: f64,

    #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
    pub memory_controller_eviction_factor_stable: f64,

    #[serde(default = "default::developer::memory_controller_update_interval_ms")]
    pub memory_controller_update_interval_ms: usize,

    #[serde(default = "default::developer::memory_controller_sequence_tls_step")]
    pub memory_controller_sequence_tls_step: u64,

    #[serde(default = "default::developer::memory_controller_sequence_tls_lag")]
    pub memory_controller_sequence_tls_lag: u64,

    #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
    /// Enable arrangement backfill.
    /// If false, the arrangement backfill will be disabled,
    /// even if the session variable is set.
    /// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true).
    pub enable_arrangement_backfill: bool,

    #[serde(default = "default::developer::stream_high_join_amplification_threshold")]
    /// If the number of hash join matches exceeds this threshold number,
    /// it will be logged.
    pub high_join_amplification_threshold: usize,

    /// Actor tokio metrics is enabled if `enable_actor_tokio_metrics` is set or metrics level >= Debug.
    #[serde(default = "default::developer::enable_actor_tokio_metrics")]
    pub enable_actor_tokio_metrics: bool,

    /// The number of the connections for streaming remote exchange between two nodes.
    /// If not specified, the value of `server.connection_pool_size` will be used.
    #[serde(default = "default::developer::stream_exchange_connection_pool_size")]
    pub exchange_connection_pool_size: Option<u16>,

    /// A flag to allow disabling the auto schema change handling.
    #[serde(default = "default::developer::stream_enable_auto_schema_change")]
    pub enable_auto_schema_change: bool,

    #[serde(default = "default::developer::enable_shared_source")]
    /// Enable shared source.
    /// If false, the shared source will be disabled,
    /// even if the session variable is set.
    /// If true, it's decided by session variable `streaming_use_shared_source` (default true).
    pub enable_shared_source: bool,

    #[serde(default = "default::developer::switch_jdbc_pg_to_native")]
    /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
    /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
    pub switch_jdbc_pg_to_native: bool,

    /// The maximum number of consecutive barriers allowed in a message when sent between actors.
    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
    pub max_barrier_batch_size: u32,

    /// Configure the system-wide cache row cardinality of hash join.
    /// For example, if this is set to 1000, it means we can have at most 1000 rows in cache.
    #[serde(default = "default::developer::streaming_hash_join_entry_state_max_rows")]
    pub hash_join_entry_state_max_rows: usize,

    /// Enable/disable profiling stats used by `EXPLAIN ANALYZE`.
    #[serde(default = "default::developer::enable_explain_analyze_stats")]
    pub enable_explain_analyze_stats: bool,

    #[serde(default)]
    pub compute_client_config: RpcClientConfig,

    /// `IcebergListExecutor`: The interval in seconds for Iceberg source to list new files.
    #[serde(default = "default::developer::iceberg_list_interval_sec")]
    pub iceberg_list_interval_sec: u64,

    /// `IcebergFetchExecutor`: The number of files the executor will fetch concurrently in a batch.
    #[serde(default = "default::developer::iceberg_fetch_batch_size")]
    pub iceberg_fetch_batch_size: u64,

    /// `IcebergSink`: The size of the cache for positional delete in the sink.
    #[serde(default = "default::developer::iceberg_sink_positional_delete_cache_size")]
    pub iceberg_sink_positional_delete_cache_size: usize,
}
1248
/// The subsection `[batch.developer]` in `risingwave.toml`.
///
/// It is put at [`BatchConfig::developer`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct BatchDeveloperConfig {
    /// The capacity of the chunks in the channel that connects `ConnectorSource` and
    /// `SourceExecutor`.
    #[serde(default = "default::developer::connector_message_buffer_size")]
    pub connector_message_buffer_size: usize,

    /// The size of the channel used for output to exchange/shuffle.
    #[serde(default = "default::developer::batch_output_channel_size")]
    pub output_channel_size: usize,

    #[serde(default = "default::developer::batch_receiver_channel_size")]
    pub receiver_channel_size: usize,

    #[serde(default = "default::developer::batch_root_stage_channel_size")]
    pub root_stage_channel_size: usize,

    /// The size of a chunk produced by `RowSeqScanExecutor`.
    #[serde(default = "default::developer::batch_chunk_size")]
    pub chunk_size: usize,

    /// The number of the connections for batch remote exchange between two nodes.
    /// If not specified, the value of `server.connection_pool_size` will be used.
    // NOTE(review): unlike `StreamingDeveloperConfig::exchange_connection_pool_size`, this field is
    // not `pub`; access presumably goes through an accessor defined elsewhere — confirm.
    #[serde(default = "default::developer::batch_exchange_connection_pool_size")]
    exchange_connection_pool_size: Option<u16>,

    #[serde(default)]
    pub compute_client_config: RpcClientConfig,
}
1281
// Generates the `SystemConfig` struct from the system parameter list passed in by `for_all_params!`.
//
// Each entry has the shape `{ field, type, default, is_mutable, doc, ... }`. Only the field name,
// the type and the doc string are used here; `default`, `is_mutable` and the trailing tokens are
// matched but ignored. Every field is emitted as `Option<type>`, and its serde default function
// path `default::system::<field>_opt` is assembled by `paste!`.
macro_rules! define_system_config {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
        paste::paste!(
            /// The section `[system]` in `risingwave.toml`. All these fields are used to initialize the system
            /// parameters persisted in Meta store. Most fields are for testing purposes only and should not be
            /// documented.
            #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
            pub struct SystemConfig {
                $(
                    #[doc = $doc]
                    #[serde(default = "default::system::" $field "_opt")]
                    pub $field: Option<$type>,
                )*
            }
        );
    };
}

// Instantiate `SystemConfig` from the system parameter list.
for_all_params!(define_system_config);
1301
/// The subsection `[storage.object_store]` in `risingwave.toml`.
///
/// It is put at [`StorageConfig::object_store`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
    // alias is for backward compatibility
    #[serde(
        default = "default::object_store_config::set_atomic_write_dir",
        alias = "object_store_set_atomic_write_dir"
    )]
    pub set_atomic_write_dir: bool,

    /// Retry and timeout configuration.
    /// Describes a retry strategy driven by exponential back-off.
    /// Exposes the timeout and retries of each Object store interface. Therefore, the total timeout for each interface is determined based on the interface's timeout/retry configuration and the exponential back-off policy.
    #[serde(default)]
    pub retry: ObjectStoreRetryConfig,

    /// Some special configuration of S3 Backend.
    #[serde(default)]
    pub s3: S3ObjectStoreConfig,

    // TODO: the following field will be deprecated after opendal is stabilized
    #[serde(default = "default::object_store_config::opendal_upload_concurrency")]
    pub opendal_upload_concurrency: usize,

    // TODO: the following field will be deprecated after opendal is stabilized
    #[serde(default)]
    pub opendal_writer_abort_on_err: bool,

    #[serde(default = "default::object_store_config::upload_part_size")]
    pub upload_part_size: usize,
}
1333
1334impl ObjectStoreConfig {
1335    pub fn set_atomic_write_dir(&mut self) {
1336        self.set_atomic_write_dir = true;
1337    }
1338}
1339
/// The subsection `[storage.object_store.s3]` in `risingwave.toml`.
///
/// It is put at [`ObjectStoreConfig::s3`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreConfig {
    // alias is for backward compatibility
    #[serde(
        default = "default::object_store_config::s3::keepalive_ms",
        alias = "object_store_keepalive_ms"
    )]
    pub keepalive_ms: Option<u64>,
    #[serde(
        default = "default::object_store_config::s3::recv_buffer_size",
        alias = "object_store_recv_buffer_size"
    )]
    pub recv_buffer_size: Option<usize>,
    #[serde(
        default = "default::object_store_config::s3::send_buffer_size",
        alias = "object_store_send_buffer_size"
    )]
    pub send_buffer_size: Option<usize>,
    #[serde(
        default = "default::object_store_config::s3::nodelay",
        alias = "object_store_nodelay"
    )]
    pub nodelay: Option<bool>,
    /// Kept for backwards compatibility only; users should set
    /// `developer.retry_unknown_service_error` (see [`S3ObjectStoreDeveloperConfig`]) instead.
    // Shares its default function with `S3ObjectStoreDeveloperConfig::retry_unknown_service_error`.
    #[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")]
    pub retry_unknown_service_error: bool,
    #[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")]
    pub identity_resolution_timeout_s: u64,
    #[serde(default)]
    pub developer: S3ObjectStoreDeveloperConfig,
}
1372
/// The subsection `[storage.object_store.s3.developer]` in `risingwave.toml`.
///
/// It is put at [`S3ObjectStoreConfig::developer`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreDeveloperConfig {
    /// Whether to retry s3 sdk error from which no error metadata is provided.
    #[serde(
        default = "default::object_store_config::s3::developer::retry_unknown_service_error",
        alias = "object_store_retry_unknown_service_error"
    )]
    pub retry_unknown_service_error: bool,
    /// An array of error codes that should be retried.
    /// e.g. `["SlowDown", "TooManyRequests"]`
    #[serde(
        default = "default::object_store_config::s3::developer::retryable_service_error_codes",
        alias = "object_store_retryable_service_error_codes"
    )]
    pub retryable_service_error_codes: Vec<String>,

    // TODO: deprecate this config when we are completely deprecate aws sdk.
    #[serde(default = "default::object_store_config::s3::developer::use_opendal")]
    pub use_opendal: bool,
}
1394
/// Retry and timeout settings for object store operations.
///
/// The `req_backoff_*` fields configure an exponential back-off retry strategy that is shared by
/// all object store operations. The remaining fields set, for each kind of operation, a
/// per-attempt timeout (in milliseconds) and a number of retry attempts.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreRetryConfig {
    // A retry strategy driven by exponential back-off.
    // The retry strategy is used for all object store operations.
    /// Base delay of the exponential back-off retry strategy, in milliseconds.
    #[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
    pub req_backoff_interval_ms: u64,

    /// Maximum delay of the retry strategy, in milliseconds. No retry delay will be longer than
    /// this value.
    #[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
    pub req_backoff_max_delay_ms: u64,

    /// A multiplicative factor that will be applied to the exponential back-off retry delay.
    #[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
    pub req_backoff_factor: u64,

    /// Per-attempt timeout for the `upload` operation, in milliseconds.
    #[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
    pub upload_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `upload` operation.
    #[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
    pub upload_retry_attempts: usize,

    /// Per-attempt timeout for `streaming_upload_init` and `streaming_upload`, in milliseconds.
    #[serde(
        default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
    )]
    pub streaming_upload_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `streaming_upload` operation.
    #[serde(
        default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
    )]
    pub streaming_upload_retry_attempts: usize,

    /// Per-attempt timeout for the `read` operation, in milliseconds.
    #[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
    pub read_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `read` operation.
    #[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
    pub read_retry_attempts: usize,

    /// Per-attempt timeout for the `streaming_read_init` and `streaming_read` operations, in
    /// milliseconds.
    #[serde(
        default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
    )]
    pub streaming_read_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `streaming_read` operation.
    #[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
    pub streaming_read_retry_attempts: usize,

    /// Per-attempt timeout for the `metadata` operation, in milliseconds.
    #[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
    pub metadata_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `metadata` operation.
    #[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
    pub metadata_retry_attempts: usize,

    /// Per-attempt timeout for the `delete` operation, in milliseconds.
    #[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
    pub delete_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `delete` operation.
    #[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
    pub delete_retry_attempts: usize,

    /// Per-attempt timeout for the `delete_objects` operation, in milliseconds.
    #[serde(
        default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
    )]
    pub delete_objects_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `delete_objects` operation.
    #[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
    pub delete_objects_retry_attempts: usize,

    /// Per-attempt timeout for the `list` operation, in milliseconds.
    #[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
    pub list_attempt_timeout_ms: u64,

    /// Number of retry attempts for the `list` operation.
    #[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
    pub list_retry_attempts: usize,
}
1483
1484impl SystemConfig {
1485    #![allow(deprecated)]
1486    pub fn into_init_system_params(self) -> SystemParams {
1487        macro_rules! fields {
1488            ($({ $field:ident, $($rest:tt)* },)*) => {
1489                SystemParams {
1490                    $($field: self.$field.map(Into::into),)*
1491                    ..Default::default() // deprecated fields
1492                }
1493            };
1494        }
1495
1496        let mut system_params = for_all_params!(fields);
1497
1498        // Initialize backup_storage_url and backup_storage_directory if not set.
1499        if let Some(state_store) = &system_params.state_store
1500            && let Some(data_directory) = &system_params.data_directory
1501        {
1502            if system_params.backup_storage_url.is_none() {
1503                if let Some(hummock_state_store) = state_store.strip_prefix("hummock+") {
1504                    system_params.backup_storage_url = Some(hummock_state_store.to_owned());
1505                } else {
1506                    system_params.backup_storage_url = Some("memory".to_owned());
1507                }
1508                tracing::info!("initialize backup_storage_url based on state_store");
1509            }
1510            if system_params.backup_storage_directory.is_none() {
1511                system_params.backup_storage_directory = Some(format!("{data_directory}/backup"));
1512                tracing::info!("initialize backup_storage_directory based on data_directory");
1513            }
1514        }
1515        system_params
1516    }
1517}
1518
1519impl RwConfig {
1520    pub const fn default_connection_pool_size(&self) -> u16 {
1521        self.server.connection_pool_size
1522    }
1523
1524    /// Returns [`StreamingDeveloperConfig::exchange_connection_pool_size`] if set,
1525    /// otherwise [`ServerConfig::connection_pool_size`].
1526    pub fn streaming_exchange_connection_pool_size(&self) -> u16 {
1527        self.streaming
1528            .developer
1529            .exchange_connection_pool_size
1530            .unwrap_or_else(|| self.default_connection_pool_size())
1531    }
1532
1533    /// Returns [`BatchDeveloperConfig::exchange_connection_pool_size`] if set,
1534    /// otherwise [`ServerConfig::connection_pool_size`].
1535    pub fn batch_exchange_connection_pool_size(&self) -> u16 {
1536        self.batch
1537            .developer
1538            .exchange_connection_pool_size
1539            .unwrap_or_else(|| self.default_connection_pool_size())
1540    }
1541}
1542
1543pub mod default {
1544    pub mod meta {
1545        use crate::config::{DefaultParallelism, MetaBackend};
1546
1547        pub fn min_sst_retention_time_sec() -> u64 {
1548            3600 * 6
1549        }
1550
1551        pub fn gc_history_retention_time_sec() -> u64 {
1552            3600 * 6
1553        }
1554
1555        pub fn full_gc_interval_sec() -> u64 {
1556            3600
1557        }
1558
1559        pub fn full_gc_object_limit() -> u64 {
1560            100_000
1561        }
1562
1563        pub fn max_inflight_time_travel_query() -> u64 {
1564            1000
1565        }
1566
1567        pub fn periodic_compaction_interval_sec() -> u64 {
1568            60
1569        }
1570
1571        pub fn vacuum_interval_sec() -> u64 {
1572            30
1573        }
1574
1575        pub fn vacuum_spin_interval_ms() -> u64 {
1576            100
1577        }
1578
1579        pub fn hummock_version_checkpoint_interval_sec() -> u64 {
1580            30
1581        }
1582
1583        pub fn enable_hummock_data_archive() -> bool {
1584            false
1585        }
1586
1587        pub fn hummock_time_travel_snapshot_interval() -> u64 {
1588            100
1589        }
1590
1591        pub fn min_delta_log_num_for_hummock_version_checkpoint() -> u64 {
1592            10
1593        }
1594
1595        pub fn max_heartbeat_interval_sec() -> u32 {
1596            60
1597        }
1598
1599        pub fn meta_leader_lease_secs() -> u64 {
1600            30
1601        }
1602
1603        pub fn default_parallelism() -> DefaultParallelism {
1604            DefaultParallelism::Full
1605        }
1606
1607        pub fn node_num_monitor_interval_sec() -> u64 {
1608            10
1609        }
1610
1611        pub fn backend() -> MetaBackend {
1612            MetaBackend::Mem
1613        }
1614
1615        pub fn periodic_space_reclaim_compaction_interval_sec() -> u64 {
1616            3600 // 60min
1617        }
1618
1619        pub fn periodic_ttl_reclaim_compaction_interval_sec() -> u64 {
1620            1800 // 30mi
1621        }
1622
1623        pub fn periodic_scheduling_compaction_group_split_interval_sec() -> u64 {
1624            10 // 10s
1625        }
1626
1627        pub fn periodic_tombstone_reclaim_compaction_interval_sec() -> u64 {
1628            600
1629        }
1630
1631        // limit the size of state table to trigger split by high throughput
1632        pub fn move_table_size_limit() -> u64 {
1633            10 * 1024 * 1024 * 1024 // 10GB
1634        }
1635
1636        // limit the size of group to trigger split by group_size and avoid too many small groups
1637        pub fn split_group_size_limit() -> u64 {
1638            64 * 1024 * 1024 * 1024 // 64GB
1639        }
1640
1641        pub fn partition_vnode_count() -> u32 {
1642            16
1643        }
1644
1645        pub fn table_high_write_throughput_threshold() -> u64 {
1646            16 * 1024 * 1024 // 16MB
1647        }
1648
1649        pub fn table_low_write_throughput_threshold() -> u64 {
1650            4 * 1024 * 1024 // 4MB
1651        }
1652
1653        pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
1654            30 // 30s
1655        }
1656
1657        pub fn compaction_task_max_progress_interval_secs() -> u64 {
1658            60 * 10 // 10min
1659        }
1660
1661        pub fn cut_table_size_limit() -> u64 {
1662            1024 * 1024 * 1024 // 1GB
1663        }
1664
1665        pub fn hybrid_partition_vnode_count() -> u32 {
1666            4
1667        }
1668
1669        pub fn compact_task_table_size_partition_threshold_low() -> u64 {
1670            128 * 1024 * 1024 // 128MB
1671        }
1672
1673        pub fn compact_task_table_size_partition_threshold_high() -> u64 {
1674            512 * 1024 * 1024 // 512MB
1675        }
1676
1677        pub fn event_log_enabled() -> bool {
1678            true
1679        }
1680
1681        pub fn event_log_channel_max_size() -> u32 {
1682            10
1683        }
1684
1685        pub fn parallelism_control_batch_size() -> usize {
1686            10
1687        }
1688
1689        pub fn parallelism_control_trigger_period_sec() -> u64 {
1690            10
1691        }
1692
1693        pub fn parallelism_control_trigger_first_delay_sec() -> u64 {
1694            30
1695        }
1696
1697        pub fn enable_dropped_column_reclaim() -> bool {
1698            false
1699        }
1700
1701        pub fn split_group_size_ratio() -> f64 {
1702            0.9
1703        }
1704
1705        pub fn table_stat_high_write_throughput_ratio_for_split() -> f64 {
1706            0.5
1707        }
1708
1709        pub fn table_stat_low_write_throughput_ratio_for_merge() -> f64 {
1710            0.7
1711        }
1712
1713        pub fn table_stat_throuput_window_seconds_for_split() -> usize {
1714            60
1715        }
1716
1717        pub fn table_stat_throuput_window_seconds_for_merge() -> usize {
1718            240
1719        }
1720
1721        pub fn periodic_scheduling_compaction_group_merge_interval_sec() -> u64 {
1722            60 * 10 // 10min
1723        }
1724    }
1725
1726    pub mod server {
1727        use crate::config::MetricLevel;
1728
1729        pub fn heartbeat_interval_ms() -> u32 {
1730            1000
1731        }
1732
1733        pub fn connection_pool_size() -> u16 {
1734            16
1735        }
1736
1737        pub fn metrics_level() -> MetricLevel {
1738            MetricLevel::Info
1739        }
1740
1741        pub fn telemetry_enabled() -> bool {
1742            true
1743        }
1744
1745        pub fn grpc_max_reset_stream_size() -> u32 {
1746            200
1747        }
1748    }
1749
1750    pub mod storage {
1751        pub fn share_buffers_sync_parallelism() -> u32 {
1752            1
1753        }
1754
1755        pub fn share_buffer_compaction_worker_threads_number() -> u32 {
1756            4
1757        }
1758
1759        pub fn shared_buffer_capacity_mb() -> usize {
1760            1024
1761        }
1762
1763        pub fn shared_buffer_flush_ratio() -> f32 {
1764            0.8
1765        }
1766
1767        pub fn shared_buffer_min_batch_flush_size_mb() -> usize {
1768            800
1769        }
1770
1771        pub fn imm_merge_threshold() -> usize {
1772            0 // disable
1773        }
1774
1775        pub fn write_conflict_detection_enabled() -> bool {
1776            cfg!(debug_assertions)
1777        }
1778
1779        pub fn max_cached_recent_versions_number() -> usize {
1780            60
1781        }
1782
1783        pub fn block_cache_capacity_mb() -> usize {
1784            512
1785        }
1786
1787        pub fn high_priority_ratio_in_percent() -> usize {
1788            70
1789        }
1790
1791        pub fn window_capacity_ratio_in_percent() -> usize {
1792            10
1793        }
1794
1795        pub fn protected_capacity_ratio_in_percent() -> usize {
1796            80
1797        }
1798
1799        pub fn cmsketch_eps() -> f64 {
1800            0.002
1801        }
1802
1803        pub fn cmsketch_confidence() -> f64 {
1804            0.95
1805        }
1806
1807        pub fn small_queue_capacity_ratio_in_percent() -> usize {
1808            10
1809        }
1810
1811        pub fn ghost_queue_capacity_ratio_in_percent() -> usize {
1812            1000
1813        }
1814
1815        pub fn small_to_main_freq_threshold() -> u8 {
1816            1
1817        }
1818
1819        pub fn meta_cache_capacity_mb() -> usize {
1820            128
1821        }
1822
1823        pub fn disable_remote_compactor() -> bool {
1824            false
1825        }
1826
1827        pub fn share_buffer_upload_concurrency() -> usize {
1828            8
1829        }
1830
1831        pub fn compactor_memory_limit_mb() -> usize {
1832            512
1833        }
1834
1835        pub fn compactor_max_task_multiplier() -> f32 {
1836            3.0000
1837        }
1838
1839        pub fn compactor_memory_available_proportion() -> f64 {
1840            0.8
1841        }
1842
1843        pub fn sstable_id_remote_fetch_number() -> u32 {
1844            10
1845        }
1846
1847        pub fn min_sstable_size_mb() -> u32 {
1848            32
1849        }
1850
1851        pub fn min_sst_size_for_streaming_upload() -> u64 {
1852            // 32MB
1853            32 * 1024 * 1024
1854        }
1855
1856        pub fn max_concurrent_compaction_task_number() -> u64 {
1857            16
1858        }
1859
1860        pub fn max_preload_wait_time_mill() -> u64 {
1861            0
1862        }
1863
1864        pub fn max_version_pinning_duration_sec() -> u64 {
1865            3 * 3600
1866        }
1867
1868        pub fn compactor_max_sst_key_count() -> u64 {
1869            2 * 1024 * 1024 // 200w
1870        }
1871
1872        pub fn compact_iter_recreate_timeout_ms() -> u64 {
1873            10 * 60 * 1000
1874        }
1875
1876        pub fn compactor_iter_max_io_retry_times() -> usize {
1877            8
1878        }
1879
1880        pub fn compactor_max_sst_size() -> u64 {
1881            512 * 1024 * 1024 // 512m
1882        }
1883
1884        pub fn enable_fast_compaction() -> bool {
1885            true
1886        }
1887
1888        pub fn check_compaction_result() -> bool {
1889            false
1890        }
1891
1892        pub fn max_preload_io_retry_times() -> usize {
1893            3
1894        }
1895
1896        pub fn mem_table_spill_threshold() -> usize {
1897            4 << 20
1898        }
1899
1900        pub fn compactor_fast_max_compact_delete_ratio() -> u32 {
1901            40
1902        }
1903
1904        pub fn compactor_fast_max_compact_task_size() -> u64 {
1905            2 * 1024 * 1024 * 1024 // 2g
1906        }
1907
1908        pub fn max_prefetch_block_number() -> usize {
1909            16
1910        }
1911
1912        pub fn compactor_concurrent_uploading_sst_count() -> Option<usize> {
1913            None
1914        }
1915
1916        pub fn compactor_max_overlap_sst_count() -> usize {
1917            64
1918        }
1919
1920        pub fn compactor_max_preload_meta_file_count() -> usize {
1921            32
1922        }
1923
1924        // deprecated
1925        pub fn table_info_statistic_history_times() -> usize {
1926            240
1927        }
1928
1929        pub fn block_file_cache_flush_buffer_threshold_mb() -> usize {
1930            256
1931        }
1932
1933        pub fn meta_file_cache_flush_buffer_threshold_mb() -> usize {
1934            64
1935        }
1936
1937        pub fn time_travel_version_cache_capacity() -> u64 {
1938            10
1939        }
1940    }
1941
1942    pub mod streaming {
1943        use crate::config::AsyncStackTraceOption;
1944
1945        pub fn in_flight_barrier_nums() -> usize {
1946            // quick fix
1947            // TODO: remove this limitation from code
1948            10000
1949        }
1950
1951        pub fn async_stack_trace() -> AsyncStackTraceOption {
1952            AsyncStackTraceOption::default()
1953        }
1954
1955        pub fn unique_user_stream_errors() -> usize {
1956            10
1957        }
1958
1959        pub fn unsafe_enable_strict_consistency() -> bool {
1960            true
1961        }
1962    }
1963
1964    pub mod file_cache {
1965        use foyer::{Compression, RecoverMode, RuntimeOptions, TokioRuntimeOptions};
1966
1967        pub fn dir() -> String {
1968            "".to_owned()
1969        }
1970
1971        pub fn capacity_mb() -> usize {
1972            1024
1973        }
1974
1975        pub fn file_capacity_mb() -> usize {
1976            64
1977        }
1978
1979        pub fn flushers() -> usize {
1980            4
1981        }
1982
1983        pub fn reclaimers() -> usize {
1984            4
1985        }
1986
1987        pub fn recover_concurrency() -> usize {
1988            8
1989        }
1990
1991        pub fn insert_rate_limit_mb() -> usize {
1992            0
1993        }
1994
1995        pub fn indexer_shards() -> usize {
1996            64
1997        }
1998
1999        pub fn compression() -> Compression {
2000            Compression::None
2001        }
2002
2003        pub fn flush_buffer_threshold_mb() -> Option<usize> {
2004            None
2005        }
2006
2007        pub fn recover_mode() -> RecoverMode {
2008            RecoverMode::Quiet
2009        }
2010
2011        pub fn runtime_config() -> RuntimeOptions {
2012            RuntimeOptions::Unified(TokioRuntimeOptions::default())
2013        }
2014    }
2015
2016    pub mod cache_refill {
2017        pub fn data_refill_levels() -> Vec<u32> {
2018            vec![]
2019        }
2020
2021        pub fn timeout_ms() -> u64 {
2022            6000
2023        }
2024
2025        pub fn concurrency() -> usize {
2026            10
2027        }
2028
2029        pub fn unit() -> usize {
2030            64
2031        }
2032
2033        pub fn threshold() -> f64 {
2034            0.5
2035        }
2036
2037        pub fn recent_filter_layers() -> usize {
2038            6
2039        }
2040
2041        pub fn recent_filter_rotate_interval_ms() -> usize {
2042            10000
2043        }
2044    }
2045
2046    pub mod heap_profiling {
2047        pub fn enable_auto() -> bool {
2048            true
2049        }
2050
2051        pub fn threshold_auto() -> f32 {
2052            0.9
2053        }
2054
2055        pub fn dir() -> String {
2056            "./".to_owned()
2057        }
2058    }
2059
2060    pub mod developer {
2061        pub fn meta_cached_traces_num() -> u32 {
2062            256
2063        }
2064
2065        pub fn meta_cached_traces_memory_limit_bytes() -> usize {
2066            1 << 27 // 128 MiB
2067        }
2068
2069        pub fn batch_output_channel_size() -> usize {
2070            64
2071        }
2072
2073        pub fn batch_receiver_channel_size() -> usize {
2074            1000
2075        }
2076
2077        pub fn batch_root_stage_channel_size() -> usize {
2078            100
2079        }
2080
2081        pub fn batch_chunk_size() -> usize {
2082            1024
2083        }
2084
2085        /// Default to unset to be compatible with the behavior before this config is introduced,
2086        /// that is, follow the value of `server.connection_pool_size`.
2087        pub fn batch_exchange_connection_pool_size() -> Option<u16> {
2088            None
2089        }
2090
2091        pub fn stream_enable_executor_row_count() -> bool {
2092            false
2093        }
2094
2095        pub fn connector_message_buffer_size() -> usize {
2096            16
2097        }
2098
2099        pub fn unsafe_stream_extreme_cache_size() -> usize {
2100            10
2101        }
2102
2103        pub fn stream_chunk_size() -> usize {
2104            256
2105        }
2106
2107        pub fn stream_exchange_initial_permits() -> usize {
2108            2048
2109        }
2110
2111        pub fn stream_exchange_batched_permits() -> usize {
2112            256
2113        }
2114
2115        pub fn stream_exchange_concurrent_barriers() -> usize {
2116            1
2117        }
2118
2119        pub fn stream_exchange_concurrent_dispatchers() -> usize {
2120            0
2121        }
2122
2123        pub fn stream_dml_channel_initial_permits() -> usize {
2124            32768
2125        }
2126
2127        pub fn stream_max_barrier_batch_size() -> u32 {
2128            1024
2129        }
2130
2131        pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
2132            64 << 20 // 64MB
2133        }
2134
2135        pub fn enable_trivial_move() -> bool {
2136            true
2137        }
2138
2139        pub fn enable_check_task_level_overlap() -> bool {
2140            false
2141        }
2142
2143        pub fn max_trivial_move_task_count_per_loop() -> usize {
2144            256
2145        }
2146
2147        pub fn max_get_task_probe_times() -> usize {
2148            5
2149        }
2150
2151        pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
2152            100
2153        }
2154
2155        pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
2156            400
2157        }
2158
2159        pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
2160            10_000
2161        }
2162
2163        pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize {
2164            100
2165        }
2166
2167        pub fn time_travel_vacuum_interval_sec() -> u64 {
2168            30
2169        }
2170        pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
2171            1000
2172        }
2173
2174        pub fn hummock_gc_history_insert_batch_size() -> usize {
2175            1000
2176        }
2177
2178        pub fn hummock_time_travel_filter_out_objects_batch_size() -> usize {
2179            1000
2180        }
2181
2182        pub fn memory_controller_threshold_aggressive() -> f64 {
2183            0.9
2184        }
2185
2186        pub fn memory_controller_threshold_graceful() -> f64 {
2187            0.81
2188        }
2189
2190        pub fn memory_controller_threshold_stable() -> f64 {
2191            0.72
2192        }
2193
2194        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
2195            2.0
2196        }
2197
2198        pub fn memory_controller_eviction_factor_graceful() -> f64 {
2199            1.5
2200        }
2201
2202        pub fn memory_controller_eviction_factor_stable() -> f64 {
2203            1.0
2204        }
2205
2206        pub fn memory_controller_update_interval_ms() -> usize {
2207            100
2208        }
2209
2210        pub fn memory_controller_sequence_tls_step() -> u64 {
2211            128
2212        }
2213
2214        pub fn memory_controller_sequence_tls_lag() -> u64 {
2215            32
2216        }
2217
2218        pub fn stream_enable_arrangement_backfill() -> bool {
2219            true
2220        }
2221
2222        pub fn enable_shared_source() -> bool {
2223            true
2224        }
2225
2226        pub fn stream_high_join_amplification_threshold() -> usize {
2227            2048
2228        }
2229
2230        /// Default to 1 to be compatible with the behavior before this config is introduced.
2231        pub fn stream_exchange_connection_pool_size() -> Option<u16> {
2232            Some(1)
2233        }
2234
2235        pub fn enable_actor_tokio_metrics() -> bool {
2236            false
2237        }
2238
2239        pub fn stream_enable_auto_schema_change() -> bool {
2240            true
2241        }
2242
2243        pub fn switch_jdbc_pg_to_native() -> bool {
2244            false
2245        }
2246
2247        pub fn streaming_hash_join_entry_state_max_rows() -> usize {
2248            // NOTE(kwannoel): This is just an arbitrary number.
2249            30000
2250        }
2251
2252        pub fn enable_explain_analyze_stats() -> bool {
2253            true
2254        }
2255
2256        pub fn rpc_client_connect_timeout_secs() -> u64 {
2257            5
2258        }
2259
2260        pub fn iceberg_list_interval_sec() -> u64 {
2261            1
2262        }
2263
2264        pub fn iceberg_fetch_batch_size() -> u64 {
2265            1024
2266        }
2267
2268        pub fn iceberg_sink_positional_delete_cache_size() -> usize {
2269            1024
2270        }
2271    }
2272
2273    pub use crate::system_param::default as system;
2274
2275    pub mod batch {
2276        pub fn enable_barrier_read() -> bool {
2277            false
2278        }
2279
2280        pub fn enable_spill() -> bool {
2281            true
2282        }
2283
2284        pub fn statement_timeout_in_sec() -> u32 {
2285            // 1 hour
2286            60 * 60
2287        }
2288
2289        pub fn frontend_compute_runtime_worker_threads() -> usize {
2290            4
2291        }
2292
2293        pub fn mask_worker_temporary_secs() -> usize {
2294            30
2295        }
2296
2297        pub fn redact_sql_option_keywords() -> Vec<String> {
2298            [
2299                "credential",
2300                "key",
2301                "password",
2302                "private",
2303                "secret",
2304                "token",
2305            ]
2306            .into_iter()
2307            .map(str::to_string)
2308            .collect()
2309        }
2310    }
2311
2312    pub mod frontend {
2313        pub fn max_total_query_size_bytes() -> u64 {
2314            1024 * 1024 * 1024
2315        }
2316
2317        pub fn min_single_query_size_bytes() -> u64 {
2318            1024 * 1024
2319        }
2320
2321        pub fn max_single_query_size_bytes() -> u64 {
2322            1024 * 1024 * 1024
2323        }
2324    }
2325
2326    pub mod udf {
2327        pub fn enable_embedded_python_udf() -> bool {
2328            false
2329        }
2330
2331        pub fn enable_embedded_javascript_udf() -> bool {
2332            true
2333        }
2334
2335        pub fn enable_embedded_wasm_udf() -> bool {
2336            true
2337        }
2338    }
2339
2340    pub mod compaction_config {
2341        const MB: u64 = 1024 * 1024;
2342        const GB: u64 = 1024 * 1024 * 1024;
2343        const DEFAULT_MAX_COMPACTION_BYTES: u64 = 2 * GB; // 2GB
2344        const DEFAULT_MIN_COMPACTION_BYTES: u64 = 128 * MB; // 128MB
2345        const DEFAULT_MAX_BYTES_FOR_LEVEL_BASE: u64 = 512 * MB; // 512MB
2346
2347        // decrease this configure when the generation of checkpoint barrier is not frequent.
2348        const DEFAULT_TIER_COMPACT_TRIGGER_NUMBER: u64 = 12;
2349        const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * MB;
2350        // 32MB
2351        const DEFAULT_MAX_SUB_COMPACTION: u32 = 4;
2352        const DEFAULT_LEVEL_MULTIPLIER: u64 = 5;
2353        const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * MB; // 512MB;
2354        const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 300;
2355        const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 100;
2356        const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 3;
2357        const DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 12;
2358        const DEFAULT_TOMBSTONE_RATIO_PERCENT: u32 = 40;
2359        const DEFAULT_EMERGENCY_PICKER: bool = true;
2360        const DEFAULT_MAX_LEVEL: u32 = 6;
2361        const DEFAULT_MAX_L0_COMPACT_LEVEL_COUNT: u32 = 42;
2362        const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MIN_SIZE: u64 = 4 * MB;
2363        const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MAX_COUNT: u32 = 64;
2364        const DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT: u32 = 2000; // > 50G / 32M = 1600
2365        const DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION: u32 = 256;
2366        const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT: u32 = 10000; // 10000 * 32M = 320G
2367        const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE: u64 = 300 * 1024 * MB; // 300GB
2368
2369        use crate::catalog::hummock::CompactionFilterFlag;
2370
2371        pub fn max_bytes_for_level_base() -> u64 {
2372            DEFAULT_MAX_BYTES_FOR_LEVEL_BASE
2373        }
2374
2375        pub fn max_bytes_for_level_multiplier() -> u64 {
2376            DEFAULT_LEVEL_MULTIPLIER
2377        }
2378
2379        pub fn max_compaction_bytes() -> u64 {
2380            DEFAULT_MAX_COMPACTION_BYTES
2381        }
2382
2383        pub fn sub_level_max_compaction_bytes() -> u64 {
2384            DEFAULT_MIN_COMPACTION_BYTES
2385        }
2386
2387        pub fn level0_tier_compact_file_number() -> u64 {
2388            DEFAULT_TIER_COMPACT_TRIGGER_NUMBER
2389        }
2390
2391        pub fn target_file_size_base() -> u64 {
2392            DEFAULT_TARGET_FILE_SIZE_BASE
2393        }
2394
2395        pub fn compaction_filter_mask() -> u32 {
2396            (CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL).into()
2397        }
2398
2399        pub fn max_sub_compaction() -> u32 {
2400            DEFAULT_MAX_SUB_COMPACTION
2401        }
2402
2403        pub fn level0_stop_write_threshold_sub_level_number() -> u64 {
2404            DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER
2405        }
2406
2407        pub fn level0_sub_level_compact_level_count() -> u32 {
2408            DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT
2409        }
2410
2411        pub fn level0_overlapping_sub_level_compact_level_count() -> u32 {
2412            DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT
2413        }
2414
2415        pub fn max_space_reclaim_bytes() -> u64 {
2416            DEFAULT_MAX_SPACE_RECLAIM_BYTES
2417        }
2418
2419        pub fn level0_max_compact_file_number() -> u64 {
2420            DEFAULT_MAX_COMPACTION_FILE_COUNT
2421        }
2422
2423        pub fn tombstone_reclaim_ratio() -> u32 {
2424            DEFAULT_TOMBSTONE_RATIO_PERCENT
2425        }
2426
2427        pub fn enable_emergency_picker() -> bool {
2428            DEFAULT_EMERGENCY_PICKER
2429        }
2430
2431        pub fn max_level() -> u32 {
2432            DEFAULT_MAX_LEVEL
2433        }
2434
2435        pub fn max_l0_compact_level_count() -> u32 {
2436            DEFAULT_MAX_L0_COMPACT_LEVEL_COUNT
2437        }
2438
2439        pub fn sst_allowed_trivial_move_min_size() -> u64 {
2440            DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MIN_SIZE
2441        }
2442
2443        pub fn disable_auto_group_scheduling() -> bool {
2444            false
2445        }
2446
2447        pub fn max_overlapping_level_size() -> u64 {
2448            256 * MB
2449        }
2450
2451        pub fn sst_allowed_trivial_move_max_count() -> u32 {
2452            DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MAX_COUNT
2453        }
2454
2455        pub fn emergency_level0_sst_file_count() -> u32 {
2456            DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT
2457        }
2458
2459        pub fn emergency_level0_sub_level_partition() -> u32 {
2460            DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION
2461        }
2462
2463        pub fn level0_stop_write_threshold_max_sst_count() -> u32 {
2464            DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT
2465        }
2466
2467        pub fn level0_stop_write_threshold_max_size() -> u64 {
2468            DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE
2469        }
2470    }
2471
2472    pub mod object_store_config {
2473        const DEFAULT_REQ_BACKOFF_INTERVAL_MS: u64 = 1000; // 1s
2474        const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s
2475        const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;
2476
2477        pub fn set_atomic_write_dir() -> bool {
2478            false
2479        }
2480
2481        pub fn object_store_req_backoff_interval_ms() -> u64 {
2482            DEFAULT_REQ_BACKOFF_INTERVAL_MS
2483        }
2484
2485        pub fn object_store_req_backoff_max_delay_ms() -> u64 {
2486            DEFAULT_REQ_BACKOFF_MAX_DELAY_MS // 10s
2487        }
2488
2489        pub fn object_store_req_backoff_factor() -> u64 {
2490            2
2491        }
2492
2493        pub fn object_store_upload_attempt_timeout_ms() -> u64 {
2494            8 * 1000 // 8s
2495        }
2496
2497        pub fn object_store_upload_retry_attempts() -> usize {
2498            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2499        }
2500
2501        // init + upload_part + finish
2502        pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
2503            5 * 1000 // 5s
2504        }
2505
2506        pub fn object_store_streaming_upload_retry_attempts() -> usize {
2507            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2508        }
2509
2510        // tips: depend on block_size
2511        pub fn object_store_read_attempt_timeout_ms() -> u64 {
2512            8 * 1000 // 8s
2513        }
2514
2515        pub fn object_store_read_retry_attempts() -> usize {
2516            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2517        }
2518
2519        pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
2520            3 * 1000 // 3s
2521        }
2522
2523        pub fn object_store_streaming_read_retry_attempts() -> usize {
2524            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2525        }
2526
2527        pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
2528            60 * 1000 // 1min
2529        }
2530
2531        pub fn object_store_metadata_retry_attempts() -> usize {
2532            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2533        }
2534
2535        pub fn object_store_delete_attempt_timeout_ms() -> u64 {
2536            5 * 1000
2537        }
2538
2539        pub fn object_store_delete_retry_attempts() -> usize {
2540            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2541        }
2542
2543        // tips: depend on batch size
2544        pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
2545            5 * 1000
2546        }
2547
2548        pub fn object_store_delete_objects_retry_attempts() -> usize {
2549            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2550        }
2551
2552        pub fn object_store_list_attempt_timeout_ms() -> u64 {
2553            10 * 60 * 1000
2554        }
2555
2556        pub fn object_store_list_retry_attempts() -> usize {
2557            DEFAULT_REQ_MAX_RETRY_ATTEMPTS
2558        }
2559
2560        pub fn opendal_upload_concurrency() -> usize {
2561            256
2562        }
2563
2564        pub fn upload_part_size() -> usize {
2565            // 16m
2566            16 * 1024 * 1024
2567        }
2568
2569        pub mod s3 {
2570            const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;
2571
2572            const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min
2573
2574            pub fn keepalive_ms() -> Option<u64> {
2575                Some(DEFAULT_KEEPALIVE_MS) // 10min
2576            }
2577
2578            pub fn recv_buffer_size() -> Option<usize> {
2579                Some(1 << 21) // 2m
2580            }
2581
2582            pub fn send_buffer_size() -> Option<usize> {
2583                None
2584            }
2585
2586            pub fn nodelay() -> Option<bool> {
2587                Some(true)
2588            }
2589
2590            pub fn identity_resolution_timeout_s() -> u64 {
2591                DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
2592            }
2593
2594            pub mod developer {
2595                pub fn retry_unknown_service_error() -> bool {
2596                    false
2597                }
2598
2599                pub fn retryable_service_error_codes() -> Vec<String> {
2600                    vec!["SlowDown".into(), "TooManyRequests".into()]
2601                }
2602
2603                pub fn use_opendal() -> bool {
2604                    true
2605                }
2606            }
2607        }
2608    }
2609
2610    pub mod meta_store_config {
2611        const DEFAULT_MAX_CONNECTIONS: u32 = 10;
2612        const DEFAULT_MIN_CONNECTIONS: u32 = 1;
2613        const DEFAULT_CONNECTION_TIMEOUT_SEC: u64 = 10;
2614        const DEFAULT_IDLE_TIMEOUT_SEC: u64 = 30;
2615        const DEFAULT_ACQUIRE_TIMEOUT_SEC: u64 = 30;
2616
2617        pub fn max_connections() -> u32 {
2618            DEFAULT_MAX_CONNECTIONS
2619        }
2620
2621        pub fn min_connections() -> u32 {
2622            DEFAULT_MIN_CONNECTIONS
2623        }
2624
2625        pub fn connection_timeout_sec() -> u64 {
2626            DEFAULT_CONNECTION_TIMEOUT_SEC
2627        }
2628
2629        pub fn idle_timeout_sec() -> u64 {
2630            DEFAULT_IDLE_TIMEOUT_SEC
2631        }
2632
2633        pub fn acquire_timeout_sec() -> u64 {
2634            DEFAULT_ACQUIRE_TIMEOUT_SEC
2635        }
2636    }
2637}
2638
/// Eviction policy for an in-memory cache, wrapping the matching foyer config.
///
/// Converts into `foyer::EvictionConfig` variant-for-variant.
#[derive(Debug, Clone)]
pub enum EvictionConfig {
    /// LRU eviction with foyer's `LruConfig`.
    Lru(LruConfig),
    /// LFU eviction with foyer's `LfuConfig`.
    Lfu(LfuConfig),
    /// S3-FIFO eviction with foyer's `S3FifoConfig`.
    S3Fifo(S3FifoConfig),
}
2645
2646impl EvictionConfig {
2647    pub fn for_test() -> Self {
2648        Self::Lru(LruConfig {
2649            high_priority_pool_ratio: 0.0,
2650        })
2651    }
2652}
2653
2654impl From<EvictionConfig> for foyer::EvictionConfig {
2655    fn from(value: EvictionConfig) -> Self {
2656        match value {
2657            EvictionConfig::Lru(lru) => foyer::EvictionConfig::Lru(lru),
2658            EvictionConfig::Lfu(lfu) => foyer::EvictionConfig::Lfu(lfu),
2659            EvictionConfig::S3Fifo(s3fifo) => foyer::EvictionConfig::S3Fifo(s3fifo),
2660        }
2661    }
2662}
2663
/// Memory-related storage settings resolved from an [`RwConfig`] by
/// [`extract_storage_memory_config`].
pub struct StorageMemoryConfig {
    /// Block cache capacity, in MB.
    pub block_cache_capacity_mb: usize,
    /// Block cache shard setting. When not configured explicitly, this holds the shard *bit*
    /// count derived in `extract_storage_memory_config`.
    pub block_cache_shard_num: usize,
    /// Meta cache capacity, in MB.
    pub meta_cache_capacity_mb: usize,
    /// Meta cache shard setting. When not configured explicitly, this holds the shard *bit*
    /// count derived in `extract_storage_memory_config`.
    pub meta_cache_shard_num: usize,
    /// Shared buffer capacity, in MB.
    pub shared_buffer_capacity_mb: usize,
    /// Compactor memory limit, in MB.
    pub compactor_memory_limit_mb: usize,
    /// Prefetch buffer capacity, in MB.
    pub prefetch_buffer_capacity_mb: usize,
    /// Eviction policy for the block cache.
    pub block_cache_eviction_config: EvictionConfig,
    /// Eviction policy for the meta cache.
    pub meta_cache_eviction_config: EvictionConfig,
    /// Flush buffer threshold of the block (data) file cache, in MB.
    pub block_file_cache_flush_buffer_threshold_mb: usize,
    /// Flush buffer threshold of the meta file cache, in MB.
    pub meta_file_cache_flush_buffer_threshold_mb: usize,
}
2677
/// Upper bound on the shard bit count derived for the meta cache (2^4 = 16 shards).
pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
/// Minimum capacity, in MB, each cache shard should receive when the shard bit count is derived.
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
/// Upper bound on the shard bit count derived for the block cache: 2^6 = 64 shards, to reduce
/// lock contention.
pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6;
2681
2682pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
2683    let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or(
2684        // adapt to old version
2685        s.storage
2686            .block_cache_capacity_mb
2687            .unwrap_or(default::storage::block_cache_capacity_mb()),
2688    );
2689    let meta_cache_capacity_mb = s.storage.cache.meta_cache_capacity_mb.unwrap_or(
2690        // adapt to old version
2691        s.storage
2692            .block_cache_capacity_mb
2693            .unwrap_or(default::storage::meta_cache_capacity_mb()),
2694    );
2695    let shared_buffer_capacity_mb = s
2696        .storage
2697        .shared_buffer_capacity_mb
2698        .unwrap_or(default::storage::shared_buffer_capacity_mb());
2699    let meta_cache_shard_num = s.storage.cache.meta_cache_shard_num.unwrap_or_else(|| {
2700        let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
2701        while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
2702            shard_bits -= 1;
2703        }
2704        shard_bits
2705    });
2706    let block_cache_shard_num = s.storage.cache.block_cache_shard_num.unwrap_or_else(|| {
2707        let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
2708        while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
2709        {
2710            shard_bits -= 1;
2711        }
2712        shard_bits
2713    });
2714    let compactor_memory_limit_mb = s
2715        .storage
2716        .compactor_memory_limit_mb
2717        .unwrap_or(default::storage::compactor_memory_limit_mb());
2718
2719    let get_eviction_config = |c: &CacheEvictionConfig| {
2720        match c {
2721            CacheEvictionConfig::Lru {
2722                high_priority_ratio_in_percent,
2723            } => EvictionConfig::Lru(LruConfig {
2724                high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
2725                    // adapt to old version
2726                    s.storage
2727                        .high_priority_ratio_in_percent
2728                        .unwrap_or(default::storage::high_priority_ratio_in_percent()),
2729                ) as f64
2730                    / 100.0,
2731            }),
2732            CacheEvictionConfig::Lfu {
2733                window_capacity_ratio_in_percent,
2734                protected_capacity_ratio_in_percent,
2735                cmsketch_eps,
2736                cmsketch_confidence,
2737            } => EvictionConfig::Lfu(LfuConfig {
2738                window_capacity_ratio: window_capacity_ratio_in_percent
2739                    .unwrap_or(default::storage::window_capacity_ratio_in_percent())
2740                    as f64
2741                    / 100.0,
2742                protected_capacity_ratio: protected_capacity_ratio_in_percent
2743                    .unwrap_or(default::storage::protected_capacity_ratio_in_percent())
2744                    as f64
2745                    / 100.0,
2746                cmsketch_eps: cmsketch_eps.unwrap_or(default::storage::cmsketch_eps()),
2747                cmsketch_confidence: cmsketch_confidence
2748                    .unwrap_or(default::storage::cmsketch_confidence()),
2749            }),
2750            CacheEvictionConfig::S3Fifo {
2751                small_queue_capacity_ratio_in_percent,
2752                ghost_queue_capacity_ratio_in_percent,
2753                small_to_main_freq_threshold,
2754            } => EvictionConfig::S3Fifo(S3FifoConfig {
2755                small_queue_capacity_ratio: small_queue_capacity_ratio_in_percent
2756                    .unwrap_or(default::storage::small_queue_capacity_ratio_in_percent())
2757                    as f64
2758                    / 100.0,
2759                ghost_queue_capacity_ratio: ghost_queue_capacity_ratio_in_percent
2760                    .unwrap_or(default::storage::ghost_queue_capacity_ratio_in_percent())
2761                    as f64
2762                    / 100.0,
2763                small_to_main_freq_threshold: small_to_main_freq_threshold
2764                    .unwrap_or(default::storage::small_to_main_freq_threshold()),
2765            }),
2766        }
2767    };
2768
2769    let block_cache_eviction_config = get_eviction_config(&s.storage.cache.block_cache_eviction);
2770    let meta_cache_eviction_config = get_eviction_config(&s.storage.cache.meta_cache_eviction);
2771
2772    let prefetch_buffer_capacity_mb =
2773        s.storage
2774            .shared_buffer_capacity_mb
2775            .unwrap_or(match &block_cache_eviction_config {
2776                EvictionConfig::Lru(lru) => {
2777                    ((1.0 - lru.high_priority_pool_ratio) * block_cache_capacity_mb as f64) as usize
2778                }
2779                EvictionConfig::Lfu(lfu) => {
2780                    ((1.0 - lfu.protected_capacity_ratio) * block_cache_capacity_mb as f64) as usize
2781                }
2782                EvictionConfig::S3Fifo(s3fifo) => {
2783                    (s3fifo.small_queue_capacity_ratio * block_cache_capacity_mb as f64) as usize
2784                }
2785            });
2786
2787    let block_file_cache_flush_buffer_threshold_mb = s
2788        .storage
2789        .data_file_cache
2790        .flush_buffer_threshold_mb
2791        .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
2792    let meta_file_cache_flush_buffer_threshold_mb = s
2793        .storage
2794        .meta_file_cache
2795        .flush_buffer_threshold_mb
2796        .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
2797
2798    StorageMemoryConfig {
2799        block_cache_capacity_mb,
2800        block_cache_shard_num,
2801        meta_cache_capacity_mb,
2802        meta_cache_shard_num,
2803        shared_buffer_capacity_mb,
2804        compactor_memory_limit_mb,
2805        prefetch_buffer_capacity_mb,
2806        block_cache_eviction_config,
2807        meta_cache_eviction_config,
2808        block_file_cache_flush_buffer_threshold_mb,
2809        meta_file_cache_flush_buffer_threshold_mb,
2810    }
2811}
2812
// Compaction tuning knobs. Every field may be omitted from the config file; serde then fills it
// from the same-named function in `default::compaction_config`.
// NOTE(review): plain `//` comments are used here on purpose. `ConfigDoc` appears to collect
// `///` field docs into the generated config docs, so adding them may change `config/docs.md`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CompactionConfig {
    #[serde(default = "default::compaction_config::max_bytes_for_level_base")]
    pub max_bytes_for_level_base: u64,
    #[serde(default = "default::compaction_config::max_bytes_for_level_multiplier")]
    pub max_bytes_for_level_multiplier: u64,
    #[serde(default = "default::compaction_config::max_compaction_bytes")]
    pub max_compaction_bytes: u64,
    #[serde(default = "default::compaction_config::sub_level_max_compaction_bytes")]
    pub sub_level_max_compaction_bytes: u64,
    #[serde(default = "default::compaction_config::level0_tier_compact_file_number")]
    pub level0_tier_compact_file_number: u64,
    #[serde(default = "default::compaction_config::target_file_size_base")]
    pub target_file_size_base: u64,
    #[serde(default = "default::compaction_config::compaction_filter_mask")]
    pub compaction_filter_mask: u32,
    #[serde(default = "default::compaction_config::max_sub_compaction")]
    pub max_sub_compaction: u32,
    #[serde(default = "default::compaction_config::level0_stop_write_threshold_sub_level_number")]
    pub level0_stop_write_threshold_sub_level_number: u64,
    #[serde(default = "default::compaction_config::level0_sub_level_compact_level_count")]
    pub level0_sub_level_compact_level_count: u32,
    #[serde(
        default = "default::compaction_config::level0_overlapping_sub_level_compact_level_count"
    )]
    pub level0_overlapping_sub_level_compact_level_count: u32,
    #[serde(default = "default::compaction_config::max_space_reclaim_bytes")]
    pub max_space_reclaim_bytes: u64,
    #[serde(default = "default::compaction_config::level0_max_compact_file_number")]
    pub level0_max_compact_file_number: u64,
    #[serde(default = "default::compaction_config::tombstone_reclaim_ratio")]
    pub tombstone_reclaim_ratio: u32,
    #[serde(default = "default::compaction_config::enable_emergency_picker")]
    pub enable_emergency_picker: bool,
    #[serde(default = "default::compaction_config::max_level")]
    pub max_level: u32,
    #[serde(default = "default::compaction_config::sst_allowed_trivial_move_min_size")]
    pub sst_allowed_trivial_move_min_size: u64,
    #[serde(default = "default::compaction_config::sst_allowed_trivial_move_max_count")]
    pub sst_allowed_trivial_move_max_count: u32,
    #[serde(default = "default::compaction_config::max_l0_compact_level_count")]
    pub max_l0_compact_level_count: u32,
    #[serde(default = "default::compaction_config::disable_auto_group_scheduling")]
    pub disable_auto_group_scheduling: bool,
    #[serde(default = "default::compaction_config::max_overlapping_level_size")]
    pub max_overlapping_level_size: u64,
    #[serde(default = "default::compaction_config::emergency_level0_sst_file_count")]
    pub emergency_level0_sst_file_count: u32,
    #[serde(default = "default::compaction_config::emergency_level0_sub_level_partition")]
    pub emergency_level0_sub_level_partition: u32,
    #[serde(default = "default::compaction_config::level0_stop_write_threshold_max_sst_count")]
    pub level0_stop_write_threshold_max_sst_count: u32,
    #[serde(default = "default::compaction_config::level0_stop_write_threshold_max_size")]
    pub level0_stop_write_threshold_max_size: u64,
}
2868
// Connection-pool settings for the meta store. Every field may be omitted from the config file;
// serde then fills it from the same-named function in `default::meta_store_config`.
/// Note: only applies to meta store backends other than `SQLite`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct MetaStoreConfig {
    /// Maximum number of connections for the meta store connection pool.
    #[serde(default = "default::meta_store_config::max_connections")]
    pub max_connections: u32,
    /// Minimum number of connections for the meta store connection pool.
    #[serde(default = "default::meta_store_config::min_connections")]
    pub min_connections: u32,
    /// Connection timeout in seconds for a meta store connection.
    #[serde(default = "default::meta_store_config::connection_timeout_sec")]
    pub connection_timeout_sec: u64,
    /// Idle timeout in seconds for a meta store connection.
    #[serde(default = "default::meta_store_config::idle_timeout_sec")]
    pub idle_timeout_sec: u64,
    /// Acquire timeout in seconds for a meta store connection.
    #[serde(default = "default::meta_store_config::acquire_timeout_sec")]
    pub acquire_timeout_sec: u64,
}
2888
#[cfg(test)]
mod tests {
    use risingwave_license::LicenseKey;

    use super::*;

    /// Default config used to render `config/example.toml` and `config/docs.md`.
    fn default_config_for_docs() -> RwConfig {
        let mut config = RwConfig::default();
        // Set `license_key` to empty in the docs to avoid any confusion.
        config.system.license_key = Some(LicenseKey::empty());
        config
    }

    /// This test ensures that `config/example.toml` and `config/docs.md` are up-to-date with the
    /// default values and docs specified in this file. Developer should run
    /// `./risedev generate-example-config` to update them if this test fails.
    #[test]
    fn test_example_up_to_date() {
        const HEADER: &str = "# This file is generated by ./risedev generate-example-config
# Check detailed comments in src/common/src/config.rs";

        let actual = expect_test::expect_file!["../../config/example.toml"];
        let default = toml::to_string(&default_config_for_docs()).expect("failed to serialize");

        let expected = format!("{HEADER}\n\n{default}");
        actual.assert_eq(&expected);

        let expected = rw_config_to_markdown();
        let actual = expect_test::expect_file!["../../config/docs.md"];
        actual.assert_eq(&expected);
    }

    #[derive(Debug)]
    struct ConfigItemDoc {
        desc: String,
        default: String,
    }

    /// Renders one markdown table per config section, combining each item's doc text with its
    /// default value taken from the serialized default config.
    fn rw_config_to_markdown() -> String {
        let mut config_rustdocs = BTreeMap::<String, Vec<(String, String)>>::new();
        RwConfig::config_docs("".to_owned(), &mut config_rustdocs);

        // Section -> Config Name -> ConfigItemDoc
        let mut configs: BTreeMap<String, BTreeMap<String, ConfigItemDoc>> = config_rustdocs
            .into_iter()
            .map(|(k, v)| {
                let docs: BTreeMap<String, ConfigItemDoc> = v
                    .into_iter()
                    .map(|(name, desc)| {
                        (
                            name,
                            ConfigItemDoc {
                                desc,
                                default: "".to_owned(), // unset
                            },
                        )
                    })
                    .collect();
                (k, docs)
            })
            .collect();

        let toml_doc: BTreeMap<String, toml::Value> =
            toml::from_str(&toml::to_string(&default_config_for_docs()).unwrap()).unwrap();
        toml_doc.into_iter().for_each(|(name, value)| {
            set_default_values("".to_owned(), name, value, &mut configs);
        });

        let mut markdown = "# RisingWave System Configurations\n\n".to_owned()
            + "This page is automatically generated by `./risedev generate-example-config`\n";
        for (section, configs) in configs {
            if configs.is_empty() {
                continue;
            }
            markdown.push_str(&format!("\n## {}\n\n", section));
            markdown.push_str("| Config | Description | Default |\n");
            markdown.push_str("|--------|-------------|---------|\n");
            for (config, doc) in configs {
                markdown.push_str(&format!(
                    "| {} | {} | {} |\n",
                    config, doc.desc, doc.default
                ));
            }
        }
        markdown
    }

    /// Records `value` as the default of `section.name`. Tables are sub-sections and are walked
    /// recursively with a dotted section path.
    fn set_default_values(
        section: String,
        name: String,
        value: toml::Value,
        configs: &mut BTreeMap<String, BTreeMap<String, ConfigItemDoc>>,
    ) {
        // Set the default value if it's a config name-value pair, otherwise it's a sub-section (Table) that should be recursively processed.
        if let toml::Value::Table(table) = value {
            // `table` is already owned here, so it can be consumed without cloning.
            let section_configs: BTreeMap<String, toml::Value> = table.into_iter().collect();
            let sub_section = if section.is_empty() {
                name
            } else {
                format!("{}.{}", section, name)
            };
            section_configs
                .into_iter()
                .for_each(|(k, v)| set_default_values(sub_section.clone(), k, v, configs))
        } else if let Some(t) = configs.get_mut(&section) {
            if let Some(item_doc) = t.get_mut(&name) {
                item_doc.default = value.to_string();
            }
        }
    }

    #[test]
    fn test_object_store_configs_backward_compatibility() {
        // Define configs with the old name and make sure it still works
        {
            let config: RwConfig = toml::from_str(
                r#"
            [storage.object_store]
            object_store_set_atomic_write_dir = true

            [storage.object_store.s3]
            object_store_keepalive_ms = 1
            object_store_send_buffer_size = 1
            object_store_recv_buffer_size = 1
            object_store_nodelay = false

            [storage.object_store.s3.developer]
            object_store_retry_unknown_service_error = true
            object_store_retryable_service_error_codes = ['dummy']


            "#,
            )
            .unwrap();

            assert!(config.storage.object_store.set_atomic_write_dir);
            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
            assert!(
                config
                    .storage
                    .object_store
                    .s3
                    .developer
                    .retry_unknown_service_error
            );
            assert_eq!(
                config
                    .storage
                    .object_store
                    .s3
                    .developer
                    .retryable_service_error_codes,
                vec!["dummy".to_owned()]
            );
        }

        // Define configs with the new name and make sure it works
        {
            let config: RwConfig = toml::from_str(
                r#"
            [storage.object_store]
            set_atomic_write_dir = true

            [storage.object_store.s3]
            keepalive_ms = 1
            send_buffer_size = 1
            recv_buffer_size = 1
            nodelay = false

            [storage.object_store.s3.developer]
            retry_unknown_service_error = true
            retryable_service_error_codes = ['dummy']


            "#,
            )
            .unwrap();

            assert!(config.storage.object_store.set_atomic_write_dir);
            assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
            assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
            assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
            assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
            assert!(
                config
                    .storage
                    .object_store
                    .s3
                    .developer
                    .retry_unknown_service_error
            );
            assert_eq!(
                config
                    .storage
                    .object_store
                    .s3
                    .developer
                    .retryable_service_error_codes,
                vec!["dummy".to_owned()]
            );
        }
    }

    #[test]
    fn test_meta_configs_backward_compatibility() {
        // Old `[meta]` names (split-group interval and write-throughput thresholds) must still
        // populate their renamed fields.
        {
            let config: RwConfig = toml::from_str(
                r#"
            [meta]
            periodic_split_compact_group_interval_sec = 1
            table_write_throughput_threshold = 10
            min_table_split_write_throughput = 5
            "#,
            )
            .unwrap();

            assert_eq!(
                config
                    .meta
                    .periodic_scheduling_compaction_group_split_interval_sec,
                1
            );
            assert_eq!(config.meta.table_high_write_throughput_threshold, 10);
            assert_eq!(config.meta.table_low_write_throughput_threshold, 5);
        }
    }
}