risingwave_meta/controller/
system_param.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use risingwave_common::system_param::common::CommonHandler;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::system_param::{
22 check_missing_params, derive_missing_fields, set_system_param, validate_init_system_params,
23};
24use risingwave_common::{for_all_params, key_of};
25use risingwave_meta_model::prelude::SystemParameter;
26use risingwave_meta_model::system_parameter;
27use risingwave_pb::meta::PbSystemParams;
28use risingwave_pb::meta::subscribe_response::{Info, Operation};
29use sea_orm::ActiveValue::Set;
30use sea_orm::{DatabaseConnection, EntityTrait, TransactionTrait};
31use tokio::sync::RwLock;
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35use crate::controller::SqlMetaStore;
36use crate::manager::{LocalNotification, NotificationManagerRef};
37use crate::{MetaError, MetaResult};
38
39pub type SystemParamsControllerRef = Arc<SystemParamsController>;
40
41pub struct SystemParamsController {
42 db: DatabaseConnection,
43 notification_manager: NotificationManagerRef,
45 params: RwLock<PbSystemParams>,
47 common_handler: CommonHandler,
49}
50
51macro_rules! impl_system_params_from_db {
53 ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
54 #[expect(deprecated)]
57 pub fn system_params_from_db(mut models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
58 let mut params = PbSystemParams::default();
59 models.retain(|model| {
60 match model.name.as_str() {
61 $(
62 key_of!($field) => {
63 params.$field = Some(model.value.parse::<$type>().unwrap().into());
64 false
65 }
66 )*
67 _ => true,
68 }
69 });
70 derive_missing_fields(&mut params);
71 if !models.is_empty() {
72 let unrecognized_params = models.into_iter().map(|model| model.name).collect::<Vec<_>>();
73 tracing::warn!("unrecognized system params {:?}", unrecognized_params);
74 }
75 Ok(params)
76 }
77 };
78}
79
80macro_rules! impl_system_params_to_models {
82 ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
83 #[expect(deprecated)]
84 #[allow(clippy::vec_init_then_push)]
85 pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
86 check_missing_params(params).map_err(|e| anyhow!(e))?;
87 let mut models = Vec::new();
88 $(
89 let value = params.$field.as_ref().unwrap().to_string();
90 models.push(system_parameter::ActiveModel {
91 name: Set(key_of!($field).to_string()),
92 value: Set(value),
93 is_mutable: Set($is_mutable),
94 description: Set(None),
95 });
96 )*
97 Ok(models)
98 }
99 };
100}
101
102macro_rules! impl_merge_params {
110 ($({ $field:ident, $($rest:tt)* },)*) => {
111 #[expect(deprecated)]
112 fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
113 $(
114 match (persisted.$field.as_ref(), init.$field) {
115 (Some(persisted), Some(init)) => {
116 if persisted != &init {
117 tracing::warn!(
118 "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
119 key_of!($field),
120 init,
121 persisted
122 );
123 }
124 },
125 (None, Some(init)) => persisted.$field = Some(init),
126 _ => {},
127 }
128 )*
129 persisted
130 }
131 };
132}
133
134for_all_params!(impl_system_params_from_db);
135for_all_params!(impl_merge_params);
136for_all_params!(impl_system_params_to_models);
137
138impl SystemParamsController {
139 pub async fn new(
140 sql_meta_store: SqlMetaStore,
141 notification_manager: NotificationManagerRef,
142 init_params: PbSystemParams,
143 ) -> MetaResult<Self> {
144 let db = sql_meta_store.conn;
145 let params = SystemParameter::find().all(&db).await?;
146 let params = merge_params(system_params_from_db(params)?, init_params);
147 tracing::info!(initial_params = ?SystemParamsReader::new(¶ms), "initialize system parameters");
148 check_missing_params(¶ms).map_err(|e| anyhow!(e))?;
149 validate_init_system_params(¶ms).map_err(|e| anyhow!(e))?;
150 let ctl = Self {
151 db,
152 notification_manager,
153 params: RwLock::new(params.clone()),
154 common_handler: CommonHandler::new(params.into()),
155 };
156 ctl.flush_params().await?;
158
159 Ok(ctl)
160 }
161
162 pub async fn get_pb_params(&self) -> PbSystemParams {
163 self.params.read().await.clone()
164 }
165
166 pub async fn get_params(&self) -> SystemParamsReader {
167 self.params.read().await.clone().into()
168 }
169
170 async fn flush_params(&self) -> MetaResult<()> {
171 let params = self.params.read().await;
172 let models = system_params_to_model(¶ms)?;
173 let txn = self.db.begin().await?;
174 SystemParameter::delete_many().exec(&txn).await?;
177 SystemParameter::insert_many(models).exec(&txn).await?;
178 txn.commit().await?;
179 Ok(())
180 }
181
182 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
183 let mut params_guard = self.params.write().await;
184
185 let Some(param) = SystemParameter::find_by_id(name.to_owned())
186 .one(&self.db)
187 .await?
188 else {
189 return Err(MetaError::system_params(format!(
190 "unrecognized system parameter {:?}",
191 name
192 )));
193 };
194 let mut params = params_guard.clone();
195 let mut param: system_parameter::ActiveModel = param.into();
196 let Some((new_value, diff)) =
197 set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
198 else {
199 return Ok(params);
201 };
202
203 param.value = Set(new_value);
204 SystemParameter::update(param).exec(&self.db).await?;
205 *params_guard = params.clone();
206
207 self.common_handler.handle_change(&diff);
209
210 self.notification_manager
214 .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()));
215
216 self.notify_workers(¶ms);
218
219 Ok(params)
220 }
221
222 pub fn start_params_notifier(
224 system_params_controller: Arc<Self>,
225 ) -> (JoinHandle<()>, Sender<()>) {
226 const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
227
228 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
229 let join_handle = tokio::spawn(async move {
230 let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
231 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
232 loop {
233 tokio::select! {
234 _ = interval.tick() => {},
235 _ = &mut shutdown_rx => {
236 tracing::info!("System params notifier is stopped");
237 return;
238 }
239 }
240 system_params_controller
241 .notify_workers(&*system_params_controller.params.read().await);
242 }
243 });
244
245 (join_handle, shutdown_tx)
246 }
247
248 fn notify_workers(&self, params: &PbSystemParams) {
251 self.notification_manager
252 .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
253 self.notification_manager
254 .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
255 self.notification_manager
256 .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use risingwave_common::system_param::system_params_for_test;
263
264 use super::*;
265 use crate::manager::MetaSrvEnv;
266
267 #[tokio::test]
268 async fn test_system_params() {
269 let env = MetaSrvEnv::for_test().await;
270 let meta_store = env.meta_store();
271 let init_params = system_params_for_test();
272
273 let system_param_ctl = SystemParamsController::new(
275 meta_store.clone(),
276 env.notification_manager_ref(),
277 init_params.clone(),
278 )
279 .await
280 .unwrap();
281 let params = system_param_ctl.get_pb_params().await;
282 assert_eq!(params, system_params_for_test());
283
284 let new_params = system_param_ctl
286 .set_param("pause_on_next_bootstrap", Some("true".into()))
287 .await
288 .unwrap();
289
290 let deprecated_param = system_parameter::ActiveModel {
292 name: Set("deprecated_param".into()),
293 value: Set("foo".into()),
294 is_mutable: Set(true),
295 description: Set(None),
296 };
297 SystemParameter::insert(deprecated_param)
298 .exec(&system_param_ctl.db)
299 .await
300 .unwrap();
301
302 let system_param_ctl = SystemParamsController::new(
304 meta_store,
305 env.notification_manager_ref(),
306 init_params.clone(),
307 )
308 .await
309 .unwrap();
310 assert!(
312 SystemParameter::find_by_id("deprecated_param".to_owned())
313 .one(&system_param_ctl.db)
314 .await
315 .unwrap()
316 .is_none()
317 );
318 let params = system_param_ctl.get_pb_params().await;
320 assert_eq!(params, new_params);
321 let models = SystemParameter::find()
323 .all(&system_param_ctl.db)
324 .await
325 .unwrap();
326 let db_params = system_params_from_db(models).unwrap();
327 assert_eq!(db_params, new_params);
328 }
329}