1pub mod adaptive_parallelism_strategy;
24pub mod common;
25pub mod diff;
26pub mod local_manager;
27pub mod reader;
28
29use std::fmt::Debug;
30use std::ops::RangeBounds;
31use std::str::FromStr;
32
33use paste::paste;
34use risingwave_license::{LicenseKey, LicenseKeyRef};
35use risingwave_pb::meta::PbSystemParams;
36
37use self::diff::SystemParamsDiff;
38pub use crate::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
39
40pub type SystemParamsError = String;
41
42type Result<T> = core::result::Result<T, SystemParamsError>;
43
44pub trait ParamValue: ToString + FromStr {
46 type Borrowed<'a>;
47}
48
49macro_rules! impl_param_value {
50 ($type:ty) => {
51 impl_param_value!($type => $type);
52 };
53 ($type:ty => $borrowed:ty) => {
54 impl ParamValue for $type {
55 type Borrowed<'a> = $borrowed;
56 }
57 };
58}
59
60impl_param_value!(bool);
61impl_param_value!(u32);
62impl_param_value!(u64);
63impl_param_value!(f64);
64impl_param_value!(String => &'a str);
65impl_param_value!(LicenseKey => LicenseKeyRef<'a>);
66
67#[macro_export]
77macro_rules! for_all_params {
78 ($macro:ident) => {
79 $macro! {
80 { barrier_interval_ms, u32, Some(1000_u32), true, "The interval of periodic barrier.", },
82 { checkpoint_frequency, u64, Some(1_u64), true, "There will be a checkpoint for every n barriers.", },
83 { sstable_size_mb, u32, Some(256_u32), false, "Target size of the Sstable.", },
84 { parallel_compact_size_mb, u32, Some(512_u32), false, "The size of parallel task for one compact/flush job.", },
85 { block_size_kb, u32, Some(64_u32), false, "Size of each block in bytes in SST.", },
86 { bloom_false_positive, f64, Some(0.001_f64), false, "False positive probability of bloom filter.", },
87 { state_store, String, None, false, "URL for the state store", },
88 { data_directory, String, None, false, "Remote directory for storing data and metadata objects.", },
89 { backup_storage_url, String, None, true, "Remote storage url for storing snapshots.", },
90 { backup_storage_directory, String, None, true, "Remote directory for storing snapshots.", },
91 { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
92 { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
93 { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
94 { use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
95 { license_key, risingwave_license::LicenseKey, Some(Default::default()), true, "The license key to activate enterprise features.", },
96 { time_travel_retention_ms, u64, Some(600000_u64), true, "The data retention period for time travel.", },
97 { per_database_isolation, bool, Some(true), true, "Whether per database isolation is enabled", },
98 { enforce_secret, bool, Some(false), true, "Whether to enforce secret on cloud.", },
99 }
100 };
101}
102
103pub const NOTICE_BARRIER_INTERVAL_MS: u32 = 300000;
105pub const NOTICE_CHECKPOINT_FREQUENCY: u64 = 60;
107
108#[macro_export]
110macro_rules! key_of {
111 ($field:ident) => {
112 stringify!($field)
113 };
114}
115
116macro_rules! def_key {
118 ($({ $field:ident, $($rest:tt)* },)*) => {
119 paste! {
120 $(
121 pub const [<$field:upper _KEY>]: &str = key_of!($field);
122 )*
123 }
124 };
125}
126
127for_all_params!(def_key);
128
129macro_rules! def_default_opt {
131 ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
132 $(
133 paste::paste!(
134 pub fn [<$field _opt>]() -> Option<$type> {
135 $default
136 }
137 );
138 )*
139 };
140}
141
142macro_rules! def_default {
144 ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
145 $(
146 def_default!(@ $field, $type, $default);
147 )*
148 };
149 (@ $field:ident, $type:ty, None) => {};
150 (@ $field:ident, $type:ty, $default: expr) => {
151 pub fn $field() -> $type {
152 $default.unwrap()
153 }
154 paste::paste!(
155 pub static [<$field:upper>]: LazyLock<$type> = LazyLock::new($field);
156 );
157 };
158}
159
160pub mod default {
162 use std::sync::LazyLock;
163
164 for_all_params!(def_default_opt);
165 for_all_params!(def_default);
166}
167
168macro_rules! impl_check_missing_fields {
169 ($({ $field:ident, $($rest:tt)* },)*) => {
170 pub fn check_missing_params(params: &PbSystemParams) -> Result<()> {
172 $(
173 if params.$field.is_none() {
174 return Err(format!("missing system param {:?}", key_of!($field)));
175 }
176 )*
177 Ok(())
178 }
179 };
180}
181
182macro_rules! impl_system_params_to_kv {
184 ($({ $field:ident, $($rest:tt)* },)*) => {
185 #[allow(clippy::vec_init_then_push)]
188 pub fn system_params_to_kv(params: &PbSystemParams) -> Result<Vec<(String, String)>> {
189 check_missing_params(params)?;
190 let mut ret = Vec::new();
191 $(ret.push((
192 key_of!($field).to_owned(),
193 params.$field.as_ref().unwrap().to_string(),
194 ));)*
195 Ok(ret)
196 }
197 };
198}
199
200macro_rules! impl_derive_missing_fields {
201 ($({ $field:ident, $($rest:tt)* },)*) => {
202 pub fn derive_missing_fields(params: &mut PbSystemParams) {
203 $(
204 if params.$field.is_none() && let Some(v) = OverrideFromParams::$field(params) {
205 params.$field = Some(v.into());
206 }
207 )*
208 }
209 };
210}
211
212macro_rules! impl_system_params_from_kv {
214 ($({ $field:ident, $($rest:tt)* },)*) => {
215 pub fn system_params_from_kv<K, V>(mut kvs: Vec<(K, V)>) -> Result<PbSystemParams>
218 where
219 K: AsRef<[u8]> + Debug,
220 V: AsRef<[u8]> + Debug,
221 {
222 let mut ret = PbSystemParams::default();
223 kvs.retain(|(k,v)| {
224 let k = std::str::from_utf8(k.as_ref()).unwrap();
225 let v = std::str::from_utf8(v.as_ref()).unwrap();
226 match k {
227 $(
228 key_of!($field) => {
229 ret.$field = Some(v.parse().unwrap());
230 false
231 }
232 )*
233 _ => {
234 true
235 }
236 }
237 });
238 derive_missing_fields(&mut ret);
239 if !kvs.is_empty() {
240 let unrecognized_params = kvs.into_iter().map(|(k, v)| {
241 (
242 std::str::from_utf8(k.as_ref()).unwrap().to_owned(),
243 std::str::from_utf8(v.as_ref()).unwrap().to_owned(),
244 )
245 }).collect::<Vec<_>>();
246 tracing::warn!("unrecognized system params {:?}", unrecognized_params);
247 }
248 Ok(ret)
249 }
250 };
251}
252
253macro_rules! impl_default_validation {
257 ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
258 #[allow(clippy::ptr_arg)]
259 pub trait Validate {
260 $(
261 fn $field(_v: &$type) -> Result<()> {
264 Ok(())
265 }
266 )*
267
268 fn expect_range<T, R>(v: T, range: R) -> Result<()>
269 where
270 T: Debug + PartialOrd,
271 R: RangeBounds<T> + Debug,
272 {
273 if !range.contains::<T>(&v) {
274 Err(format!("value {:?} out of range, expect {:?}", v, range))
275 } else {
276 Ok(())
277 }
278 }
279 }
280 }
281}
282
283macro_rules! impl_default_from_other_params {
304 ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
305 trait FromParams {
306 $(
307 fn $field(_params: &PbSystemParams) -> Option<$type> {
308 None
309 }
310 )*
311 }
312 };
313}
314
315macro_rules! impl_set_system_param {
316 ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
317 pub fn set_system_param(
322 params: &mut PbSystemParams,
323 key: &str,
324 value: Option<impl AsRef<str>>,
325 ) -> Result<Option<(String, SystemParamsDiff)>> {
326 use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};
327
328 match key {
329 $(
330 key_of!($field) => {
331 if !$is_mutable {
332 return Err(format!("{:?} is immutable", key_of!($field)));
333 }
334
335 let v: $type = if let Some(v) = value {
336 #[allow(rw::format_error)]
337 v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
338 } else {
339 $default.ok_or_else(|| format!("{} does not have a default value", key))?
340 };
341 OverrideValidate::$field(&v)?;
342
343 let changed = SystemParamsReader::new(&*params).$field() != v;
344 if changed {
345 let diff = SystemParamsDiff {
346 $field: Some(v.to_owned()),
347 ..Default::default()
348 };
349 params.$field = Some(v.into()); let new_value = params.$field.as_ref().unwrap().to_string(); Ok(Some((new_value, diff)))
352 } else {
353 Ok(None)
354 }
355 },
356 )*
357 _ => {
358 Err(format!(
359 "unrecognized system parameter {:?}",
360 key
361 ))
362 }
363 }
364 }
365 };
366}
367
368macro_rules! impl_is_mutable {
369 ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
370 pub fn is_mutable(field: &str) -> Result<bool> {
371 match field {
372 $(
373 key_of!($field) => Ok($is_mutable),
374 )*
375 _ => Err(format!("{:?} is not a system parameter", field))
376 }
377 }
378 }
379}
380
381macro_rules! impl_system_params_for_test {
382 ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
383 #[allow(clippy::needless_update)]
384 pub fn system_params_for_test() -> PbSystemParams {
385 let mut ret = PbSystemParams {
386 $(
387 $field: ($default as Option<$type>).map(Into::into),
388 )*
389 ..Default::default() };
391 ret.data_directory = Some("hummock_001".to_owned());
392 ret.state_store = Some("hummock+memory-isolated-for-test".to_owned());
393 ret.backup_storage_url = Some("memory-isolated-for-test".into());
394 ret.backup_storage_directory = Some("backup".into());
395 ret.use_new_object_prefix_strategy = Some(false);
396 ret.time_travel_retention_ms = Some(0);
397 ret
398 }
399 };
400}
401
402macro_rules! impl_validate_all_params {
403 ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
404 #[allow(rw::format_error)]
410 pub fn validate_init_system_params(params: &PbSystemParams) -> Result<()> {
411 $(
412 if let Some(ref v_pb) = params.$field {
413 let logical_v: $type = v_pb.to_string().parse()
418 .map_err(|e| format!("cannot parse value for parameter '{}': {}", key_of!($field), e))?;
419 OverrideValidate::$field(&logical_v)
421 .map_err(|e| format!("invalid value for parameter '{}': {}", key_of!($field), e))?;
422 }
423 )*
424 Ok(())
425 }
426 };
427}
428
429for_all_params!(impl_system_params_from_kv);
430for_all_params!(impl_is_mutable);
431for_all_params!(impl_derive_missing_fields);
432for_all_params!(impl_check_missing_fields);
433for_all_params!(impl_system_params_to_kv);
434for_all_params!(impl_set_system_param);
435for_all_params!(impl_default_validation);
436for_all_params!(impl_validate_all_params);
437for_all_params!(impl_system_params_for_test);
438
439pub struct OverrideValidate;
440impl Validate for OverrideValidate {
441 fn barrier_interval_ms(v: &u32) -> Result<()> {
442 Self::expect_range(*v, 50..)
443 }
444
445 fn checkpoint_frequency(v: &u64) -> Result<()> {
446 Self::expect_range(*v, 1..)
447 }
448
449 fn backup_storage_directory(v: &String) -> Result<()> {
450 if v.trim().is_empty() {
451 return Err("backup_storage_directory cannot be empty".into());
452 }
453 Ok(())
454 }
455
456 fn backup_storage_url(v: &String) -> Result<()> {
457 if v.trim().is_empty() {
458 return Err("backup_storage_url cannot be empty".into());
459 }
460 Ok(())
461 }
462
463 fn time_travel_retention_ms(v: &u64) -> Result<()> {
464 let min_retention_ms = 600_000;
466 if *v != 0 && *v < min_retention_ms {
468 return Err(format!(
469 "time_travel_retention_ms cannot be less than {min_retention_ms}"
470 ));
471 }
472 Ok(())
473 }
474}
475
476for_all_params!(impl_default_from_other_params);
477
478struct OverrideFromParams;
479impl FromParams for OverrideFromParams {}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484
485 #[test]
486 fn test_to_from_kv() {
487 let kvs = vec![
489 (BARRIER_INTERVAL_MS_KEY, "1"),
490 (CHECKPOINT_FREQUENCY_KEY, "1"),
491 (SSTABLE_SIZE_MB_KEY, "1"),
492 (PARALLEL_COMPACT_SIZE_MB_KEY, "2"),
493 (BLOCK_SIZE_KB_KEY, "1"),
494 (BLOOM_FALSE_POSITIVE_KEY, "1"),
495 (STATE_STORE_KEY, "a"),
496 (DATA_DIRECTORY_KEY, "a"),
497 (BACKUP_STORAGE_URL_KEY, "a"),
498 (BACKUP_STORAGE_DIRECTORY_KEY, "a"),
499 (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
500 (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
501 (ENABLE_TRACING_KEY, "true"),
502 (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
503 (LICENSE_KEY_KEY, "foo"),
504 (TIME_TRAVEL_RETENTION_MS_KEY, "0"),
505 (PER_DATABASE_ISOLATION_KEY, "true"),
506 (ENFORCE_SECRET_KEY, "false"),
507 ("a_deprecated_param", "foo"),
508 ];
509
510 let p = PbSystemParams::default();
512 assert!(system_params_to_kv(&p).is_err());
513
514 assert!(system_params_from_kv(vec![("?", "?")]).is_ok());
516
517 let p = system_params_from_kv(kvs).unwrap();
519 assert_eq!(
520 p,
521 system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap()
522 );
523 }
524
525 #[test]
526 fn test_set() {
527 let mut p = system_params_for_test();
528 assert!(set_system_param(&mut p, "?", Some("?".to_owned())).is_err());
530 assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("-1".to_owned())).is_err());
532 assert!(set_system_param(&mut p, STATE_STORE_KEY, Some("?".to_owned())).is_err());
534 assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("?".to_owned())).is_err());
536 assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("500".to_owned())).is_ok());
538 assert_eq!(p.checkpoint_frequency, Some(500));
539 }
540
541 #[test]
542 fn test_init() {
543 let mut p = system_params_for_test();
544 assert!(validate_init_system_params(&p).is_ok());
546 p.barrier_interval_ms = Some(10);
547 assert!(validate_init_system_params(&p).is_err());
548 p.barrier_interval_ms = Some(1000);
549 assert!(validate_init_system_params(&p).is_ok());
550 }
551
552 #[test]
555 fn test_redacted_type() {
556 let mut p = system_params_for_test();
557
558 let new_license_key_value = "new_license_key_value";
559 assert_ne!(p.license_key(), new_license_key_value);
560
561 let (new_string_value, diff) =
562 set_system_param(&mut p, LICENSE_KEY_KEY, Some(new_license_key_value))
563 .expect("should succeed")
564 .expect("should changed");
565
566 assert_eq!(new_string_value, new_license_key_value);
569
570 let new_value = diff.license_key.unwrap();
571 assert_eq!(new_value.to_string(), "<redacted>");
573 assert_eq!(String::from(new_value.as_ref()), new_license_key_value);
575 }
576}