1pub mod adaptive_parallelism_strategy;
24pub mod common;
25pub mod diff;
26pub mod local_manager;
27pub mod reader;
28
29use std::fmt::Debug;
30use std::ops::RangeBounds;
31use std::str::FromStr;
32
33use paste::paste;
34use risingwave_license::{LicenseKey, LicenseKeyRef};
35use risingwave_pb::meta::PbSystemParams;
36
37use self::diff::SystemParamsDiff;
38pub use crate::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
39
/// Error type used throughout this module: a plain, human-readable message.
pub type SystemParamsError = String;

/// Module-local shorthand for results that fail with a [`SystemParamsError`].
type Result<T> = std::result::Result<T, SystemParamsError>;
43
/// A type that can be used as the value of a system parameter.
///
/// Values must be convertible to and from text: `ToString` renders them and `FromStr`
/// parses them.
pub trait ParamValue: FromStr + ToString {
    /// The type handed out when the value is only borrowed, e.g. `&'a str` for `String`.
    type Borrowed<'a>;
}
48
/// Implements `ParamValue` for a type.
///
/// * `impl_param_value!(T => B)` uses `B` as the borrowed form; `B` may mention the lifetime `'a`.
/// * `impl_param_value!(T)` is shorthand for `impl_param_value!(T => T)`.
macro_rules! impl_param_value {
    ($owned:ty => $borrowed:ty) => {
        impl ParamValue for $owned {
            type Borrowed<'a> = $borrowed;
        }
    };
    ($owned:ty) => {
        impl_param_value!($owned => $owned);
    };
}
59
60impl_param_value!(bool);
61impl_param_value!(u32);
62impl_param_value!(u64);
63impl_param_value!(f64);
64impl_param_value!(String => &'a str);
65impl_param_value!(LicenseKey => LicenseKeyRef<'a>);
66
/// The single list of all system parameters.
///
/// `for_all_params!(some_macro)` expands to `some_macro! { ... }` with one brace-delimited,
/// comma-terminated entry per parameter. To capture every column, write the match arm as:
/// ```text
/// ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
/// ```
///
/// A default of `None` means the parameter has no built-in default value.
#[macro_export]
macro_rules! for_all_params {
    ($macro:ident) => {
        $macro! {
            // name                                    type    default            mutable?  description
            { barrier_interval_ms,                    u32,    Some(1000_u32),    true,     "The interval of periodic barrier.", },
            { checkpoint_frequency,                   u64,    Some(1_u64),       true,     "There will be a checkpoint for every n barriers.", },
            { sstable_size_mb,                        u32,    Some(256_u32),     false,    "Target size of the Sstable.", },
            { parallel_compact_size_mb,               u32,    Some(512_u32),     false,    "The size of parallel task for one compact/flush job.", },
            { block_size_kb,                          u32,    Some(64_u32),      false,    "Size of each block in bytes in SST.", },
            { bloom_false_positive,                   f64,    Some(0.001_f64),   false,    "False positive probability of bloom filter.", },
            { state_store,                            String, None,              false,    "URL for the state store", },
            { data_directory,                         String, None,              false,    "Remote directory for storing data and metadata objects.", },
            { backup_storage_url,                     String, None,              true,     "Remote storage url for storing snapshots.", },
            { backup_storage_directory,               String, None,              true,     "Remote directory for storing snapshots.", },
            { max_concurrent_creating_streaming_jobs, u32,    Some(1_u32),       true,     "Max number of concurrent creating streaming jobs.", },
            { pause_on_next_bootstrap,                bool,   Some(false),       true,     "Whether to pause all data sources on next bootstrap.", },
            { enable_tracing,                         bool,   Some(false),       true,     "Whether to enable distributed tracing.", },
            { use_new_object_prefix_strategy,         bool,   None,              false,    "Whether to split object prefix.", },
            { license_key, risingwave_license::LicenseKey, Some(Default::default()), true, "The license key to activate enterprise features.", },
            { time_travel_retention_ms,               u64,    Some(600000_u64),  true,     "The data retention period for time travel.", },
            { adaptive_parallelism_strategy, risingwave_common::system_param::AdaptiveParallelismStrategy, Some(Default::default()), true, "The strategy for Adaptive Parallelism.", },
            { per_database_isolation,                 bool,   Some(true),        true,     "Whether per database isolation is enabled", },
        }
    };
}
102
/// Turns a parameter's field name into its string key.
///
/// Going through this macro, rather than calling `stringify!` at each use site, keeps the
/// key derivation in one place.
#[macro_export]
macro_rules! key_of {
    ($name:ident) => {
        stringify!($name)
    };
}
110
111macro_rules! def_key {
113 ($({ $field:ident, $($rest:tt)* },)*) => {
114 paste! {
115 $(
116 pub const [<$field:upper _KEY>]: &str = key_of!($field);
117 )*
118 }
119 };
120}
121
122for_all_params!(def_key);
123
/// Defines one `pub fn <field>_opt() -> Option<T>` per parameter, returning the default
/// exactly as written in the parameter table (so `None` when there is no default).
macro_rules! def_default_opt {
    ($({ $name:ident, $ty:ty, $value:expr, $($ignored:tt)* },)*) => {
        paste::paste! {
            $(
                pub fn [<$name _opt>]() -> Option<$ty> {
                    $value
                }
            )*
        }
    };
}
136
/// Defines default-value accessors for the parameters that have a `Some(..)` default.
///
/// For each such parameter this generates:
/// * `pub fn <field>() -> T`, returning the unwrapped default, and
/// * `pub static <FIELD>: LazyLock<T>`, initialised from that function on first access.
///
/// Parameters whose default is `None` generate nothing.
macro_rules! def_default {
    // Entry point: one `{ field, type, default, ... }` entry per parameter.
    //
    // The default is deliberately NOT captured as `$default:expr` here. A fragment that has
    // been parsed as `expr` is opaque when forwarded to another rule and can no longer be
    // matched against the literal token `None`; the `None` rule below would then never fire
    // and a panicking `fn <field>() { None.unwrap() }` would be emitted instead. Forwarding
    // the raw token tail keeps `None` matchable.
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        $(
            def_default!(@ $field, $type, $($rest)*);
        )*
    };
    // No default: emit nothing.
    (@ $field:ident, $type:ty, None, $($rest:tt)*) => {};
    // `Some(..)` default: emit the accessor function and the lazily initialised static.
    (@ $field:ident, $type:ty, $default:expr, $($rest:tt)*) => {
        pub fn $field() -> $type {
            $default.unwrap()
        }
        paste::paste!(
            pub static [<$field:upper>]: LazyLock<$type> = LazyLock::new($field);
        );
    };
}
154
155pub mod default {
157 use std::sync::LazyLock;
158
159 for_all_params!(def_default_opt);
160 for_all_params!(def_default);
161}
162
/// Generates `check_missing_params`.
macro_rules! impl_check_missing_fields {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Checks that every system parameter in `params` is set.
        ///
        /// Returns an error naming the first unset parameter, in table order.
        pub fn check_missing_params(params: &PbSystemParams) -> Result<()> {
            // (key, is it set?) for every parameter, in table order.
            let presence: &[(&str, bool)] = &[
                $((key_of!($name), params.$name.is_some()),)*
            ];
            match presence.iter().find(|(_, is_set)| !*is_set) {
                Some((key, _)) => Err(format!("missing system param {:?}", key)),
                None => Ok(()),
            }
        }
    };
}
176
/// Generates `system_params_to_kv`.
macro_rules! impl_system_params_to_kv {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Converts `params` into `(key, value)` string pairs, one per parameter in table order.
        ///
        /// Fails if any parameter is unset (see `check_missing_params`).
        pub fn system_params_to_kv(params: &PbSystemParams) -> Result<Vec<(String, String)>> {
            check_missing_params(params)?;
            // Every field is `Some` past the check above, so the unwraps cannot fail.
            Ok(vec![
                $((
                    key_of!($name).to_owned(),
                    params.$name.as_ref().unwrap().to_string(),
                ),)*
            ])
        }
    };
}
194
/// Generates `derive_missing_fields`.
macro_rules! impl_derive_missing_fields {
    ($({ $name:ident, $($ignored:tt)* },)*) => {
        /// Fills in unset parameters that can be derived from the other parameters.
        ///
        /// For each parameter that is `None`, `OverrideFromParams` is asked for a value; if it
        /// offers one, that value is stored. Parameters that are already set are left untouched.
        pub fn derive_missing_fields(params: &mut PbSystemParams) {
            $(
                if params.$name.is_none()
                    && let Some(derived) = OverrideFromParams::$name(params)
                {
                    params.$name = Some(derived.into());
                }
            )*
        }
    };
}
206
/// Generates `system_params_from_kv`.
macro_rules! impl_system_params_from_kv {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        /// Builds a `PbSystemParams` from `(key, value)` pairs.
        ///
        /// * A pair whose key names a known parameter has its value parsed into that field.
        /// * Pairs with unknown keys are skipped; they are reported together in one warning.
        /// * After all pairs are consumed, `derive_missing_fields` fills in derivable fields.
        ///   Fields that remain unset are NOT treated as an error here.
        ///
        /// # Errors
        ///
        /// Returns an error, instead of panicking, when a key or value is not valid UTF-8 or
        /// when a value cannot be parsed into the type of its parameter.
        pub fn system_params_from_kv<K, V>(kvs: Vec<(K, V)>) -> Result<PbSystemParams>
        where
            K: AsRef<[u8]> + Debug,
            V: AsRef<[u8]> + Debug,
        {
            let mut ret = PbSystemParams::default();
            let mut unrecognized_params: Vec<(String, String)> = Vec::new();

            for (k, v) in &kvs {
                let key = std::str::from_utf8(k.as_ref())
                    .map_err(|_| format!("system param key {:?} is not valid UTF-8", k))?;
                let value = std::str::from_utf8(v.as_ref()).map_err(|_| {
                    format!("value {:?} of system param {:?} is not valid UTF-8", v, key)
                })?;
                match key {
                    $(
                        key_of!($field) => {
                            // The target type is inferred from the protobuf field.
                            ret.$field = Some(value.parse().map_err(|_| {
                                format!("cannot parse value {:?} of system param {:?}", value, key)
                            })?);
                        }
                    )*
                    _ => unrecognized_params.push((key.to_owned(), value.to_owned())),
                }
            }

            derive_missing_fields(&mut ret);
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(ret)
        }
    };
}
247
/// Generates the `ValidateOnSet` trait.
macro_rules! impl_default_validation_on_set {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Validation hooks, one associated function per parameter.
        ///
        /// The provided implementation of each hook accepts any value for a mutable parameter
        /// and rejects every value for an immutable one. Implementors override individual
        /// hooks to add stricter checks.
        #[allow(clippy::ptr_arg)]
        trait ValidateOnSet {
            $(
                fn $field(_v: &$type) -> Result<()> {
                    if $is_mutable {
                        Ok(())
                    } else {
                        Err(format!("{:?} is immutable", key_of!($field)))
                    }
                }
            )*

            /// Succeeds when `v` lies within `range`; otherwise returns an error that shows
            /// both the value and the expected range.
            fn expect_range<T, R>(v: T, range: R) -> Result<()>
            where
                T: Debug + PartialOrd,
                R: RangeBounds<T> + Debug,
            {
                if range.contains(&v) {
                    Ok(())
                } else {
                    Err(format!("value {:?} out of range, expect {:?}", v, range))
                }
            }
        }
    };
}
279
/// Generates the `FromParams` trait.
macro_rules! impl_default_from_other_params {
    ($({ $name:ident, $ty:ty, $($ignored:tt)* },)*) => {
        /// Hooks for deriving a parameter's value from the other parameters, one associated
        /// function per parameter.
        ///
        /// The provided implementation of every hook derives nothing (`None`). Implementors
        /// override individual hooks where a value can be derived.
        trait FromParams {
            $(
                fn $name(_params: &PbSystemParams) -> Option<$ty> {
                    None
                }
            )*
        }
    };
}
311
/// Generates `set_system_param`.
macro_rules! impl_set_system_param {
    ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
        /// Sets the parameter named `key` in `params`.
        ///
        /// `value` is parsed into the parameter's type; passing `None` resets the parameter
        /// to its default. The candidate value is validated through `OverrideValidateOnSet`
        /// before anything is written.
        ///
        /// Returns `Ok(None)` when the parameter already has the requested value. Otherwise
        /// returns the string form of the newly stored value together with a
        /// `SystemParamsDiff` in which only this parameter is set.
        ///
        /// # Errors
        ///
        /// Fails when `key` is not a known parameter, when `value` cannot be parsed, when
        /// `value` is `None` but the parameter has no default, or when validation rejects
        /// the value.
        pub fn set_system_param(
            params: &mut PbSystemParams,
            key: &str,
            value: Option<impl AsRef<str>>,
        ) -> Result<Option<(String, SystemParamsDiff)>> {
            use crate::system_param::reader::{SystemParamsRead, SystemParamsReader};

            match key {
                $(
                    key_of!($field) => {
                        let candidate: $type = if let Some(raw) = value {
                            #[allow(rw::format_error)]
                            raw.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
                        } else {
                            $default.ok_or_else(|| format!("{} does not have a default value", key))?
                        };
                        OverrideValidateOnSet::$field(&candidate)?;

                        let changed = SystemParamsReader::new(&*params).$field() != candidate;
                        if !changed {
                            return Ok(None);
                        }

                        let diff = SystemParamsDiff {
                            $field: Some(candidate.to_owned()),
                            ..Default::default()
                        };
                        // Store first, then render the returned string from the stored field
                        // rather than from `candidate`: the typed value may display differently
                        // (the license key displays as `<redacted>`, see `test_redacted_type`).
                        params.$field = Some(candidate.into());
                        let stored = params.$field.as_ref().unwrap().to_string();
                        Ok(Some((stored, diff)))
                    },
                )*
                _ => {
                    Err(format!(
                        "unrecognized system parameter {:?}",
                        key
                    ))
                }
            }
        }
    };
}
360
/// Generates `is_mutable`.
macro_rules! impl_is_mutable {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Reports whether the parameter named `field` is mutable.
        ///
        /// Returns an error when `field` is not the key of any system parameter.
        pub fn is_mutable(field: &str) -> Result<bool> {
            // (key, mutable?) for every parameter, in table order.
            const MUTABILITY: &[(&str, bool)] = &[
                $((key_of!($field), $is_mutable),)*
            ];
            MUTABILITY
                .iter()
                .find(|(key, _)| *key == field)
                .map(|&(_, mutable)| mutable)
                .ok_or_else(|| format!("{:?} is not a system parameter", field))
        }
    };
}
373
/// Generates `system_params_for_test`.
macro_rules! impl_system_params_for_test {
    ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
        /// Builds a fully populated `PbSystemParams` for tests.
        ///
        /// Every parameter starts from its table default. The six fields listed explicitly
        /// below then take fixed values instead; five of them have no table default at all,
        /// and `time_travel_retention_ms` is forced to `0`.
        #[allow(clippy::needless_update)]
        pub fn system_params_for_test() -> PbSystemParams {
            let table_defaults = PbSystemParams {
                $(
                    $field: ($default as Option<$type>).map(Into::into),
                )*
                // Covers any protobuf field that is not in the parameter table.
                ..Default::default()
            };
            PbSystemParams {
                data_directory: Some("hummock_001".to_owned()),
                state_store: Some("hummock+memory".to_owned()),
                backup_storage_url: Some("memory".to_owned()),
                backup_storage_directory: Some("backup".to_owned()),
                use_new_object_prefix_strategy: Some(false),
                time_travel_retention_ms: Some(0),
                ..table_defaults
            }
        }
    };
}
394
395for_all_params!(impl_system_params_from_kv);
396for_all_params!(impl_is_mutable);
397for_all_params!(impl_derive_missing_fields);
398for_all_params!(impl_check_missing_fields);
399for_all_params!(impl_system_params_to_kv);
400for_all_params!(impl_set_system_param);
401for_all_params!(impl_default_validation_on_set);
402for_all_params!(impl_system_params_for_test);
403
404struct OverrideValidateOnSet;
405impl ValidateOnSet for OverrideValidateOnSet {
406 fn barrier_interval_ms(v: &u32) -> Result<()> {
407 Self::expect_range(*v, 100..)
408 }
409
410 fn checkpoint_frequency(v: &u64) -> Result<()> {
411 Self::expect_range(*v, 1..)
412 }
413
414 fn backup_storage_directory(v: &String) -> Result<()> {
415 if v.trim().is_empty() {
416 return Err("backup_storage_directory cannot be empty".into());
417 }
418 Ok(())
419 }
420
421 fn backup_storage_url(v: &String) -> Result<()> {
422 if v.trim().is_empty() {
423 return Err("backup_storage_url cannot be empty".into());
424 }
425 Ok(())
426 }
427
428 fn time_travel_retention_ms(v: &u64) -> Result<()> {
429 let min_retention_ms = 600_000;
431 if *v < min_retention_ms {
432 return Err(format!(
433 "time_travel_retention_ms cannot be less than {min_retention_ms}"
434 ));
435 }
436 Ok(())
437 }
438}
439
440for_all_params!(impl_default_from_other_params);
441
442struct OverrideFromParams;
443impl FromParams for OverrideFromParams {}
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Parameters survive a key/value round trip; unset fields and unknown keys are handled.
    #[test]
    fn test_to_from_kv() {
        // One pair per known parameter, plus one key that is not a parameter.
        let kvs = vec![
            (BARRIER_INTERVAL_MS_KEY, "1"),
            (CHECKPOINT_FREQUENCY_KEY, "1"),
            (SSTABLE_SIZE_MB_KEY, "1"),
            (PARALLEL_COMPACT_SIZE_MB_KEY, "2"),
            (BLOCK_SIZE_KB_KEY, "1"),
            (BLOOM_FALSE_POSITIVE_KEY, "1"),
            (STATE_STORE_KEY, "a"),
            (DATA_DIRECTORY_KEY, "a"),
            (BACKUP_STORAGE_URL_KEY, "a"),
            (BACKUP_STORAGE_DIRECTORY_KEY, "a"),
            (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
            (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
            (ENABLE_TRACING_KEY, "true"),
            (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
            (LICENSE_KEY_KEY, "foo"),
            (TIME_TRAVEL_RETENTION_MS_KEY, "0"),
            (ADAPTIVE_PARALLELISM_STRATEGY_KEY, "Auto"),
            (PER_DATABASE_ISOLATION_KEY, "true"),
            ("a_deprecated_param", "foo"),
        ];

        // Serialising parameters with unset fields must fail.
        let unset = PbSystemParams::default();
        assert!(system_params_to_kv(&unset).is_err());

        // An unknown key is tolerated when deserialising.
        assert!(system_params_from_kv(vec![("?", "?")]).is_ok());

        // Deserialise, serialise, deserialise again: both results must agree.
        let parsed = system_params_from_kv(kvs).unwrap();
        let reparsed = system_params_from_kv(system_params_to_kv(&parsed).unwrap()).unwrap();
        assert_eq!(parsed, reparsed);
    }

    /// `set_system_param` rejects bad input and applies a valid update.
    #[test]
    fn test_set() {
        let mut p = system_params_for_test();
        let mut set =
            |key: &str, value: &str| set_system_param(&mut p, key, Some(value.to_owned()));

        // Unknown parameter.
        assert!(set("?", "?").is_err());
        // A negative number is not accepted for the unsigned checkpoint frequency.
        assert!(set(CHECKPOINT_FREQUENCY_KEY, "-1").is_err());
        // `state_store` is immutable.
        assert!(set(STATE_STORE_KEY, "?").is_err());
        // Not a number at all.
        assert!(set(CHECKPOINT_FREQUENCY_KEY, "?").is_err());
        // A valid update is applied.
        assert!(set(CHECKPOINT_FREQUENCY_KEY, "500").is_ok());
        assert_eq!(p.checkpoint_frequency, Some(500));
    }

    /// The license key is redacted when displayed, while the string returned for storage
    /// and the value reachable through `as_ref` still carry the real key.
    #[test]
    fn test_redacted_type() {
        let mut p = system_params_for_test();

        let new_license_key_value = "new_license_key_value";
        assert_ne!(p.license_key(), new_license_key_value);

        let (stored_string, diff) =
            set_system_param(&mut p, LICENSE_KEY_KEY, Some(new_license_key_value))
                .expect("should succeed")
                .expect("should changed");

        // The returned string is the real value, not the redacted rendering.
        assert_eq!(stored_string, new_license_key_value);

        let typed_value = diff.license_key.unwrap();
        // Displaying the typed value hides the key...
        assert_eq!(typed_value.to_string(), "<redacted>");
        // ...but the real key is still accessible on purpose.
        assert_eq!(String::from(typed_value.as_ref()), new_license_key_value);
    }
}