risingwave_common/system_param/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module defines utilities to work with system parameters ([`PbSystemParams`] in
16//! `meta.proto`).
17//!
18//! To add a new system parameter:
19//! - Add a new field to [`PbSystemParams`] in `meta.proto`.
20//! - Add a new entry to `for_all_params` in this file.
21//! - Add a new method to [`reader::SystemParamsReader`].
22
23pub mod adaptive_parallelism_strategy;
24pub mod common;
25pub mod diff;
26pub mod local_manager;
27pub mod reader;
28
29use std::fmt::Debug;
30use std::ops::RangeBounds;
31use std::str::FromStr;
32
33use paste::paste;
34use risingwave_license::{LicenseKey, LicenseKeyRef};
35use risingwave_pb::meta::PbSystemParams;
36
37use self::diff::SystemParamsDiff;
38pub use crate::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
39
/// Error type used by the system parameter utilities in this module: a plain, human-readable
/// message.
pub type SystemParamsError = String;

/// Module-local shorthand for results whose error is a [`SystemParamsError`].
type Result<T> = core::result::Result<T, SystemParamsError>;
43
44/// The trait for the value type of a system parameter.
45pub trait ParamValue: ToString + FromStr {
46    type Borrowed<'a>;
47}
48
49macro_rules! impl_param_value {
50    ($type:ty) => {
51        impl_param_value!($type => $type);
52    };
53    ($type:ty => $borrowed:ty) => {
54        impl ParamValue for $type {
55            type Borrowed<'a> = $borrowed;
56        }
57    };
58}
59
60impl_param_value!(bool);
61impl_param_value!(u32);
62impl_param_value!(u64);
63impl_param_value!(f64);
64impl_param_value!(String => &'a str);
65impl_param_value!(LicenseKey => LicenseKeyRef<'a>);
66
/// Define all system parameters here.
///
/// The macro passes the complete parameter table to the macro named by `$macro`, so that
/// callback can generate one piece of code per parameter.
///
/// To match all of this information, write the match arm as follows:
/// ```text
/// ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
/// ```
///
/// Note:
/// - Having `None` as default value means the parameter must be initialized.
#[macro_export]
macro_rules! for_all_params {
    ($macro:ident) => {
        $macro! {
            // name                                     type                            default value                   mut?    doc
            { barrier_interval_ms,                      u32,                            Some(1000_u32),                 true,   "The interval of periodic barrier.", },
            { checkpoint_frequency,                     u64,                            Some(1_u64),                    true,   "There will be a checkpoint for every n barriers.", },
            { sstable_size_mb,                          u32,                            Some(256_u32),                  false,  "Target size of the Sstable.", },
            { parallel_compact_size_mb,                 u32,                            Some(512_u32),                  false,  "The size of parallel task for one compact/flush job.", },
            // NOTE(review): the name says KB while the doc string says "in bytes" — confirm which
            // unit is intended before relying on the description.
            { block_size_kb,                            u32,                            Some(64_u32),                   false,  "Size of each block in bytes in SST.", },
            { bloom_false_positive,                     f64,                            Some(0.001_f64),                false,  "False positive probability of bloom filter.", },
            { state_store,                              String,                         None,                           false,  "URL for the state store", },
            { data_directory,                           String,                         None,                           false,  "Remote directory for storing data and metadata objects.", },
            { backup_storage_url,                       String,                         None,                           true,   "Remote storage url for storing snapshots.", },
            { backup_storage_directory,                 String,                         None,                           true,   "Remote directory for storing snapshots.", },
            { max_concurrent_creating_streaming_jobs,   u32,                            Some(1_u32),                    true,   "Max number of concurrent creating streaming jobs.", },
            { pause_on_next_bootstrap,                  bool,                           Some(false),                    true,   "Whether to pause all data sources on next bootstrap.", },
            { enable_tracing,                           bool,                           Some(false),                    true,   "Whether to enable distributed tracing.", },
            { use_new_object_prefix_strategy,           bool,                           None,                           false,  "Whether to split object prefix.", },
            { license_key,                              risingwave_license::LicenseKey, Some(Default::default()),       true,   "The license key to activate enterprise features.", },
            { time_travel_retention_ms,                 u64,                            Some(600000_u64),               true,   "The data retention period for time travel.", },
            { adaptive_parallelism_strategy,            risingwave_common::system_param::AdaptiveParallelismStrategy,   Some(Default::default()),       true,   "The strategy for Adaptive Parallelism.", },
            { per_database_isolation,                   bool,                           Some(true),                     true,   "Whether per database isolation is enabled", },
        }
    };
}
102
/// Turn a parameter's field identifier into its string key.
#[macro_export]
macro_rules! key_of {
    ($name:ident) => {
        stringify!($name)
    };
}
110
111/// Define key constants for fields in `PbSystemParams` for use of other modules.
112macro_rules! def_key {
113    ($({ $field:ident, $($rest:tt)* },)*) => {
114        paste! {
115            $(
116                pub const [<$field:upper _KEY>]: &str = key_of!($field);
117            )*
118        }
119    };
120}
121
122for_all_params!(def_key);
123
/// Define a `<field>_opt()` function per parameter that returns its default value as an
/// `Option` (`None` when the parameter has no default).
macro_rules! def_default_opt {
    ($({ $name:ident, $ty:ty, $default:expr, $($tail:tt)* },)*) => {
        paste::paste! {
            $(
                pub fn [<$name _opt>]() -> Option<$ty> {
                    $default
                }
            )*
        }
    };
}
136
/// Define default value functions for those with `Some` default values.
///
/// For each parameter this generates `pub fn <field>() -> <type>`, which unwraps the default
/// value, and a `LazyLock` static named after the upper-cased field that is initialized by that
/// function. `LazyLock` is referenced unqualified, so it is expected to be in scope where the
/// macro is invoked.
macro_rules! def_default {
    // Entry point: dispatch every table row to the internal `@` rules below.
    ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
        $(
            def_default!(@ $field, $type, $default);
        )*
    };
    // Intended to emit nothing for parameters without a default.
    // NOTE(review): `$default` arrives here as an already-parsed `expr` fragment. The Rust
    // reference states that forwarded fragments cannot be matched by literal tokens, so this arm
    // may never be selected and the arm below would then generate a `None.unwrap()` function for
    // such parameters — TODO confirm with `cargo expand`.
    (@ $field:ident, $type:ty, None) => {};
    (@ $field:ident, $type:ty, $default: expr) => {
        pub fn $field() -> $type {
            $default.unwrap()
        }
        paste::paste!(
            pub static [<$field:upper>]: LazyLock<$type> = LazyLock::new($field);
        );
    };
}
154
/// Default values for all parameters.
///
/// Populated by expanding `def_default_opt` and `def_default` over the table in
/// `for_all_params`.
pub mod default {
    // Referenced unqualified by the items that `def_default` generates.
    use std::sync::LazyLock;

    for_all_params!(def_default_opt);
    for_all_params!(def_default);
}
162
/// Generate `check_missing_params`, which verifies that every listed field is set.
macro_rules! impl_check_missing_fields {
    ($({ $name:ident, $($tail:tt)* },)*) => {
        /// Check if any undeprecated fields are missing.
        ///
        /// Fields are checked in declaration order; the error names the first missing one.
        pub fn check_missing_params(params: &PbSystemParams) -> Result<()> {
            $(
                params
                    .$name
                    .as_ref()
                    .ok_or_else(|| format!("missing system param {:?}", key_of!($name)))?;
            )*
            Ok(())
        }
    };
}
176
/// Derive serialization to kv pairs.
macro_rules! impl_system_params_to_kv {
    ($({ $name:ident, $($tail:tt)* },)*) => {
        /// Serialize the parameters into `(key, value)` string pairs, in declaration order.
        ///
        /// The returned list only contains undeprecated fields.
        /// Return error if there are missing fields.
        pub fn system_params_to_kv(params: &PbSystemParams) -> Result<Vec<(String, String)>> {
            check_missing_params(params)?;
            // Every field was just verified to be present, so the unwraps cannot fail.
            Ok(vec![
                $(
                    (
                        key_of!($name).to_owned(),
                        params.$name.as_ref().unwrap().to_string(),
                    ),
                )*
            ])
        }
    };
}
194
/// Generate `derive_missing_fields`, which fills unset fields using the derivation rules of
/// `OverrideFromParams`.
macro_rules! impl_derive_missing_fields {
    ($({ $name:ident, $($tail:tt)* },)*) => {
        /// For every field that is still `None`, try to derive it from the other parameters.
        /// Fields that are already set are left untouched.
        pub fn derive_missing_fields(params: &mut PbSystemParams) {
            $(
                if params.$name.is_none() {
                    // A rule yielding `None` keeps the field unset.
                    params.$name = OverrideFromParams::$name(params).map(Into::into);
                }
            )*
        }
    };
}
206
/// Derive deserialization from kv pairs.
macro_rules! impl_system_params_from_kv {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        /// Deserialize system parameters from key-value pairs.
        ///
        /// Keys that do not name a known parameter (for example parameters that were removed in
        /// a later version) are skipped and reported together in one warning. Fields that are
        /// still unset afterwards are derived from other parameters where `OverrideFromParams`
        /// defines a rule.
        ///
        /// # Errors
        ///
        /// Returns an error if a key or value is not valid UTF-8, or if a value cannot be parsed
        /// into the type of its field.
        // The error messages below embed the source errors with `{}` on purpose.
        #[allow(rw::format_error)]
        pub fn system_params_from_kv<K, V>(kvs: Vec<(K, V)>) -> Result<PbSystemParams>
        where
            K: AsRef<[u8]> + Debug,
            V: AsRef<[u8]> + Debug,
        {
            let mut ret = PbSystemParams::default();
            let mut unrecognized_params = Vec::new();
            for (k, v) in &kvs {
                let k = std::str::from_utf8(k.as_ref())
                    .map_err(|e| format!("system param key {:?} is not valid UTF-8: {}", k, e))?;
                // The raw value is deliberately left out of error messages: it may be sensitive.
                let v = std::str::from_utf8(v.as_ref()).map_err(|e| {
                    format!("value of system param {:?} is not valid UTF-8: {}", k, e)
                })?;
                match k {
                    $(
                        key_of!($field) => {
                            ret.$field = Some(v.parse().map_err(|e| {
                                format!("cannot parse value of system param {:?}: {}", k, e)
                            })?);
                        }
                    )*
                    _ => unrecognized_params.push((k.to_owned(), v.to_owned())),
                }
            }
            derive_missing_fields(&mut ret);
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(ret)
        }
    };
}
247
/// Define the checks that run when a field is changed.
///
/// By default a parameter only has to be mutable. For custom rules, override the generated
/// method in `OverrideValidateOnSet` below.
macro_rules! impl_default_validation_on_set {
    ($({ $name:ident, $ty:ty, $default:expr, $mutable:expr, $($tail:tt)* },)*) => {
        #[allow(clippy::ptr_arg)]
        trait ValidateOnSet {
            $(
                fn $name(_v: &$ty) -> Result<()> {
                    if $mutable {
                        Ok(())
                    } else {
                        Err(format!("{:?} is immutable", key_of!($name)))
                    }
                }
            )*

            /// Helper for overrides: succeed only if `v` lies within `range`.
            fn expect_range<T, R>(v: T, range: R) -> Result<()>
            where
                T: Debug + PartialOrd,
                R: RangeBounds<T> + Debug,
            {
                if range.contains(&v) {
                    Ok(())
                } else {
                    Err(format!("value {:?} out of range, expect {:?}", v, range))
                }
            }
        }
    }
}
279
/// Define rules to derive a parameter from others.
///
/// This is useful when a parameter changes its type or semantics and a new parameter has to be
/// introduced: after the cluster upgrades to a newer version, the new parameter must have the
/// same effect as its older versions. For example, if you had `interval_sec` and now want finer
/// granularity, introduce `interval_ms` and derive it from `interval_sec` by overriding the
/// `FromParams` trait in `OverrideFromParams`:
///
/// ```ignore
/// impl FromParams for OverrideFromParams {
///     fn interval_ms(params: &PbSystemParams) -> Option<u64> {
///         if let Some(sec) = params.interval_sec {
///             Some(sec * 1000)
///         } else {
///             None
///         }
///     }
/// }
/// ```
///
/// Note that newer versions must be prioritized during derivation.
///
/// Each generated default method derives nothing and returns `None`.
macro_rules! impl_default_from_other_params {
    ($({ $name:ident, $ty:ty, $($tail:tt)* },)*) => {
        trait FromParams {
            $(
                fn $name(_params: &PbSystemParams) -> Option<$ty> {
                    None
                }
            )*
        }
    };
}
311
/// Generate `set_system_param`, the string-keyed setter for a single parameter.
macro_rules! impl_set_system_param {
    ($({ $name:ident, $ty:ty, $default:expr, $($tail:tt)* },)*) => {
        /// Set a system parameter with the given value or default one.
        ///
        /// With `value == None` the parameter is reset to its default. The candidate value is
        /// validated through `OverrideValidateOnSet` before anything is written.
        ///
        /// Returns the new value (as a string) together with the diff if the parameter changed,
        /// `None` if it already had that value, or an error if the parameter is unrecognized,
        /// has no default to reset to, or the value is invalid.
        pub fn set_system_param(
            params: &mut PbSystemParams,
            key: &str,
            value: Option<impl AsRef<str>>,
        ) -> Result<Option<(String, SystemParamsDiff)>> {
            use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};

            match key {
                $(
                    key_of!($name) => {
                        let new_value: $ty = match value {
                            Some(raw) => {
                                #[allow(rw::format_error)]
                                raw.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
                            }
                            None => $default.ok_or_else(|| format!("{} does not have a default value", key))?,
                        };
                        OverrideValidateOnSet::$name(&new_value)?;

                        let changed = SystemParamsReader::new(&*params).$name() != new_value;
                        if !changed {
                            return Ok(None);
                        }

                        let diff = SystemParamsDiff {
                            $name: Some(new_value.to_owned()),
                            ..Default::default()
                        };
                        // Store via `into` rather than `to_string`, which would write a redacted value.
                        params.$name = Some(new_value.into());
                        // `to_string` is fine here: the stored protobuf field is a primitive type.
                        let written = params.$name.as_ref().unwrap().to_string();
                        Ok(Some((written, diff)))
                    },
                )*
                _ => Err(format!("unrecognized system parameter {:?}", key)),
            }
        }
    };
}
360
/// Generate `is_mutable`, a lookup of the mutability flag by parameter key.
macro_rules! impl_is_mutable {
    ($({ $name:ident, $ty:ty, $default:expr, $mutable:expr, $($tail:tt)* },)*) => {
        /// Tell whether the parameter named `field` may be changed at runtime.
        ///
        /// Returns an error if `field` is not the key of a system parameter.
        pub fn is_mutable(field: &str) -> Result<bool> {
            let mutable = match field {
                $(
                    key_of!($name) => $mutable,
                )*
                _ => return Err(format!("{:?} is not a system parameter", field)),
            };
            Ok(mutable)
        }
    }
}
373
/// Generate `system_params_for_test`, a fully initialized parameter set for tests.
macro_rules! impl_system_params_for_test {
    ($({ $name:ident, $ty:ty, $default:expr, $($tail:tt)* },)*) => {
        /// Build parameters from the declared defaults, then fix the fields listed below to
        /// test-specific values.
        #[allow(clippy::needless_update)]
        pub fn system_params_for_test() -> PbSystemParams {
            let defaults = PbSystemParams {
                $(
                    $name: ($default as Option<$ty>).map(Into::into),
                )*
                ..Default::default() // `None` for deprecated params
            };
            PbSystemParams {
                data_directory: Some("hummock_001".to_owned()),
                state_store: Some("hummock+memory".to_owned()),
                backup_storage_url: Some("memory".into()),
                backup_storage_directory: Some("backup".into()),
                use_new_object_prefix_strategy: Some(false),
                time_travel_retention_ms: Some(0),
                ..defaults
            }
        }
    };
}
394
// Expand each generator macro over the parameter table in `for_all_params`. This produces the
// module's helper functions (`system_params_from_kv`, `is_mutable`, `derive_missing_fields`,
// `check_missing_params`, `system_params_to_kv`, `set_system_param`,
// `system_params_for_test`) and the `ValidateOnSet` trait.
for_all_params!(impl_system_params_from_kv);
for_all_params!(impl_is_mutable);
for_all_params!(impl_derive_missing_fields);
for_all_params!(impl_check_missing_fields);
for_all_params!(impl_system_params_to_kv);
for_all_params!(impl_set_system_param);
for_all_params!(impl_default_validation_on_set);
for_all_params!(impl_system_params_for_test);
403
404struct OverrideValidateOnSet;
405impl ValidateOnSet for OverrideValidateOnSet {
406    fn barrier_interval_ms(v: &u32) -> Result<()> {
407        Self::expect_range(*v, 100..)
408    }
409
410    fn checkpoint_frequency(v: &u64) -> Result<()> {
411        Self::expect_range(*v, 1..)
412    }
413
414    fn backup_storage_directory(v: &String) -> Result<()> {
415        if v.trim().is_empty() {
416            return Err("backup_storage_directory cannot be empty".into());
417        }
418        Ok(())
419    }
420
421    fn backup_storage_url(v: &String) -> Result<()> {
422        if v.trim().is_empty() {
423            return Err("backup_storage_url cannot be empty".into());
424        }
425        Ok(())
426    }
427
428    fn time_travel_retention_ms(v: &u64) -> Result<()> {
429        // This is intended to guarantee that non-time-travel batch query can still function even compute node's recent versions doesn't include the desired version.
430        let min_retention_ms = 600_000;
431        if *v < min_retention_ms {
432            return Err(format!(
433                "time_travel_retention_ms cannot be less than {min_retention_ms}"
434            ));
435        }
436        Ok(())
437    }
438}
439
// Generate the `FromParams` trait, whose default methods derive nothing (they return `None`).
for_all_params!(impl_default_from_other_params);

