// risingwave_meta/controller/system_param.rs
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use risingwave_common::system_param::common::CommonHandler;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::{
check_missing_params, default, derive_missing_fields, set_system_param,
};
use risingwave_common::{for_all_params, key_of};
use risingwave_meta_model::prelude::SystemParameter;
use risingwave_meta_model::system_parameter;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PbSystemParams;
use sea_orm::ActiveValue::Set;
use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
use tokio::sync::oneshot::Sender;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::controller::SqlMetaStore;
use crate::manager::{LocalNotification, NotificationManagerRef};
use crate::{MetaError, MetaResult};
/// Reference-counted handle to a [`SystemParamsController`].
pub type SystemParamsControllerRef = Arc<SystemParamsController>;
/// Keeps the system parameters both in memory and in the database, and
/// pushes changes to subscribers through the notification manager.
pub struct SystemParamsController {
/// Database connection used to load and persist the parameters.
db: DatabaseConnection,
/// Used to notify local subscribers and worker nodes about parameter updates.
notification_manager: NotificationManagerRef,
/// In-memory copy of the current parameters, guarded by an async read-write lock.
params: RwLock<PbSystemParams>,
/// Handler that receives the diff whenever `set_param` changes a parameter.
common_handler: CommonHandler,
}
/// Generates `system_params_from_db`, with one match arm per system parameter
/// supplied by `for_all_params!`.
macro_rules! impl_system_params_from_db {
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        /// Rebuilds [`PbSystemParams`] from the rows stored in the database.
        ///
        /// Rows whose name matches a known parameter are parsed into the
        /// corresponding field; afterwards `derive_missing_fields` is applied.
        /// Rows with an unknown name are skipped and reported in a single warning.
        ///
        /// # Errors
        ///
        /// Returns a system-params error when a persisted value cannot be parsed
        /// as the parameter's type, instead of panicking.
        pub fn system_params_from_db(models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
            let mut params = PbSystemParams::default();
            let mut unrecognized_params = Vec::new();
            for model in models {
                match model.name.as_str() {
                    $(
                        key_of!($field) => {
                            // A malformed row is surfaced as an error so the caller can
                            // report which parameter is broken.
                            let parsed = model.value.parse::<$type>().map_err(|e| {
                                MetaError::system_params(format!(
                                    "failed to parse persisted system parameter {:?} with value {:?}: {:?}",
                                    model.name, model.value, e
                                ))
                            })?;
                            params.$field = Some(parsed.into());
                        }
                    )*
                    _ => unrecognized_params.push(model.name),
                }
            }
            derive_missing_fields(&mut params);
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(params)
        }
    };
}
/// Generates `system_params_to_model`, producing one database row per system
/// parameter supplied by `for_all_params!`.
macro_rules! impl_system_params_to_models {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Converts `params` into active models ready to be inserted into the database.
        ///
        /// Fails when `check_missing_params` rejects `params`; once that check has
        /// passed, every field is unwrapped and stored in its string form.
        pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
            check_missing_params(params).map_err(|e| anyhow!(e))?;
            let models = vec![
                $(
                    system_parameter::ActiveModel {
                        name: Set(key_of!($field).to_string()),
                        value: Set(params.$field.as_ref().unwrap().to_string()),
                        is_mutable: Set($is_mutable),
                        description: Set(None),
                    },
                )*
            ];
            Ok(models)
        }
    };
}
/// Generates `merge_params`, handling every system parameter supplied by
/// `for_all_params!`.
macro_rules! impl_merge_params {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        /// Merges the initial parameters into the persisted ones.
        ///
        /// A persisted value always wins; a warning is logged when the initial
        /// value differs from it. Fields absent from `persisted` are filled from
        /// `init`. Fields absent from `init` are left untouched.
        fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
            $(
                if let Some(init_value) = init.$field {
                    match persisted.$field.as_ref() {
                        Some(persisted_value) => {
                            if persisted_value != &init_value {
                                tracing::warn!(
                                    "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
                                    key_of!($field),
                                    init_value,
                                    persisted_value
                                );
                            }
                        }
                        None => persisted.$field = Some(init_value),
                    }
                }
            )*
            persisted
        }
    };
}
// Instantiate the helper functions for every system parameter:
// `system_params_from_db`, `merge_params` and `system_params_to_model`.
for_all_params!(impl_system_params_from_db);
for_all_params!(impl_merge_params);
for_all_params!(impl_system_params_to_models);
/// Forces `time_travel_retention_ms` to its default value when it is unset or zero.
fn apply_hard_code_override(params: &mut PbSystemParams) {
    let needs_override = matches!(params.time_travel_retention_ms, None | Some(0));
    if !needs_override {
        return;
    }
    let default_v = default::time_travel_retention_ms();
    tracing::info!("time_travel_retention_ms has been overridden to {default_v}");
    params.time_travel_retention_ms = Some(default_v);
}
impl SystemParamsController {
    /// Creates the controller and synchronizes the database with the resulting parameters.
    ///
    /// All persisted rows are loaded and merged with `init_params` (persisted
    /// values take precedence), hard-coded overrides are applied, and the result
    /// is validated with `check_missing_params`. The merged parameters are then
    /// flushed back to the database, which rewrites the whole table.
    ///
    /// # Errors
    ///
    /// Returns an error on database failures or when a parameter is missing
    /// after merging.
    pub async fn new(
        sql_meta_store: SqlMetaStore,
        notification_manager: NotificationManagerRef,
        init_params: PbSystemParams,
    ) -> MetaResult<Self> {
        let db = sql_meta_store.conn;
        let params = SystemParameter::find().all(&db).await?;
        let mut params = merge_params(system_params_from_db(params)?, init_params);
        apply_hard_code_override(&mut params);
        tracing::info!(initial_params = ?SystemParamsReader::new(&params), "initialize system parameters");
        check_missing_params(&params).map_err(|e| anyhow!(e))?;
        let ctl = Self {
            db,
            notification_manager,
            params: RwLock::new(params.clone()),
            common_handler: CommonHandler::new(params.into()),
        };
        // Persist the merged parameters so the database matches the in-memory state.
        ctl.flush_params().await?;
        Ok(ctl)
    }

