risingwave_common/system_param/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module defines utilities to work with system parameters ([`PbSystemParams`] in
16//! `meta.proto`).
17//!
18//! To add a new system parameter:
19//! - Add a new field to [`PbSystemParams`] in `meta.proto`.
20//! - Add a new entry to `for_all_params` in this file.
21//! - Add a new method to [`reader::SystemParamsReader`].
22
23pub mod adaptive_parallelism_strategy;
24pub mod common;
25pub mod diff;
26pub mod local_manager;
27pub mod reader;
28
29use std::fmt::Debug;
30use std::ops::RangeBounds;
31use std::str::FromStr;
32
33use paste::paste;
34use risingwave_license::{LicenseKey, LicenseKeyRef};
35use risingwave_pb::meta::PbSystemParams;
36
37use self::diff::SystemParamsDiff;
38pub use crate::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
39
40pub type SystemParamsError = String;
41
42type Result<T> = core::result::Result<T, SystemParamsError>;
43
44/// The trait for the value type of a system parameter.
45pub trait ParamValue: ToString + FromStr {
46    type Borrowed<'a>;
47}
48
49macro_rules! impl_param_value {
50    ($type:ty) => {
51        impl_param_value!($type => $type);
52    };
53    ($type:ty => $borrowed:ty) => {
54        impl ParamValue for $type {
55            type Borrowed<'a> = $borrowed;
56        }
57    };
58}
59
60impl_param_value!(bool);
61impl_param_value!(u32);
62impl_param_value!(u64);
63impl_param_value!(f64);
64impl_param_value!(String => &'a str);
65impl_param_value!(LicenseKey => LicenseKeyRef<'a>);
66
67/// Define all system parameters here.
68///
69/// To match all these information, write the match arm as follows:
70/// ```text
71/// ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
72/// ```
73///
74/// Note:
75/// - Having `None` as default value means the parameter must be initialized.
76#[macro_export]
77macro_rules! for_all_params {
78    ($macro:ident) => {
79        $macro! {
80            // name                                     type                            default value                   mut?    doc
81            { barrier_interval_ms,                      u32,                            Some(1000_u32),                 true,   "The interval of periodic barrier.", },
82            { checkpoint_frequency,                     u64,                            Some(1_u64),                    true,   "There will be a checkpoint for every n barriers.", },
83            { sstable_size_mb,                          u32,                            Some(256_u32),                  false,  "Target size of the Sstable.", },
84            { parallel_compact_size_mb,                 u32,                            Some(512_u32),                  false,  "The size of parallel task for one compact/flush job.", },
85            { block_size_kb,                            u32,                            Some(64_u32),                   false,  "Size of each block in bytes in SST.", },
86            { bloom_false_positive,                     f64,                            Some(0.001_f64),                false,  "DEPRECATED: Bloom filter is no longer a supported SST filter implementation. This field is kept for backward compatibility and no longer controls whether SST filters are emitted.", },
87            { state_store,                              String,                         None,                           false,  "URL for the state store", },
88            { data_directory,                           String,                         None,                           false,  "Remote directory for storing data and metadata objects.", },
89            { backup_storage_url,                       String,                         None,                           true,   "Remote storage url for storing snapshots.", },
90            { backup_storage_directory,                 String,                         None,                           true,   "Remote directory for storing snapshots.", },
91            { max_concurrent_creating_streaming_jobs,   u32,                            Some(1_u32),                    true,   "Max number of concurrent creating streaming jobs.", },
92            { pause_on_next_bootstrap,                  bool,                           Some(false),                    true,   "Whether to pause all data sources on next bootstrap.", },
93            { enable_tracing,                           bool,                           Some(false),                    true,   "Whether to enable distributed tracing.", },
94            { use_new_object_prefix_strategy,           bool,                           None,                           false,  "Whether to split object prefix.", },
95            { license_key,                              risingwave_license::LicenseKey, Some(Default::default()),       true,   "The license key to activate enterprise features.", },
96            { time_travel_retention_ms,                 u64,                            Some(600000_u64),               true,   "The data retention period for time travel.", },
97            { per_database_isolation,                   bool,                           Some(true),                     true,   "Whether per database isolation is enabled", },
98            { enforce_secret,                  bool,                           Some(false),                    true,   "Whether to enforce secret on cloud.", },
99        }
100    };
101}
102
103// Warn user if barrier_interval_ms is set above 5mins.
104pub const NOTICE_BARRIER_INTERVAL_MS: u32 = 300000;
105// Warn user if checkpoint_frequency is set above 60.
106pub const NOTICE_CHECKPOINT_FREQUENCY: u64 = 60;
107
108/// Convert field name to string.
109#[macro_export]
110macro_rules! key_of {
111    ($field:ident) => {
112        stringify!($field)
113    };
114}
115
116/// Define key constants for fields in `PbSystemParams` for use of other modules.
117macro_rules! def_key {
118    ($({ $field:ident, $($rest:tt)* },)*) => {
119        paste! {
120            $(
121                pub const [<$field:upper _KEY>]: &str = key_of!($field);
122            )*
123        }
124    };
125}
126
127for_all_params!(def_key);
128
129/// Define default value functions returning `Option`.
130macro_rules! def_default_opt {
131    ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
132        $(
133            paste::paste!(
134                pub fn [<$field _opt>]() -> Option<$type> {
135                    $default
136                }
137            );
138        )*
139    };
140}
141
142/// Define default value functions for those with `Some` default values.
143macro_rules! def_default {
144    ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
145        $(
146            def_default!(@ $field, $type, $default);
147        )*
148    };
149    (@ $field:ident, $type:ty, None) => {};
150    (@ $field:ident, $type:ty, $default: expr) => {
151        pub fn $field() -> $type {
152            $default.unwrap()
153        }
154        paste::paste!(
155            pub static [<$field:upper>]: LazyLock<$type> = LazyLock::new($field);
156        );
157    };
158}
159
160/// Default values for all parameters.
161pub mod default {
162    use std::sync::LazyLock;
163
164    for_all_params!(def_default_opt);
165    for_all_params!(def_default);
166}
167
168macro_rules! impl_check_missing_fields {
169    ($({ $field:ident, $($rest:tt)* },)*) => {
170        /// Check if any undeprecated fields are missing.
171        #[expect(deprecated)]
172        pub fn check_missing_params(params: &PbSystemParams) -> Result<()> {
173            $(
174                if params.$field.is_none() {
175                    return Err(format!("missing system param {:?}", key_of!($field)));
176                }
177            )*
178            Ok(())
179        }
180    };
181}
182
183/// Derive serialization to kv pairs.
184macro_rules! impl_system_params_to_kv {
185    ($({ $field:ident, $($rest:tt)* },)*) => {
186        /// The returned map only contains undeprecated fields.
187        /// Return error if there are missing fields.
188        #[allow(clippy::vec_init_then_push)]
189        #[expect(deprecated)]
190        pub fn system_params_to_kv(params: &PbSystemParams) -> Result<Vec<(String, String)>> {
191            check_missing_params(params)?;
192            let mut ret = Vec::new();
193            $(ret.push((
194                key_of!($field).to_owned(),
195                params.$field.as_ref().unwrap().to_string(),
196            ));)*
197            Ok(ret)
198        }
199    };
200}
201
202macro_rules! impl_derive_missing_fields {
203    ($({ $field:ident, $($rest:tt)* },)*) => {
204        #[expect(deprecated)]
205        pub fn derive_missing_fields(params: &mut PbSystemParams) {
206            $(
207                if params.$field.is_none() && let Some(v) = OverrideFromParams::$field(params) {
208                    params.$field = Some(v.into());
209                }
210            )*
211        }
212    };
213}
214
215/// Derive deserialization from kv pairs.
216macro_rules! impl_system_params_from_kv {
217    ($({ $field:ident, $($rest:tt)* },)*) => {
218        /// Try to deserialize deprecated fields as well.
219        /// Return error if there are unrecognized fields.
220        #[expect(deprecated)]
221        pub fn system_params_from_kv<K, V>(mut kvs: Vec<(K, V)>) -> Result<PbSystemParams>
222        where
223            K: AsRef<[u8]> + Debug,
224            V: AsRef<[u8]> + Debug,
225        {
226            let mut ret = PbSystemParams::default();
227            kvs.retain(|(k,v)| {
228                let k = std::str::from_utf8(k.as_ref()).unwrap();
229                let v = std::str::from_utf8(v.as_ref()).unwrap();
230                match k {
231                    $(
232                        key_of!($field) => {
233                            ret.$field = Some(v.parse().unwrap());
234                            false
235                        }
236                    )*
237                    _ => {
238                        true
239                    }
240                }
241            });
242            derive_missing_fields(&mut ret);
243            if !kvs.is_empty() {
244                let unrecognized_params = kvs.into_iter().map(|(k, v)| {
245                    (
246                        std::str::from_utf8(k.as_ref()).unwrap().to_owned(),
247                        std::str::from_utf8(v.as_ref()).unwrap().to_owned(),
248                    )
249                }).collect::<Vec<_>>();
250                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
251            }
252            Ok(ret)
253        }
254    };
255}
256
257/// Define check rules when a field is changed.
258/// If you want custom rules, please override the default implementation in
259/// `OverrideValidateOnSet` below.
260macro_rules! impl_default_validation {
261    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
262        #[allow(clippy::ptr_arg)]
263        pub trait Validate {
264            $(
265                /// Default implementation does nothing.
266                /// Specific checks are implemented in `OverrideValidate`.
267                fn $field(_v: &$type) -> Result<()> {
268                    Ok(())
269                }
270            )*
271
272            fn expect_range<T, R>(v: T, range: R) -> Result<()>
273            where
274                T: Debug + PartialOrd,
275                R: RangeBounds<T> + Debug,
276            {
277                if !range.contains::<T>(&v) {
278                    Err(format!("value {:?} out of range, expect {:?}", v, range))
279                } else {
280                    Ok(())
281                }
282            }
283        }
284    }
285}
286
287/// Define rules to derive a parameter from others. This is useful for parameter type change or
288/// semantic change, where a new parameter has to be introduced. When the cluster upgrades to a
289/// newer version, we need to ensure the effect of the new parameter is equal to its older versions.
290/// For example, if you had `interval_sec` and now you want finer granularity, you can introduce a
291/// new param `interval_ms` and try to derive it from `interval_sec` by overriding `FromParams`
292/// trait in `OverrideFromParams`:
293///
294/// ```ignore
295/// impl FromParams for OverrideFromParams {
296///     fn interval_ms(params: &PbSystemParams) -> Option<u64> {
297///         if let Some(sec) = params.interval_sec {
298///             Some(sec * 1000)
299///         } else {
300///             None
301///         }
302///     }
303/// }
304/// ```
305///
306/// Note that newer versions must be prioritized during derivation.
307macro_rules! impl_default_from_other_params {
308    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
309        trait FromParams {
310            $(
311                fn $field(_params: &PbSystemParams) -> Option<$type> {
312                    None
313                }
314            )*
315        }
316    };
317}
318
319macro_rules! impl_set_system_param {
320    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
321        /// Set a system parameter with the given value or default one.
322        ///
323        /// Returns the new value if changed, or an error if the parameter is unrecognized,
324        /// immutable, or the value is invalid.
325        #[expect(deprecated)]
326        pub fn set_system_param(
327            params: &mut PbSystemParams,
328            key: &str,
329            value: Option<impl AsRef<str>>,
330        ) -> Result<Option<(String, SystemParamsDiff)>> {
331            use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};
332
333            match key {
334                $(
335                    key_of!($field) => {
336                        if !$is_mutable {
337                            return Err(format!("{:?} is immutable", key_of!($field)));
338                        }
339
340                        let v: $type = if let Some(v) = value {
341                            #[allow(rw::format_error)]
342                            v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
343                        } else {
344                            $default.ok_or_else(|| format!("{} does not have a default value", key))?
345                        };
346                        OverrideValidate::$field(&v)?;
347
348                        let changed = SystemParamsReader::new(&*params).$field() != v;
349                        if changed {
350                            let diff = SystemParamsDiff {
351                                $field: Some(v.to_owned()),
352                                ..Default::default()
353                            };
354                            params.$field = Some(v.into());                                 // do not use `to_string` to avoid writing redacted values
355                            let new_value = params.$field.as_ref().unwrap().to_string();    // can now use `to_string` on protobuf primitive types
356                            Ok(Some((new_value, diff)))
357                        } else {
358                            Ok(None)
359                        }
360                    },
361                )*
362                _ => {
363                    Err(format!(
364                        "unrecognized system parameter {:?}",
365                        key
366                    ))
367                }
368            }
369        }
370    };
371}
372
373macro_rules! impl_is_mutable {
374    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
375        pub fn is_mutable(field: &str) -> Result<bool> {
376            match field {
377                $(
378                    key_of!($field) => Ok($is_mutable),
379                )*
380                _ => Err(format!("{:?} is not a system parameter", field))
381            }
382        }
383    }
384}
385
386macro_rules! impl_system_params_for_test {
387    ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
388        #[allow(clippy::needless_update)]
389        #[expect(deprecated)]
390        pub fn system_params_for_test() -> PbSystemParams {
391            let mut ret = PbSystemParams {
392                $(
393                    $field: ($default as Option<$type>).map(Into::into),
394                )*
395                ..Default::default() // `None` for deprecated params
396            };
397            ret.data_directory = Some("hummock_001".to_owned());
398            ret.state_store = Some("hummock+memory-isolated-for-test".to_owned());
399            ret.backup_storage_url = Some("memory-isolated-for-test".into());
400            ret.backup_storage_directory = Some("backup".into());
401            ret.use_new_object_prefix_strategy = Some(false);
402            ret.time_travel_retention_ms = Some(0);
403            ret
404        }
405    };
406}
407
408macro_rules! impl_validate_all_params {
409    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
410        /// Validates all present parameters in a `PbSystemParams`.
411        ///
412        /// This function checks the validity of values against the rules in `OverrideValidate`,
413        /// regardless of whether a parameter is mutable. It is suitable for validating
414        /// initial parameters.
415        #[allow(rw::format_error)]
416        #[expect(deprecated)]
417        pub fn validate_init_system_params(params: &PbSystemParams) -> Result<()> {
418            $(
419                if let Some(ref v_pb) = params.$field {
420                    // 1. Convert the protobuf value (`v_pb`) to a string. `v_pb` could be &u32, &String, etc.
421                    //    `to_string()` works for all of them.
422                    // 2. Parse the string into the target logical type (`$type`), e.g., `LicenseKey`.
423                    //    This relies on the `FromStr` bound on `ParamValue`.
424                    let logical_v: $type = v_pb.to_string().parse()
425                        .map_err(|e| format!("cannot parse value for parameter '{}': {}", key_of!($field), e))?;
426                    // 3. Pass a reference to the correctly-typed logical value to the validator.
427                    OverrideValidate::$field(&logical_v)
428                        .map_err(|e| format!("invalid value for parameter '{}': {}", key_of!($field), e))?;
429                }
430            )*
431            Ok(())
432        }
433    };
434}
435
436for_all_params!(impl_system_params_from_kv);
437for_all_params!(impl_is_mutable);
438for_all_params!(impl_derive_missing_fields);
439for_all_params!(impl_check_missing_fields);
440for_all_params!(impl_system_params_to_kv);
441for_all_params!(impl_set_system_param);
442for_all_params!(impl_default_validation);
443for_all_params!(impl_validate_all_params);
444for_all_params!(impl_system_params_for_test);
445
446pub struct OverrideValidate;
447impl Validate for OverrideValidate {
448    fn barrier_interval_ms(v: &u32) -> Result<()> {
449        Self::expect_range(*v, 50..)
450    }
451
452    fn checkpoint_frequency(v: &u64) -> Result<()> {
453        Self::expect_range(*v, 1..)
454    }
455
456    fn backup_storage_directory(v: &String) -> Result<()> {
457        if v.trim().is_empty() {
458            return Err("backup_storage_directory cannot be empty".into());
459        }
460        Ok(())
461    }
462
463    fn backup_storage_url(v: &String) -> Result<()> {
464        if v.trim().is_empty() {
465            return Err("backup_storage_url cannot be empty".into());
466        }
467        Ok(())
468    }
469
470    fn time_travel_retention_ms(v: &u64) -> Result<()> {
471        // This is intended to guarantee that non-time-travel batch query can still function even compute node's recent versions doesn't include the desired version.
472        let min_retention_ms = 600_000;
473        // 0 is used to disable time travel.
474        if *v != 0 && *v < min_retention_ms {
475            return Err(format!(
476                "time_travel_retention_ms cannot be less than {min_retention_ms}"
477            ));
478        }
479        Ok(())
480    }
481}
482
483for_all_params!(impl_default_from_other_params);
484
485struct OverrideFromParams;
486impl FromParams for OverrideFromParams {}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491
492    #[test]
493    fn test_to_from_kv() {
494        // Include all fields (deprecated also).
495        let kvs = vec![
496            (BARRIER_INTERVAL_MS_KEY, "1"),
497            (CHECKPOINT_FREQUENCY_KEY, "1"),
498            (SSTABLE_SIZE_MB_KEY, "1"),
499            (PARALLEL_COMPACT_SIZE_MB_KEY, "2"),
500            (BLOCK_SIZE_KB_KEY, "1"),
501            (BLOOM_FALSE_POSITIVE_KEY, "1"),
502            (STATE_STORE_KEY, "a"),
503            (DATA_DIRECTORY_KEY, "a"),
504            (BACKUP_STORAGE_URL_KEY, "a"),
505            (BACKUP_STORAGE_DIRECTORY_KEY, "a"),
506            (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
507            (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
508            (ENABLE_TRACING_KEY, "true"),
509            (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
510            (LICENSE_KEY_KEY, "foo"),
511            (TIME_TRAVEL_RETENTION_MS_KEY, "0"),
512            (PER_DATABASE_ISOLATION_KEY, "true"),
513            (ENFORCE_SECRET_KEY, "false"),
514            ("a_deprecated_param", "foo"),
515        ];
516
517        // To kv - missing field.
518        let p = PbSystemParams::default();
519        assert!(system_params_to_kv(&p).is_err());
520
521        // From kv - unrecognized field should be ignored
522        assert!(system_params_from_kv(vec![("?", "?")]).is_ok());
523
524        // Deser & ser.
525        let p = system_params_from_kv(kvs).unwrap();
526        assert_eq!(
527            p,
528            system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap()
529        );
530    }
531
532    #[test]
533    fn test_set() {
534        let mut p = system_params_for_test();
535        // Unrecognized param.
536        assert!(set_system_param(&mut p, "?", Some("?".to_owned())).is_err());
537        // Value out of range.
538        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("-1".to_owned())).is_err());
539        // Set immutable.
540        assert!(set_system_param(&mut p, STATE_STORE_KEY, Some("?".to_owned())).is_err());
541        // Parse error.
542        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("?".to_owned())).is_err());
543        // Normal set.
544        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("500".to_owned())).is_ok());
545        assert_eq!(p.checkpoint_frequency, Some(500));
546    }
547
548    #[test]
549    fn test_init() {
550        let mut p = system_params_for_test();
551        // Validate all params.
552        assert!(validate_init_system_params(&p).is_ok());
553        p.barrier_interval_ms = Some(10);
554        assert!(validate_init_system_params(&p).is_err());
555        p.barrier_interval_ms = Some(1000);
556        assert!(validate_init_system_params(&p).is_ok());
557    }
558
559    // Test that we always redact the value of the license key when displaying it, but when it comes to
560    // persistency, we still write and get the real value.
561    #[test]
562    fn test_redacted_type() {
563        let mut p = system_params_for_test();
564
565        let new_license_key_value = "new_license_key_value";
566        assert_ne!(p.license_key(), new_license_key_value);
567
568        let (new_string_value, diff) =
569            set_system_param(&mut p, LICENSE_KEY_KEY, Some(new_license_key_value))
570                .expect("should succeed")
571                .expect("should changed");
572
573        // New string value should be the same as what we set.
574        // This should not be redacted.
575        assert_eq!(new_string_value, new_license_key_value);
576
577        let new_value = diff.license_key.unwrap();
578        // `to_string` repr will be redacted.
579        assert_eq!(new_value.to_string(), "<redacted>");
580        // while `Into<String>` still shows the real value.
581        assert_eq!(String::from(new_value.as_ref()), new_license_key_value);
582    }
583}