/// Carrier for custom derivation rules; override `FromParams` methods in the impl below.
/// No rule is overridden at the moment.
struct OverrideFromParams;
impl FromParams for OverrideFromParams {}
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Parameters survive a round trip through their key-value representation.
    #[test]
    fn test_to_from_kv() {
        // Include all fields (deprecated also).
        let kvs = vec![
            (BARRIER_INTERVAL_MS_KEY, "1"),
            (CHECKPOINT_FREQUENCY_KEY, "1"),
            (SSTABLE_SIZE_MB_KEY, "1"),
            (PARALLEL_COMPACT_SIZE_MB_KEY, "2"),
            (BLOCK_SIZE_KB_KEY, "1"),
            (BLOOM_FALSE_POSITIVE_KEY, "1"),
            (STATE_STORE_KEY, "a"),
            (DATA_DIRECTORY_KEY, "a"),
            (BACKUP_STORAGE_URL_KEY, "a"),
            (BACKUP_STORAGE_DIRECTORY_KEY, "a"),
            (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
            (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
            (ENABLE_TRACING_KEY, "true"),
            (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
            (LICENSE_KEY_KEY, "foo"),
            (TIME_TRAVEL_RETENTION_MS_KEY, "0"),
            (ADAPTIVE_PARALLELISM_STRATEGY_KEY, "Auto"),
            (PER_DATABASE_ISOLATION_KEY, "true"),
            ("a_deprecated_param", "foo"),
        ];

        // To kv - missing field.
        let p = PbSystemParams::default();
        assert!(system_params_to_kv(&p).is_err());

        // From kv - unrecognized field should be ignored
        assert!(system_params_from_kv(vec![("?", "?")]).is_ok());

        // Deser & ser.
        let p = system_params_from_kv(kvs).unwrap();
        assert_eq!(
            p,
            system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap()
        );
    }

    /// `set_system_param` rejects bad input and applies valid changes.
    #[test]
    fn test_set() {
        let mut p = system_params_for_test();
        // Unrecognized param.
        assert!(set_system_param(&mut p, "?", Some("?".to_owned())).is_err());
        // Value out of range: it parses fine, but `checkpoint_frequency` must be at least 1.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("0".to_owned())).is_err());
        // Set immutable.
        assert!(set_system_param(&mut p, STATE_STORE_KEY, Some("?".to_owned())).is_err());
        // Parse error: not a number at all.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("?".to_owned())).is_err());
        // Parse error: a negative number does not fit the unsigned parameter type.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("-1".to_owned())).is_err());
        // Normal set.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("500".to_owned())).is_ok());
        assert_eq!(p.checkpoint_frequency, Some(500));
    }

    // Test that we always redact the value of the license key when displaying it, but when it
    // comes to persistence, we still write and get the real value.
    #[test]
    fn test_redacted_type() {
        let mut p = system_params_for_test();

        let new_license_key_value = "new_license_key_value";
        assert_ne!(p.license_key(), new_license_key_value);

        let (new_string_value, diff) =
            set_system_param(&mut p, LICENSE_KEY_KEY, Some(new_license_key_value))
                .expect("should succeed")
                .expect("should changed");

        // New string value should be the same as what we set.
        // This should not be redacted.
        assert_eq!(new_string_value, new_license_key_value);

        let new_value = diff.license_key.unwrap();
        // `to_string` repr will be redacted.
        assert_eq!(new_value.to_string(), "<redacted>");
        // while `Into<String>` still shows the real value.
        assert_eq!(String::from(new_value.as_ref()), new_license_key_value);
    }
}