    /// Returns a copy of the current parameters in protobuf form.
    pub async fn get_pb_params(&self) -> PbSystemParams {
        self.params.read().await.clone()
    }

    /// Returns a copy of the current parameters wrapped in a [`SystemParamsReader`].
    pub async fn get_params(&self) -> SystemParamsReader {
        self.params.read().await.clone().into()
    }

    /// Rewrites the system parameter table from the in-memory parameters.
    ///
    /// Within a single transaction, all existing rows are deleted and one row
    /// per known parameter is inserted, so rows for unknown parameters are removed.
    async fn flush_params(&self) -> MetaResult<()> {
        let params = self.params.read().await;
        let models = system_params_to_model(&params)?;
        let txn = self.db.begin().await?;
        SystemParameter::delete_many().exec(&txn).await?;
        SystemParameter::insert_many(models).exec(&txn).await?;
        txn.commit().await?;
        Ok(())
    }

    /// Sets the parameter `name` to `value` and returns the resulting parameters.
    ///
    /// The write lock on the in-memory parameters is held for the whole
    /// operation. When `set_system_param` reports nothing to apply, the current
    /// parameters are returned without touching the database or notifying anyone.
    /// Otherwise the row is updated, the in-memory copy is replaced, and the
    /// change is propagated to the common handler, local subscribers and workers.
    ///
    /// NOTE(review): a `None` value presumably resets the parameter to its
    /// default; confirm against `set_system_param`.
    ///
    /// # Errors
    ///
    /// Returns a system-params error when `name` has no row in the database or
    /// when `set_system_param` rejects the change, and propagates database errors.
    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
        let mut params_guard = self.params.write().await;
        let Some(param) = SystemParameter::find_by_id(name.to_string())
            .one(&self.db)
            .await?
        else {
            return Err(MetaError::system_params(format!(
                "unrecognized system parameter {:?}",
                name
            )));
        };
        // Work on a copy so the shared state is only replaced after the database update succeeds.
        let mut params = params_guard.clone();
        let mut param: system_parameter::ActiveModel = param.into();
        let Some((new_value, diff)) =
            set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
        else {
            return Ok(params);
        };
        param.value = Set(new_value);
        param.update(&self.db).await?;
        *params_guard = params.clone();
        self.common_handler.handle_change(&diff);
        self.notification_manager
            .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()))
            .await;
        self.notify_workers(&params);
        Ok(params)
    }

    /// Spawns a background task that re-sends the current parameters to workers
    /// every five seconds.
    ///
    /// Returns the task's join handle together with a oneshot sender; the task
    /// stops once the sender fires or is dropped.
    pub fn start_params_notifier(
        system_params_controller: Arc<Self>,
    ) -> (JoinHandle<()>, Sender<()>) {
        const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("System params notifier is stopped");
                        return;
                    }
                }
                system_params_controller
                    .notify_workers(&*system_params_controller.params.read().await);
            }
        });
        (join_handle, shutdown_tx)
    }

    /// Sends `params` as an update to frontend, compute and compactor nodes.
    fn notify_workers(&self, params: &PbSystemParams) {
        self.notification_manager
            .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
        self.notification_manager
            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
        // Compactors need the update as well; compute nodes were previously notified twice instead.
        self.notification_manager
            .notify_compactor_without_version(Operation::Update, Info::SystemParams(params.clone()));
    }
}
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::system_params_for_test;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Exercises a first launch, a parameter change, and a relaunch that must
    /// drop unknown rows while keeping the changed value.
    #[tokio::test]
    async fn test_system_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store();
        let init_params = system_params_for_test();

        // First launch: the controller starts from the test defaults.
        let first_ctl = SystemParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        let initial = first_ctl.get_pb_params().await;
        assert_eq!(initial, system_params_for_test());

        // Change one parameter through the controller.
        let new_params = first_ctl
            .set_param("pause_on_next_bootstrap", Some("true".into()))
            .await
            .unwrap();

        // Insert a row that does not correspond to any known parameter.
        let stale_row = system_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            is_mutable: Set(true),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Relaunch on the same store.
        let second_ctl = SystemParamsController::new(
            meta_store,
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();

        // The unknown row must have been cleaned up.
        let stale_lookup = SystemParameter::find_by_id("deprecated_param".to_string())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The changed value must have survived the relaunch.
        let reloaded = second_ctl.get_pb_params().await;
        assert_eq!(reloaded, new_params);

        // The database content must agree with the in-memory parameters.
        let rows = SystemParameter::find()
            .all(&second_ctl.db)
            .await
            .unwrap();
        let db_params = system_params_from_db(rows).unwrap();
        assert_eq!(db_params, new_params);
    }
}