1pub mod adaptive_parallelism_strategy;
24pub mod common;
25pub mod diff;
26pub mod local_manager;
27pub mod reader;
28
29use std::fmt::Debug;
30use std::ops::RangeBounds;
31use std::str::FromStr;
32
33use paste::paste;
34use risingwave_license::{LicenseKey, LicenseKeyRef};
35use risingwave_pb::meta::PbSystemParams;
36
37use self::diff::SystemParamsDiff;
38pub use crate::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
39
/// Error type used throughout this module: a plain, human-readable message.
pub type SystemParamsError = String;

/// Module-local `Result` whose error side is always a [`SystemParamsError`].
type Result<T> = std::result::Result<T, SystemParamsError>;
43
/// A type that can be used as the value of a system parameter.
///
/// Implementors must be convertible to a string (`ToString`) and parseable
/// from one (`FromStr`).
pub trait ParamValue
where
    Self: ToString + FromStr,
{
    /// The borrowed counterpart of the value, parameterized by the lifetime
    /// of the borrow.
    type Borrowed<'a>;
}
48
49macro_rules! impl_param_value {
50 ($type:ty) => {
51 impl_param_value!($type => $type);
52 };
53 ($type:ty => $borrowed:ty) => {
54 impl ParamValue for $type {
55 type Borrowed<'a> = $borrowed;
56 }
57 };
58}
59
60impl_param_value!(bool);
61impl_param_value!(u32);
62impl_param_value!(u64);
63impl_param_value!(f64);
64impl_param_value!(String => &'a str);
65impl_param_value!(LicenseKey => LicenseKeyRef<'a>);
66
/// The single table of system parameters.
///
/// Invokes the macro named by the argument with one `{ ... },` entry per
/// parameter. Each entry has the shape
///
/// ```text
/// { name, type, default, mutable, description, }
/// ```
///
/// so a consumer that needs every column can match with
///
/// ```text
/// ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
/// ```
///
/// * `default` is an `Option` expression; `None` means the parameter has no
///   default value.
/// * `mutable` is the flag reported by `is_mutable` and enforced by
///   `set_system_param`.
#[macro_export]
macro_rules! for_all_params {
    ($callback:ident) => {
        $callback! {
            // name                                   type    default           mutable  description
            { barrier_interval_ms,                    u32,    Some(1000_u32),   true,    "The interval of periodic barrier.", },
            { checkpoint_frequency,                   u64,    Some(1_u64),      true,    "There will be a checkpoint for every n barriers.", },
            { sstable_size_mb,                        u32,    Some(256_u32),    false,   "Target size of the Sstable.", },
            { parallel_compact_size_mb,               u32,    Some(512_u32),    false,   "The size of parallel task for one compact/flush job.", },
            // NOTE(review): the description says "bytes" while the name says KB — confirm the intended unit.
            { block_size_kb,                          u32,    Some(64_u32),     false,   "Size of each block in bytes in SST.", },
            { bloom_false_positive,                   f64,    Some(0.001_f64),  false,   "DEPRECATED: Bloom filter is no longer a supported SST filter implementation. This field is kept for backward compatibility and no longer controls whether SST filters are emitted.", },
            { state_store,                            String, None,             false,   "URL for the state store", },
            { data_directory,                         String, None,             false,   "Remote directory for storing data and metadata objects.", },
            { backup_storage_url,                     String, None,             true,    "Remote storage url for storing snapshots.", },
            { backup_storage_directory,               String, None,             true,    "Remote directory for storing snapshots.", },
            { max_concurrent_creating_streaming_jobs, u32,    Some(1_u32),      true,    "Max number of concurrent creating streaming jobs.", },
            { pause_on_next_bootstrap,                bool,   Some(false),      true,    "Whether to pause all data sources on next bootstrap.", },
            { enable_tracing,                         bool,   Some(false),      true,    "Whether to enable distributed tracing.", },
            { use_new_object_prefix_strategy,         bool,   None,             false,   "Whether to split object prefix.", },
            { license_key,                            risingwave_license::LicenseKey, Some(Default::default()), true, "The license key to activate enterprise features.", },
            { time_travel_retention_ms,               u64,    Some(600000_u64), true,    "The data retention period for time travel.", },
            { per_database_isolation,                 bool,   Some(true),       true,    "Whether per database isolation is enabled", },
            { enforce_secret,                         bool,   Some(false),      true,    "Whether to enforce secret on cloud.", },
        }
    };
}
102
/// NOTE(review): presumably the `barrier_interval_ms` value (in milliseconds)
/// at which callers emit a notice to the user — confirm against the call sites.
pub const NOTICE_BARRIER_INTERVAL_MS: u32 = 300_000;
/// NOTE(review): presumably the `checkpoint_frequency` value at which callers
/// emit a notice to the user — confirm against the call sites.
pub const NOTICE_CHECKPOINT_FREQUENCY: u64 = 60;
107
/// Expands to the given identifier spelled out as a string literal, e.g.
/// `key_of!(state_store)` becomes `"state_store"`.
///
/// Because the expansion is a literal it can be used in expressions,
/// constants and `match` patterns alike.
#[macro_export]
macro_rules! key_of {
    ($name:ident) => {
        stringify!($name)
    };
}
115
116macro_rules! def_key {
118 ($({ $field:ident, $($rest:tt)* },)*) => {
119 paste! {
120 $(
121 pub const [<$field:upper _KEY>]: &str = key_of!($field);
122 )*
123 }
124 };
125}
126
127for_all_params!(def_key);
128
/// Defines, for every parameter, a function `<field>_opt() -> Option<type>`
/// that returns the default expression from the parameter table verbatim
/// (`None` for parameters without a default).
macro_rules! def_default_opt {
    ($({ $name:ident, $ty:ty, $default:expr, $($ignored:tt)* },)*) => {
        paste::paste! {
            $(
                pub fn [<$name _opt>]() -> Option<$ty> {
                    $default
                }
            )*
        }
    };
}
141
/// Defines default-value accessors for parameters that have a default.
///
/// For every table entry whose default is `Some(..)` this generates
///
/// * `pub fn <field>() -> <type>`, returning the unwrapped default, and
/// * `pub static <FIELD>: LazyLock<<type>>`, lazily initialized from that
///   function (`LazyLock` must be in scope at the expansion site).
///
/// Entries whose default is the literal `None` generate nothing.
///
/// The entries are consumed one at a time (tt-muncher) so that `None` is
/// matched against the raw tokens of the table. Capturing the default as an
/// `expr` fragment first and forwarding it to an internal rule would make it
/// opaque, and a literal `None` matcher would never fire; every parameter
/// without a default would then get an accessor that unconditionally panics
/// on `None.unwrap()`.
macro_rules! def_default {
    // All entries consumed.
    () => {};
    // No default value: emit nothing and continue with the remaining entries.
    ({ $field:ident, $type:ty, None, $($rest:tt)* }, $($tail:tt)*) => {
        def_default!($($tail)*);
    };
    // `Some(..)` default: emit the accessor and the lazily-initialized static.
    ({ $field:ident, $type:ty, $default:expr, $($rest:tt)* }, $($tail:tt)*) => {
        pub fn $field() -> $type {
            $default.unwrap()
        }
        paste::paste!(
            pub static [<$field:upper>]: LazyLock<$type> = LazyLock::new($field);
        );
        def_default!($($tail)*);
    };
}
159
160pub mod default {
162 use std::sync::LazyLock;
163
164 for_all_params!(def_default_opt);
165 for_all_params!(def_default);
166}
167
/// Generates `check_missing_params`.
macro_rules! impl_check_missing_fields {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Verifies that every parameter in the table is set on `params`.
        ///
        /// Fields are checked in table order; the error names the first one
        /// that is `None`.
        #[expect(deprecated)]
        pub fn check_missing_params(params: &PbSystemParams) -> Result<()> {
            $(
                params
                    .$name
                    .as_ref()
                    .ok_or_else(|| format!("missing system param {:?}", key_of!($name)))?;
            )*
            Ok(())
        }
    };
}
182
/// Generates `system_params_to_kv`.
macro_rules! impl_system_params_to_kv {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Serializes `params` into `(key, value)` string pairs, one per
        /// parameter, in table order.
        ///
        /// Fails with the error from `check_missing_params` if any parameter
        /// is unset.
        #[expect(deprecated)]
        pub fn system_params_to_kv(params: &PbSystemParams) -> Result<Vec<(String, String)>> {
            check_missing_params(params)?;
            // The check above guarantees every field is `Some`, so the
            // `unwrap`s below cannot fail.
            let pairs = vec![
                $((
                    key_of!($name).to_owned(),
                    params.$name.as_ref().unwrap().to_string(),
                ),)*
            ];
            Ok(pairs)
        }
    };
}
201
/// Generates `derive_missing_fields`.
macro_rules! impl_derive_missing_fields {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Fills in unset parameters from the rules in `OverrideFromParams`.
        ///
        /// Fields are visited in table order. A field that is already set is
        /// left alone (its rule is not even evaluated); an unset field takes
        /// whatever its rule returns, which may still be `None`.
        #[expect(deprecated)]
        pub fn derive_missing_fields(params: &mut PbSystemParams) {
            $(
                if params.$name.is_none() {
                    params.$name = OverrideFromParams::$name(params).map(Into::into);
                }
            )*
        }
    };
}
214
/// Generates `system_params_from_kv`.
macro_rules! impl_system_params_from_kv {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Builds a `PbSystemParams` from `(key, value)` pairs.
        ///
        /// Pairs whose key names a parameter in the table are parsed into the
        /// corresponding field; afterwards `derive_missing_fields` fills in
        /// what it can. Pairs with any other key are not an error: they are
        /// collected and reported once through `tracing::warn!`.
        ///
        /// # Panics
        ///
        /// Panics if a key or value is not valid UTF-8, or if the value of a
        /// recognized key cannot be parsed into the field's type.
        #[expect(deprecated)]
        pub fn system_params_from_kv<K, V>(kvs: Vec<(K, V)>) -> Result<PbSystemParams>
        where
            K: AsRef<[u8]> + Debug,
            V: AsRef<[u8]> + Debug,
        {
            let mut params = PbSystemParams::default();
            let mut unrecognized_params: Vec<(String, String)> = Vec::new();

            for (raw_key, raw_value) in &kvs {
                let key = std::str::from_utf8(raw_key.as_ref()).unwrap();
                let value = std::str::from_utf8(raw_value.as_ref()).unwrap();
                match key {
                    $(
                        key_of!($name) => params.$name = Some(value.parse().unwrap()),
                    )*
                    _ => unrecognized_params.push((key.to_owned(), value.to_owned())),
                }
            }

            derive_missing_fields(&mut params);

            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(params)
        }
    };
}
256
/// Generates the `Validate` trait: one validation hook per parameter plus the
/// `expect_range` helper.
///
/// Every hook accepts any value by default; an implementor overrides the
/// hooks of the parameters it wants to constrain.
macro_rules! impl_default_validation {
    ($({ $name:ident, $ty:ty, $default:expr, $is_mutable:expr, $($ignored:tt)* },)*) => {
        #[allow(clippy::ptr_arg)]
        pub trait Validate {
            $(
                fn $name(_v: &$ty) -> Result<()> {
                    Ok(())
                }
            )*

            /// Succeeds when `v` lies within `range`; otherwise returns an
            /// error that shows both the value and the expected range.
            fn expect_range<T, R>(v: T, range: R) -> Result<()>
            where
                T: Debug + PartialOrd,
                R: RangeBounds<T> + Debug,
            {
                if range.contains::<T>(&v) {
                    Ok(())
                } else {
                    Err(format!("value {:?} out of range, expect {:?}", v, range))
                }
            }
        }
    }
}
286
/// Generates the `FromParams` trait: one hook per parameter that may compute
/// a value for that parameter from the other parameters.
///
/// Every hook returns `None` by default, i.e. "no derived value"; an
/// implementor overrides the hooks it has a rule for.
macro_rules! impl_default_from_other_params {
    ($({ $name:ident, $ty:ty, $($ignored:tt)* },)*) => {
        trait FromParams {
            $(
                fn $name(_params: &PbSystemParams) -> Option<$ty> {
                    None
                }
            )*
        }
    };
}
318
/// Generates `set_system_param`.
macro_rules! impl_set_system_param {
    ($({ $name:ident, $ty:ty, $default:expr, $is_mutable:expr, $($ignored:tt)* },)*) => {
        /// Sets the parameter `key` on `params` to `value`, or to its default
        /// when `value` is `None`.
        ///
        /// Returns `Ok(None)` when the new value equals the current one as
        /// seen through `SystemParamsReader`. Otherwise the value is stored
        /// and `Ok(Some((new_value, diff)))` is returned, where `new_value`
        /// is the string form of the stored field and `diff` has only this
        /// parameter set.
        ///
        /// # Errors
        ///
        /// * `key` is not a known parameter, or the parameter is immutable;
        /// * `value` cannot be parsed into the parameter's type;
        /// * `value` is `None` and the parameter has no default;
        /// * the value is rejected by `OverrideValidate`.
        #[expect(deprecated)]
        pub fn set_system_param(
            params: &mut PbSystemParams,
            key: &str,
            value: Option<impl AsRef<str>>,
        ) -> Result<Option<(String, SystemParamsDiff)>> {
            use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};

            match key {
                $(
                    key_of!($name) => {
                        if !$is_mutable {
                            return Err(format!("{:?} is immutable", key_of!($name)));
                        }

                        let v: $ty = if let Some(raw) = value {
                            #[allow(rw::format_error)]
                            raw.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
                        } else {
                            $default.ok_or_else(|| format!("{} does not have a default value", key))?
                        };
                        OverrideValidate::$name(&v)?;

                        let changed = SystemParamsReader::new(&*params).$name() != v;
                        if !changed {
                            return Ok(None);
                        }

                        let diff = SystemParamsDiff {
                            $name: Some(v.to_owned()),
                            ..Default::default()
                        };
                        // Store first, then stringify the stored field rather than `v`:
                        // `test_redacted_type` in this file shows that the typed license
                        // key prints as "<redacted>" while the returned string must be
                        // the real value.
                        params.$name = Some(v.into());
                        let new_value = params.$name.as_ref().unwrap().to_string();
                        Ok(Some((new_value, diff)))
                    },
                )*
                _ => Err(format!("unrecognized system parameter {:?}", key)),
            }
        }
    };
}
372
/// Generates `is_mutable`.
macro_rules! impl_is_mutable {
    ($({ $name:ident, $ty:ty, $default:expr, $is_mutable:expr, $($ignored:tt)* },)*) => {
        /// Returns the mutability flag of the parameter named `field`, or an
        /// error if `field` is not in the parameter table.
        pub fn is_mutable(field: &str) -> Result<bool> {
            let mutable: bool = match field {
                $(
                    key_of!($name) => $is_mutable,
                )*
                _ => return Err(format!("{:?} is not a system parameter", field)),
            };
            Ok(mutable)
        }
    }
}
385
/// Generates `system_params_for_test`.
macro_rules! impl_system_params_for_test {
    ($({ $name:ident, $ty:ty, $default:expr, $($ignored:tt)* },)*) => {
        /// Returns a `PbSystemParams` for use in tests.
        ///
        /// Every parameter starts at its table default. The parameters
        /// without a default are then given fixed test values, and
        /// `time_travel_retention_ms` is set to `0`.
        #[allow(clippy::needless_update)]
        #[expect(deprecated)]
        pub fn system_params_for_test() -> PbSystemParams {
            let defaults = PbSystemParams {
                $(
                    $name: ($default as Option<$ty>).map(Into::into),
                )*
                ..Default::default()
            };
            PbSystemParams {
                data_directory: Some("hummock_001".to_owned()),
                state_store: Some("hummock+memory-isolated-for-test".to_owned()),
                backup_storage_url: Some("memory-isolated-for-test".into()),
                backup_storage_directory: Some("backup".into()),
                use_new_object_prefix_strategy: Some(false),
                time_travel_retention_ms: Some(0),
                ..defaults
            }
        }
    };
}
407
/// Generates `validate_init_system_params`.
macro_rules! impl_validate_all_params {
    ($({ $name:ident, $ty:ty, $($ignored:tt)* },)*) => {
        /// Validates every parameter that is set on `params`, in table order.
        ///
        /// Each stored value is converted to the parameter's logical type by
        /// round-tripping through its string form, then checked with the
        /// matching `OverrideValidate` hook. Unset parameters are skipped.
        /// The first failure is returned, prefixed with the parameter name.
        #[allow(rw::format_error)]
        #[expect(deprecated)]
        pub fn validate_init_system_params(params: &PbSystemParams) -> Result<()> {
            $(
                if let Some(stored) = params.$name.as_ref() {
                    let logical: $ty = stored
                        .to_string()
                        .parse()
                        .map_err(|e| format!("cannot parse value for parameter '{}': {}", key_of!($name), e))?;
                    OverrideValidate::$name(&logical)
                        .map_err(|e| format!("invalid value for parameter '{}': {}", key_of!($name), e))?;
                }
            )*
            Ok(())
        }
    };
}
435
436for_all_params!(impl_system_params_from_kv);
437for_all_params!(impl_is_mutable);
438for_all_params!(impl_derive_missing_fields);
439for_all_params!(impl_check_missing_fields);
440for_all_params!(impl_system_params_to_kv);
441for_all_params!(impl_set_system_param);
442for_all_params!(impl_default_validation);
443for_all_params!(impl_validate_all_params);
444for_all_params!(impl_system_params_for_test);
445
446pub struct OverrideValidate;
447impl Validate for OverrideValidate {
448 fn barrier_interval_ms(v: &u32) -> Result<()> {
449 Self::expect_range(*v, 50..)
450 }
451
452 fn checkpoint_frequency(v: &u64) -> Result<()> {
453 Self::expect_range(*v, 1..)
454 }
455
456 fn backup_storage_directory(v: &String) -> Result<()> {
457 if v.trim().is_empty() {
458 return Err("backup_storage_directory cannot be empty".into());
459 }
460 Ok(())
461 }
462
463 fn backup_storage_url(v: &String) -> Result<()> {
464 if v.trim().is_empty() {
465 return Err("backup_storage_url cannot be empty".into());
466 }
467 Ok(())
468 }
469
470 fn time_travel_retention_ms(v: &u64) -> Result<()> {
471 let min_retention_ms = 600_000;
473 if *v != 0 && *v < min_retention_ms {
475 return Err(format!(
476 "time_travel_retention_ms cannot be less than {min_retention_ms}"
477 ));
478 }
479 Ok(())
480 }
481}
482
483for_all_params!(impl_default_from_other_params);
484
485struct OverrideFromParams;
486impl FromParams for OverrideFromParams {}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializing and deserializing a full parameter set is lossless.
    #[test]
    fn test_to_from_kv() {
        // A default `PbSystemParams` has unset fields and cannot be serialized.
        let empty = PbSystemParams::default();
        assert!(system_params_to_kv(&empty).is_err());

        // Unrecognized keys are tolerated on deserialization.
        assert!(system_params_from_kv(vec![("?", "?")]).is_ok());

        // One value per known parameter, plus one key that is not in the table.
        let kvs = vec![
            (BARRIER_INTERVAL_MS_KEY, "1"),
            (CHECKPOINT_FREQUENCY_KEY, "1"),
            (SSTABLE_SIZE_MB_KEY, "1"),
            (PARALLEL_COMPACT_SIZE_MB_KEY, "2"),
            (BLOCK_SIZE_KB_KEY, "1"),
            (BLOOM_FALSE_POSITIVE_KEY, "1"),
            (STATE_STORE_KEY, "a"),
            (DATA_DIRECTORY_KEY, "a"),
            (BACKUP_STORAGE_URL_KEY, "a"),
            (BACKUP_STORAGE_DIRECTORY_KEY, "a"),
            (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
            (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
            (ENABLE_TRACING_KEY, "true"),
            (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
            (LICENSE_KEY_KEY, "foo"),
            (TIME_TRAVEL_RETENTION_MS_KEY, "0"),
            (PER_DATABASE_ISOLATION_KEY, "true"),
            (ENFORCE_SECRET_KEY, "false"),
            ("a_deprecated_param", "foo"),
        ];

        let p = system_params_from_kv(kvs).unwrap();
        let round_tripped = system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap();
        assert_eq!(p, round_tripped);
    }

    /// `set_system_param` rejects bad input and applies good input.
    #[test]
    fn test_set() {
        let mut p = system_params_for_test();

        // Unknown parameter.
        assert!(set_system_param(&mut p, "?", Some("?".to_owned())).is_err());
        // Negative number for an unsigned parameter.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("-1".to_owned())).is_err());
        // Immutable parameter.
        assert!(set_system_param(&mut p, STATE_STORE_KEY, Some("?".to_owned())).is_err());
        // Not a number at all.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("?".to_owned())).is_err());

        // A valid value is accepted and stored.
        assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("500".to_owned())).is_ok());
        assert_eq!(p.checkpoint_frequency, Some(500));
    }

    /// `validate_init_system_params` applies the `OverrideValidate` rules.
    #[test]
    fn test_init() {
        let mut p = system_params_for_test();
        assert!(validate_init_system_params(&p).is_ok());

        // Below the allowed minimum for the barrier interval.
        p.barrier_interval_ms = Some(10);
        assert!(validate_init_system_params(&p).is_err());

        p.barrier_interval_ms = Some(1000);
        assert!(validate_init_system_params(&p).is_ok());
    }

    /// Setting the license key returns the real string value, while the typed
    /// value carried in the diff prints as redacted.
    #[test]
    fn test_redacted_type() {
        let mut p = system_params_for_test();

        let new_license_key_value = "new_license_key_value";
        assert_ne!(p.license_key(), new_license_key_value);

        let outcome = set_system_param(&mut p, LICENSE_KEY_KEY, Some(new_license_key_value));
        let (new_string_value, diff) = outcome
            .expect("should succeed")
            .expect("should changed");

        // The returned string is the actual value.
        assert_eq!(new_string_value, new_license_key_value);

        let new_value = diff.license_key.unwrap();
        // `to_string` on the typed value is redacted.
        assert_eq!(new_value.to_string(), "<redacted>");
        // The real value is still reachable through `as_ref`.
        assert_eq!(String::from(new_value.as_ref()), new_license_key_value);
    }
}