risingwave_meta/controller/
system_param.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use risingwave_common::system_param::common::CommonHandler;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::system_param::{
22 check_missing_params, derive_missing_fields, set_system_param,
23};
24use risingwave_common::{for_all_params, key_of};
25use risingwave_meta_model::prelude::SystemParameter;
26use risingwave_meta_model::system_parameter;
27use risingwave_pb::meta::PbSystemParams;
28use risingwave_pb::meta::subscribe_response::{Info, Operation};
29use sea_orm::ActiveValue::Set;
30use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
31use tokio::sync::RwLock;
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35use crate::controller::SqlMetaStore;
36use crate::manager::{LocalNotification, NotificationManagerRef};
37use crate::{MetaError, MetaResult};
38
39pub type SystemParamsControllerRef = Arc<SystemParamsController>;
40
41pub struct SystemParamsController {
42 db: DatabaseConnection,
43 notification_manager: NotificationManagerRef,
45 params: RwLock<PbSystemParams>,
47 common_handler: CommonHandler,
49}
50
/// Generates `system_params_from_db` with one match arm per declared system parameter.
macro_rules! impl_system_params_from_db {
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        /// Builds a [`PbSystemParams`] from persisted `system_parameter` rows.
        ///
        /// Rows whose name matches a declared parameter are parsed into that parameter's
        /// type and stored in the result. Rows with any other name are not an error:
        /// they are only reported through a warning log. Fields that are still missing
        /// afterwards are handed to `derive_missing_fields`.
        ///
        /// # Panics
        ///
        /// Panics if the stored value of a recognized parameter cannot be parsed as the
        /// parameter's declared type.
        pub fn system_params_from_db(mut models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
            let mut pb_params = PbSystemParams::default();
            // `retain` doubles as a filter: recognized rows are consumed (dropped from the
            // list), so whatever is left afterwards is exactly the unrecognized set.
            models.retain(|row| match row.name.as_str() {
                $(
                    key_of!($field) => {
                        let parsed = row.value.parse::<$type>().unwrap();
                        pb_params.$field = Some(parsed.into());
                        false
                    }
                )*
                _ => true,
            });
            derive_missing_fields(&mut pb_params);
            let unrecognized_params: Vec<_> = models.into_iter().map(|row| row.name).collect();
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(pb_params)
        }
    };
}
78
/// Generates `system_params_to_model`, emitting one database row per declared parameter.
macro_rules! impl_system_params_to_models {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Converts `params` into the `system_parameter` rows to be persisted.
        ///
        /// Each row carries the parameter's key, its value rendered with `to_string`,
        /// the mutability flag from the parameter declaration, and no description.
        ///
        /// # Errors
        ///
        /// Returns an error when `check_missing_params` rejects `params`.
        pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
            check_missing_params(params).map_err(|e| anyhow!(e))?;
            // One literal entry per parameter, in declaration order.
            let rows = vec![
                $(
                    system_parameter::ActiveModel {
                        name: Set(key_of!($field).to_string()),
                        value: Set(params.$field.as_ref().unwrap().to_string()),
                        is_mutable: Set($is_mutable),
                        description: Set(None),
                    },
                )*
            ];
            Ok(rows)
        }
    };
}
99
/// Generates `merge_params`, which reconciles persisted and initializing parameters
/// field by field.
macro_rules! impl_merge_params {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        /// Merges `init` into `persisted` and returns the result.
        ///
        /// For every parameter:
        /// - set in both: the persisted value is kept; a warning is logged if they differ;
        /// - set only in `init`: the initializing value is adopted;
        /// - otherwise: the persisted state (set or unset) is left untouched.
        fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
            $(
                if let Some(init_value) = init.$field {
                    match persisted.$field.as_ref() {
                        Some(persisted_value) => {
                            if persisted_value != &init_value {
                                tracing::warn!(
                                    "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
                                    key_of!($field),
                                    init_value,
                                    persisted_value
                                );
                            }
                        }
                        None => persisted.$field = Some(init_value),
                    }
                }
            )*
            persisted
        }
    };
}
130
131for_all_params!(impl_system_params_from_db);
132for_all_params!(impl_merge_params);
133for_all_params!(impl_system_params_to_models);
134
135impl SystemParamsController {
136 pub async fn new(
137 sql_meta_store: SqlMetaStore,
138 notification_manager: NotificationManagerRef,
139 init_params: PbSystemParams,
140 ) -> MetaResult<Self> {
141 let db = sql_meta_store.conn;
142 let params = SystemParameter::find().all(&db).await?;
143 let params = merge_params(system_params_from_db(params)?, init_params);
144 tracing::info!(initial_params = ?SystemParamsReader::new(¶ms), "initialize system parameters");
145 check_missing_params(¶ms).map_err(|e| anyhow!(e))?;
146 let ctl = Self {
147 db,
148 notification_manager,
149 params: RwLock::new(params.clone()),
150 common_handler: CommonHandler::new(params.into()),
151 };
152 ctl.flush_params().await?;
154
155 Ok(ctl)
156 }
157
158 pub async fn get_pb_params(&self) -> PbSystemParams {
159 self.params.read().await.clone()
160 }
161
162 pub async fn get_params(&self) -> SystemParamsReader {
163 self.params.read().await.clone().into()
164 }
165
166 async fn flush_params(&self) -> MetaResult<()> {
167 let params = self.params.read().await;
168 let models = system_params_to_model(¶ms)?;
169 let txn = self.db.begin().await?;
170 SystemParameter::delete_many().exec(&txn).await?;
173 SystemParameter::insert_many(models).exec(&txn).await?;
174 txn.commit().await?;
175 Ok(())
176 }
177
178 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
179 let mut params_guard = self.params.write().await;
180
181 let Some(param) = SystemParameter::find_by_id(name.to_owned())
182 .one(&self.db)
183 .await?
184 else {
185 return Err(MetaError::system_params(format!(
186 "unrecognized system parameter {:?}",
187 name
188 )));
189 };
190 let mut params = params_guard.clone();
191 let mut param: system_parameter::ActiveModel = param.into();
192 let Some((new_value, diff)) =
193 set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
194 else {
195 return Ok(params);
197 };
198
199 param.value = Set(new_value);
200 param.update(&self.db).await?;
201 *params_guard = params.clone();
202
203 self.common_handler.handle_change(&diff);
205
206 self.notification_manager
210 .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()))
211 .await;
212
213 self.notify_workers(¶ms);
215
216 Ok(params)
217 }
218
219 pub fn start_params_notifier(
221 system_params_controller: Arc<Self>,
222 ) -> (JoinHandle<()>, Sender<()>) {
223 const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
224
225 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
226 let join_handle = tokio::spawn(async move {
227 let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
228 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
229 loop {
230 tokio::select! {
231 _ = interval.tick() => {},
232 _ = &mut shutdown_rx => {
233 tracing::info!("System params notifier is stopped");
234 return;
235 }
236 }
237 system_params_controller
238 .notify_workers(&*system_params_controller.params.read().await);
239 }
240 });
241
242 (join_handle, shutdown_tx)
243 }
244
245 fn notify_workers(&self, params: &PbSystemParams) {
248 self.notification_manager
249 .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
250 self.notification_manager
251 .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
252 self.notification_manager
253 .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
254 }
255}
256
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::system_params_for_test;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Exercises bootstrap, `set_param`, and re-initialization on top of existing rows,
    /// including the clean-up of a row that matches no declared parameter.
    #[tokio::test]
    async fn test_system_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store();
        let init_params = system_params_for_test();

        // Bootstrap a controller on an empty store: it must expose the init params.
        let first_ctl = SystemParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        let bootstrapped = first_ctl.get_pb_params().await;
        assert_eq!(bootstrapped, system_params_for_test());

        // Change one parameter.
        let new_params = first_ctl
            .set_param("pause_on_next_bootstrap", Some("true".into()))
            .await
            .unwrap();

        // Plant a row that does not correspond to any declared parameter.
        let deprecated_param = system_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            is_mutable: Set(true),
            description: Set(None),
        };
        deprecated_param.insert(&first_ctl.db).await.unwrap();

        // Re-create the controller on top of the existing data.
        let second_ctl = SystemParamsController::new(
            meta_store,
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();

        // The undeclared row must have been removed during initialization.
        let leftover = SystemParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(leftover.is_none());

        // The previously set value must have survived the restart.
        let reloaded = second_ctl.get_pb_params().await;
        assert_eq!(reloaded, new_params);

        // The database content must agree with the in-memory parameters.
        let rows = SystemParameter::find()
            .all(&second_ctl.db)
            .await
            .unwrap();
        let db_params = system_params_from_db(rows).unwrap();
        assert_eq!(db_params, new_params);
    }
}