risingwave_meta/controller/
system_param.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use risingwave_common::system_param::common::CommonHandler;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::system_param::{
22 check_missing_params, derive_missing_fields, set_system_param, validate_init_system_params,
23};
24use risingwave_common::{for_all_params, key_of};
25use risingwave_meta_model::prelude::SystemParameter;
26use risingwave_meta_model::system_parameter;
27use risingwave_pb::meta::PbSystemParams;
28use risingwave_pb::meta::subscribe_response::{Info, Operation};
29use sea_orm::ActiveValue::Set;
30use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
31use tokio::sync::RwLock;
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35use crate::controller::SqlMetaStore;
36use crate::manager::{LocalNotification, NotificationManagerRef};
37use crate::{MetaError, MetaResult};
38
39pub type SystemParamsControllerRef = Arc<SystemParamsController>;
40
41pub struct SystemParamsController {
42 db: DatabaseConnection,
43 notification_manager: NotificationManagerRef,
45 params: RwLock<PbSystemParams>,
47 common_handler: CommonHandler,
49}
50
/// Generates `system_params_from_db`, which converts rows of the system
/// parameter table into a [`PbSystemParams`]. Meant to be expanded through
/// `for_all_params!`, which supplies one `{ field, type, ... }` entry per
/// known parameter.
macro_rules! impl_system_params_from_db {
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        /// Builds [`PbSystemParams`] from the persisted `models`.
        ///
        /// Every row whose name matches a known parameter is parsed into that
        /// parameter's type and stored in the result. Rows with unknown names
        /// are skipped and reported in a single warning. Fields that are still
        /// missing afterwards are handed to `derive_missing_fields`.
        ///
        /// # Errors
        ///
        /// Returns an error when the value of a known parameter cannot be
        /// parsed as the parameter's type. Persisted data is external input,
        /// so a malformed row is reported to the caller instead of panicking.
        pub fn system_params_from_db(models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
            let mut params = PbSystemParams::default();
            let mut unrecognized_params = Vec::new();
            for model in models {
                match model.name.as_str() {
                    $(
                        key_of!($field) => {
                            let value = model.value.parse::<$type>().map_err(|e| {
                                anyhow!(
                                    "failed to parse persisted system param {} from {:?}: {:?}",
                                    key_of!($field),
                                    model.value,
                                    e
                                )
                            })?;
                            params.$field = Some(value.into());
                        }
                    )*
                    _ => unrecognized_params.push(model.name),
                }
            }
            derive_missing_fields(&mut params);
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(params)
        }
    };
}
78
/// Generates `system_params_to_model`, the inverse of `system_params_from_db`.
/// Meant to be expanded through `for_all_params!`.
macro_rules! impl_system_params_to_models {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Turns `params` into one insertable row per known parameter.
        ///
        /// Fails if `check_missing_params` rejects `params`. The `unwrap` on
        /// each field runs only after that check has passed, so presumably
        /// every field is `Some` at that point.
        pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
            check_missing_params(params).map_err(|e| anyhow!(e))?;
            Ok(vec![
                $(
                    system_parameter::ActiveModel {
                        name: Set(key_of!($field).to_string()),
                        value: Set(params.$field.as_ref().unwrap().to_string()),
                        is_mutable: Set($is_mutable),
                        description: Set(None),
                    },
                )*
            ])
        }
    };
}
99
/// Generates `merge_params`, which combines persisted parameters with the
/// initializing ones. Meant to be expanded through `for_all_params!`.
macro_rules! impl_merge_params {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        /// Merges `init` into `persisted`, field by field.
        ///
        /// A persisted value always wins; a warning is logged when the
        /// initializing value differs from it. The initializing value is only
        /// taken when nothing is persisted for that field.
        fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
            $(
                if let Some(init_value) = init.$field {
                    match persisted.$field.as_ref() {
                        Some(persisted_value) => {
                            if persisted_value != &init_value {
                                tracing::warn!(
                                    "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
                                    key_of!($field),
                                    init_value,
                                    persisted_value
                                );
                            }
                        }
                        None => persisted.$field = Some(init_value),
                    }
                }
            )*
            persisted
        }
    };
}
130
131for_all_params!(impl_system_params_from_db);
132for_all_params!(impl_merge_params);
133for_all_params!(impl_system_params_to_models);
134
135impl SystemParamsController {
136 pub async fn new(
137 sql_meta_store: SqlMetaStore,
138 notification_manager: NotificationManagerRef,
139 init_params: PbSystemParams,
140 ) -> MetaResult<Self> {
141 let db = sql_meta_store.conn;
142 let params = SystemParameter::find().all(&db).await?;
143 let params = merge_params(system_params_from_db(params)?, init_params);
144 tracing::info!(initial_params = ?SystemParamsReader::new(¶ms), "initialize system parameters");
145 check_missing_params(¶ms).map_err(|e| anyhow!(e))?;
146 validate_init_system_params(¶ms).map_err(|e| anyhow!(e))?;
147 let ctl = Self {
148 db,
149 notification_manager,
150 params: RwLock::new(params.clone()),
151 common_handler: CommonHandler::new(params.into()),
152 };
153 ctl.flush_params().await?;
155
156 Ok(ctl)
157 }
158
159 pub async fn get_pb_params(&self) -> PbSystemParams {
160 self.params.read().await.clone()
161 }
162
163 pub async fn get_params(&self) -> SystemParamsReader {
164 self.params.read().await.clone().into()
165 }
166
167 async fn flush_params(&self) -> MetaResult<()> {
168 let params = self.params.read().await;
169 let models = system_params_to_model(¶ms)?;
170 let txn = self.db.begin().await?;
171 SystemParameter::delete_many().exec(&txn).await?;
174 SystemParameter::insert_many(models).exec(&txn).await?;
175 txn.commit().await?;
176 Ok(())
177 }
178
179 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
180 let mut params_guard = self.params.write().await;
181
182 let Some(param) = SystemParameter::find_by_id(name.to_owned())
183 .one(&self.db)
184 .await?
185 else {
186 return Err(MetaError::system_params(format!(
187 "unrecognized system parameter {:?}",
188 name
189 )));
190 };
191 let mut params = params_guard.clone();
192 let mut param: system_parameter::ActiveModel = param.into();
193 let Some((new_value, diff)) =
194 set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
195 else {
196 return Ok(params);
198 };
199
200 param.value = Set(new_value);
201 param.update(&self.db).await?;
202 *params_guard = params.clone();
203
204 self.common_handler.handle_change(&diff);
206
207 self.notification_manager
211 .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()))
212 .await;
213
214 self.notify_workers(¶ms);
216
217 Ok(params)
218 }
219
220 pub fn start_params_notifier(
222 system_params_controller: Arc<Self>,
223 ) -> (JoinHandle<()>, Sender<()>) {
224 const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
225
226 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
227 let join_handle = tokio::spawn(async move {
228 let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
229 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
230 loop {
231 tokio::select! {
232 _ = interval.tick() => {},
233 _ = &mut shutdown_rx => {
234 tracing::info!("System params notifier is stopped");
235 return;
236 }
237 }
238 system_params_controller
239 .notify_workers(&*system_params_controller.params.read().await);
240 }
241 });
242
243 (join_handle, shutdown_tx)
244 }
245
246 fn notify_workers(&self, params: &PbSystemParams) {
249 self.notification_manager
250 .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
251 self.notification_manager
252 .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
253 self.notification_manager
254 .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
255 }
256}
257
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::system_params_for_test;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Covers initialization, updating a parameter, and re-initialization on
    /// top of a store that contains an updated value and an unknown row.
    #[tokio::test]
    async fn test_system_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store();
        let init_params = system_params_for_test();

        // Initialize the controller for the first time.
        let first_ctl = SystemParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        let loaded = first_ctl.get_pb_params().await;
        assert_eq!(loaded, system_params_for_test());

        // Change one parameter.
        let updated_params = first_ctl
            .set_param("pause_on_next_bootstrap", Some("true".into()))
            .await
            .unwrap();

        // Insert a row that does not correspond to any known parameter.
        let stale_row = system_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            is_mutable: Set(true),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Initialize a second controller on the same store.
        let second_ctl = SystemParamsController::new(
            meta_store,
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();

        // The unknown row has been removed.
        let stale_lookup = SystemParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The persisted update wins over the initializing parameters.
        let reloaded = second_ctl.get_pb_params().await;
        assert_eq!(reloaded, updated_params);

        // The database content matches the updated parameters as well.
        let rows = SystemParameter::find()
            .all(&second_ctl.db)
            .await
            .unwrap();
        let persisted = system_params_from_db(rows).unwrap();
        assert_eq!(persisted, updated_params);
    }
}