1pub mod adaptive_parallelism_strategy;
24pub mod common;
25pub mod diff;
26pub mod local_manager;
27pub mod reader;
28
29use std::fmt::Debug;
30use std::ops::RangeBounds;
31use std::str::FromStr;
32
33use paste::paste;
34use risingwave_license::{LicenseKey, LicenseKeyRef};
35use risingwave_pb::meta::PbSystemParams;
36
37use self::diff::SystemParamsDiff;
38pub use crate::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
39
40pub type SystemParamsError = String;
41
42type Result<T> = core::result::Result<T, SystemParamsError>;
43
44pub trait ParamValue: ToString + FromStr {
46 type Borrowed<'a>;
47}
48
49macro_rules! impl_param_value {
50 ($type:ty) => {
51 impl_param_value!($type => $type);
52 };
53 ($type:ty => $borrowed:ty) => {
54 impl ParamValue for $type {
55 type Borrowed<'a> = $borrowed;
56 }
57 };
58}
59
60impl_param_value!(bool);
61impl_param_value!(u32);
62impl_param_value!(u64);
63impl_param_value!(f64);
64impl_param_value!(String => &'a str);
65impl_param_value!(LicenseKey => LicenseKeyRef<'a>);
66
67#[macro_export]
77macro_rules! for_all_params {
78 ($macro:ident) => {
79 $macro! {
80 { barrier_interval_ms, u32, Some(1000_u32), true, "The interval of periodic barrier.", },
82 { checkpoint_frequency, u64, Some(1_u64), true, "There will be a checkpoint for every n barriers.", },
83 { sstable_size_mb, u32, Some(256_u32), false, "Target size of the Sstable.", },
84 { parallel_compact_size_mb, u32, Some(512_u32), false, "The size of parallel task for one compact/flush job.", },
85 { block_size_kb, u32, Some(64_u32), false, "Size of each block in bytes in SST.", },
86 { bloom_false_positive, f64, Some(0.001_f64), false, "False positive probability of bloom filter.", },
87 { state_store, String, None, false, "URL for the state store", },
88 { data_directory, String, None, false, "Remote directory for storing data and metadata objects.", },
89 { backup_storage_url, String, None, true, "Remote storage url for storing snapshots.", },
90 { backup_storage_directory, String, None, true, "Remote directory for storing snapshots.", },
91 { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
92 { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
93 { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
94 { use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
95 { license_key, risingwave_license::LicenseKey, Some(Default::default()), true, "The license key to activate enterprise features.", },
96 { time_travel_retention_ms, u64, Some(600000_u64), true, "The data retention period for time travel.", },
97 { adaptive_parallelism_strategy, risingwave_common::system_param::AdaptiveParallelismStrategy, Some(Default::default()), true, "The strategy for Adaptive Parallelism.", },
98 { per_database_isolation, bool, Some(true), true, "Whether per database isolation is enabled", },
99 { enforce_secret, bool, Some(false), true, "Whether to enforce secret on cloud.", },
100 }
101 };
102}
103
104#[macro_export]
106macro_rules! key_of {
107 ($field:ident) => {
108 stringify!($field)
109 };
110}
111
112macro_rules! def_key {
114 ($({ $field:ident, $($rest:tt)* },)*) => {
115 paste! {
116 $(
117 pub const [<$field:upper _KEY>]: &str = key_of!($field);
118 )*
119 }
120 };
121}
122
123for_all_params!(def_key);
124
125macro_rules! def_default_opt {
127 ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
128 $(
129 paste::paste!(
130 pub fn [<$field _opt>]() -> Option<$type> {
131 $default
132 }
133 );
134 )*
135 };
136}
137
138macro_rules! def_default {
140 ($({ $field:ident, $type:ty, $default: expr, $($rest:tt)* },)*) => {
141 $(
142 def_default!(@ $field, $type, $default);
143 )*
144 };
145 (@ $field:ident, $type:ty, None) => {};
146 (@ $field:ident, $type:ty, $default: expr) => {
147 pub fn $field() -> $type {
148 $default.unwrap()
149 }
150 paste::paste!(
151 pub static [<$field:upper>]: LazyLock<$type> = LazyLock::new($field);
152 );
153 };
154}
155
156pub mod default {
158 use std::sync::LazyLock;
159
160 for_all_params!(def_default_opt);
161 for_all_params!(def_default);
162}
163
164macro_rules! impl_check_missing_fields {
165 ($({ $field:ident, $($rest:tt)* },)*) => {
166 pub fn check_missing_params(params: &PbSystemParams) -> Result<()> {
168 $(
169 if params.$field.is_none() {
170 return Err(format!("missing system param {:?}", key_of!($field)));
171 }
172 )*
173 Ok(())
174 }
175 };
176}
177
178macro_rules! impl_system_params_to_kv {
180 ($({ $field:ident, $($rest:tt)* },)*) => {
181 #[allow(clippy::vec_init_then_push)]
184 pub fn system_params_to_kv(params: &PbSystemParams) -> Result<Vec<(String, String)>> {
185 check_missing_params(params)?;
186 let mut ret = Vec::new();
187 $(ret.push((
188 key_of!($field).to_owned(),
189 params.$field.as_ref().unwrap().to_string(),
190 ));)*
191 Ok(ret)
192 }
193 };
194}
195
196macro_rules! impl_derive_missing_fields {
197 ($({ $field:ident, $($rest:tt)* },)*) => {
198 pub fn derive_missing_fields(params: &mut PbSystemParams) {
199 $(
200 if params.$field.is_none() && let Some(v) = OverrideFromParams::$field(params) {
201 params.$field = Some(v.into());
202 }
203 )*
204 }
205 };
206}
207
208macro_rules! impl_system_params_from_kv {
210 ($({ $field:ident, $($rest:tt)* },)*) => {
211 pub fn system_params_from_kv<K, V>(mut kvs: Vec<(K, V)>) -> Result<PbSystemParams>
214 where
215 K: AsRef<[u8]> + Debug,
216 V: AsRef<[u8]> + Debug,
217 {
218 let mut ret = PbSystemParams::default();
219 kvs.retain(|(k,v)| {
220 let k = std::str::from_utf8(k.as_ref()).unwrap();
221 let v = std::str::from_utf8(v.as_ref()).unwrap();
222 match k {
223 $(
224 key_of!($field) => {
225 ret.$field = Some(v.parse().unwrap());
226 false
227 }
228 )*
229 _ => {
230 true
231 }
232 }
233 });
234 derive_missing_fields(&mut ret);
235 if !kvs.is_empty() {
236 let unrecognized_params = kvs.into_iter().map(|(k, v)| {
237 (
238 std::str::from_utf8(k.as_ref()).unwrap().to_owned(),
239 std::str::from_utf8(v.as_ref()).unwrap().to_owned(),
240 )
241 }).collect::<Vec<_>>();
242 tracing::warn!("unrecognized system params {:?}", unrecognized_params);
243 }
244 Ok(ret)
245 }
246 };
247}
248
249macro_rules! impl_default_validation_on_set {
253 ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
254 #[allow(clippy::ptr_arg)]
255 trait ValidateOnSet {
256 $(
257 fn $field(_v: &$type) -> Result<()> {
258 if !$is_mutable {
259 Err(format!("{:?} is immutable", key_of!($field)))
260 } else {
261 Ok(())
262 }
263 }
264 )*
265
266 fn expect_range<T, R>(v: T, range: R) -> Result<()>
267 where
268 T: Debug + PartialOrd,
269 R: RangeBounds<T> + Debug,
270 {
271 if !range.contains::<T>(&v) {
272 Err(format!("value {:?} out of range, expect {:?}", v, range))
273 } else {
274 Ok(())
275 }
276 }
277 }
278 }
279}
280
281macro_rules! impl_default_from_other_params {
302 ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
303 trait FromParams {
304 $(
305 fn $field(_params: &PbSystemParams) -> Option<$type> {
306 None
307 }
308 )*
309 }
310 };
311}
312
313macro_rules! impl_set_system_param {
314 ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
315 pub fn set_system_param(
320 params: &mut PbSystemParams,
321 key: &str,
322 value: Option<impl AsRef<str>>,
323 ) -> Result<Option<(String, SystemParamsDiff)>> {
324 use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};
325
326 match key {
327 $(
328 key_of!($field) => {
329 let v: $type = if let Some(v) = value {
330 #[allow(rw::format_error)]
331 v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
332 } else {
333 $default.ok_or_else(|| format!("{} does not have a default value", key))?
334 };
335 OverrideValidateOnSet::$field(&v)?;
336
337 let changed = SystemParamsReader::new(&*params).$field() != v;
338 if changed {
339 let diff = SystemParamsDiff {
340 $field: Some(v.to_owned()),
341 ..Default::default()
342 };
343 params.$field = Some(v.into()); let new_value = params.$field.as_ref().unwrap().to_string(); Ok(Some((new_value, diff)))
346 } else {
347 Ok(None)
348 }
349 },
350 )*
351 _ => {
352 Err(format!(
353 "unrecognized system parameter {:?}",
354 key
355 ))
356 }
357 }
358 }
359 };
360}
361
362macro_rules! impl_is_mutable {
363 ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
364 pub fn is_mutable(field: &str) -> Result<bool> {
365 match field {
366 $(
367 key_of!($field) => Ok($is_mutable),
368 )*
369 _ => Err(format!("{:?} is not a system parameter", field))
370 }
371 }
372 }
373}
374
375macro_rules! impl_system_params_for_test {
376 ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
377 #[allow(clippy::needless_update)]
378 pub fn system_params_for_test() -> PbSystemParams {
379 let mut ret = PbSystemParams {
380 $(
381 $field: ($default as Option<$type>).map(Into::into),
382 )*
383 ..Default::default() };
385 ret.data_directory = Some("hummock_001".to_owned());
386 ret.state_store = Some("hummock+memory-isolated-for-test".to_owned());
387 ret.backup_storage_url = Some("memory-isolated-for-test".into());
388 ret.backup_storage_directory = Some("backup".into());
389 ret.use_new_object_prefix_strategy = Some(false);
390 ret.time_travel_retention_ms = Some(0);
391 ret
392 }
393 };
394}
395
396for_all_params!(impl_system_params_from_kv);
397for_all_params!(impl_is_mutable);
398for_all_params!(impl_derive_missing_fields);
399for_all_params!(impl_check_missing_fields);
400for_all_params!(impl_system_params_to_kv);
401for_all_params!(impl_set_system_param);
402for_all_params!(impl_default_validation_on_set);
403for_all_params!(impl_system_params_for_test);
404
405struct OverrideValidateOnSet;
406impl ValidateOnSet for OverrideValidateOnSet {
407 fn barrier_interval_ms(v: &u32) -> Result<()> {
408 Self::expect_range(*v, 100..)
409 }
410
411 fn checkpoint_frequency(v: &u64) -> Result<()> {
412 Self::expect_range(*v, 1..)
413 }
414
415 fn backup_storage_directory(v: &String) -> Result<()> {
416 if v.trim().is_empty() {
417 return Err("backup_storage_directory cannot be empty".into());
418 }
419 Ok(())
420 }
421
422 fn backup_storage_url(v: &String) -> Result<()> {
423 if v.trim().is_empty() {
424 return Err("backup_storage_url cannot be empty".into());
425 }
426 Ok(())
427 }
428
429 fn time_travel_retention_ms(v: &u64) -> Result<()> {
430 let min_retention_ms = 600_000;
432 if *v < min_retention_ms {
433 return Err(format!(
434 "time_travel_retention_ms cannot be less than {min_retention_ms}"
435 ));
436 }
437 Ok(())
438 }
439}
440
441for_all_params!(impl_default_from_other_params);
442
443struct OverrideFromParams;
444impl FromParams for OverrideFromParams {}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449
450 #[test]
451 fn test_to_from_kv() {
452 let kvs = vec![
454 (BARRIER_INTERVAL_MS_KEY, "1"),
455 (CHECKPOINT_FREQUENCY_KEY, "1"),
456 (SSTABLE_SIZE_MB_KEY, "1"),
457 (PARALLEL_COMPACT_SIZE_MB_KEY, "2"),
458 (BLOCK_SIZE_KB_KEY, "1"),
459 (BLOOM_FALSE_POSITIVE_KEY, "1"),
460 (STATE_STORE_KEY, "a"),
461 (DATA_DIRECTORY_KEY, "a"),
462 (BACKUP_STORAGE_URL_KEY, "a"),
463 (BACKUP_STORAGE_DIRECTORY_KEY, "a"),
464 (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
465 (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
466 (ENABLE_TRACING_KEY, "true"),
467 (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
468 (LICENSE_KEY_KEY, "foo"),
469 (TIME_TRAVEL_RETENTION_MS_KEY, "0"),
470 (ADAPTIVE_PARALLELISM_STRATEGY_KEY, "Auto"),
471 (PER_DATABASE_ISOLATION_KEY, "true"),
472 (ENFORCE_SECRET_KEY, "false"),
473 ("a_deprecated_param", "foo"),
474 ];
475
476 let p = PbSystemParams::default();
478 assert!(system_params_to_kv(&p).is_err());
479
480 assert!(system_params_from_kv(vec![("?", "?")]).is_ok());
482
483 let p = system_params_from_kv(kvs).unwrap();
485 assert_eq!(
486 p,
487 system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap()
488 );
489 }
490
491 #[test]
492 fn test_set() {
493 let mut p = system_params_for_test();
494 assert!(set_system_param(&mut p, "?", Some("?".to_owned())).is_err());
496 assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("-1".to_owned())).is_err());
498 assert!(set_system_param(&mut p, STATE_STORE_KEY, Some("?".to_owned())).is_err());
500 assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("?".to_owned())).is_err());
502 assert!(set_system_param(&mut p, CHECKPOINT_FREQUENCY_KEY, Some("500".to_owned())).is_ok());
504 assert_eq!(p.checkpoint_frequency, Some(500));
505 }
506
507 #[test]
510 fn test_redacted_type() {
511 let mut p = system_params_for_test();
512
513 let new_license_key_value = "new_license_key_value";
514 assert_ne!(p.license_key(), new_license_key_value);
515
516 let (new_string_value, diff) =
517 set_system_param(&mut p, LICENSE_KEY_KEY, Some(new_license_key_value))
518 .expect("should succeed")
519 .expect("should changed");
520
521 assert_eq!(new_string_value, new_license_key_value);
524
525 let new_value = diff.license_key.unwrap();
526 assert_eq!(new_value.to_string(), "<redacted>");
528 assert_eq!(String::from(new_value.as_ref()), new_license_key_value);
530 }
531}