risingwave_meta/controller/
system_param.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use risingwave_common::system_param::common::CommonHandler;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::system_param::{
22    check_missing_params, derive_missing_fields, set_system_param, validate_init_system_params,
23};
24use risingwave_common::{for_all_params, key_of};
25use risingwave_meta_model::prelude::SystemParameter;
26use risingwave_meta_model::system_parameter;
27use risingwave_pb::meta::PbSystemParams;
28use risingwave_pb::meta::subscribe_response::{Info, Operation};
29use sea_orm::ActiveValue::Set;
30use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
31use tokio::sync::RwLock;
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35use crate::controller::SqlMetaStore;
36use crate::manager::{LocalNotification, NotificationManagerRef};
37use crate::{MetaError, MetaResult};
38
/// Shared, reference-counted handle to a [`SystemParamsController`].
pub type SystemParamsControllerRef = Arc<SystemParamsController>;
40
/// Controller for system parameters.
///
/// Keeps an in-memory copy of the parameters next to the database connection they are
/// persisted through, and pushes changes to workers and local subscribers.
pub struct SystemParamsController {
    /// Database connection used to load and persist the parameters.
    db: DatabaseConnection,
    /// Notify workers and local subscribers of parameter change.
    notification_manager: NotificationManagerRef,
    /// Cached parameters.
    params: RwLock<PbSystemParams>,
    /// Common handler for system params.
    common_handler: CommonHandler,
}
50
/// Derive system params from db models.
macro_rules! impl_system_params_from_db {
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        /// Build [`PbSystemParams`] from the rows persisted in the database.
        ///
        /// Try to deserialize deprecated fields as well.
        /// Warn if there are unrecognized fields.
        ///
        /// # Errors
        ///
        /// Returns an error if the persisted value of a recognized parameter cannot be
        /// parsed as that parameter's type, rather than panicking on a corrupted row.
        pub fn system_params_from_db(mut models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
            let mut params = PbSystemParams::default();
            // `retain` takes an infallible closure, so remember the first parse failure
            // and report it once the scan is finished.
            let mut first_error: Option<MetaError> = None;
            models.retain(|model| {
                match model.name.as_str() {
                    $(
                        key_of!($field) => {
                            match model.value.parse::<$type>().ok() {
                                Some(value) => params.$field = Some(value.into()),
                                None => {
                                    if first_error.is_none() {
                                        first_error = Some(MetaError::system_params(format!(
                                            "failed to parse persisted value {:?} of system parameter {:?} as {}",
                                            model.value,
                                            model.name,
                                            stringify!($type)
                                        )));
                                    }
                                }
                            }
                            // Recognized rows are consumed either way; only unknown names remain.
                            false
                        }
                    )*
                    _ => true,
                }
            });
            if let Some(error) = first_error {
                return Err(error);
            }
            derive_missing_fields(&mut params);
            if !models.is_empty() {
                let unrecognized_params = models.into_iter().map(|model| model.name).collect::<Vec<_>>();
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(params)
        }
    };
}
78
/// Derive serialization to db models.
macro_rules! impl_system_params_to_models {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Turn every field of `params` into one active model row.
        ///
        /// Fails if `check_missing_params` reports a missing field; after that check each
        /// field is unwrapped to produce its string value.
        pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
            check_missing_params(params).map_err(|e| anyhow!(e))?;
            Ok(vec![
                $(
                    system_parameter::ActiveModel {
                        name: Set(key_of!($field).to_string()),
                        value: Set(params.$field.as_ref().unwrap().to_string()),
                        is_mutable: Set($is_mutable),
                        description: Set(None),
                    },
                )*
            ])
        }
    };
}
99
// Merge the persisted parameters with the initializing ones, field by field.
// For each field, looking at (`persisted`, `init`):
// 1. (Some, None): The persisted field is deprecated, so nothing is done for it.
// 2. (Some, Some): The persisted value is kept; a warning is logged if the two differ.
// 3. (None, Some): A new version of RW cluster is launched for the first time and newly
//    introduced params are not set. Use the init value.
// 4. (None, None): A new version of RW cluster is launched for the first time and newly
//    introduced params are not set. The new field is not initialized either, so it stays `None`.
macro_rules! impl_merge_params {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
            $(
                // Without an init value (cases 1 and 4) the persisted field is left untouched.
                if let Some(init_value) = init.$field {
                    match persisted.$field.as_ref() {
                        Some(persisted_value) => {
                            if persisted_value != &init_value {
                                tracing::warn!(
                                    "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
                                    key_of!($field),
                                    init_value,
                                    persisted_value
                                );
                            }
                        }
                        None => persisted.$field = Some(init_value),
                    }
                }
            )*
            persisted
        }
    };
}
130
// Expand the three macros above through `for_all_params!`, which generates
// `system_params_from_db`, `merge_params` and `system_params_to_model`.
for_all_params!(impl_system_params_from_db);
for_all_params!(impl_merge_params);
for_all_params!(impl_system_params_to_models);
134
135impl SystemParamsController {
136    pub async fn new(
137        sql_meta_store: SqlMetaStore,
138        notification_manager: NotificationManagerRef,
139        init_params: PbSystemParams,
140    ) -> MetaResult<Self> {
141        let db = sql_meta_store.conn;
142        let params = SystemParameter::find().all(&db).await?;
143        let params = merge_params(system_params_from_db(params)?, init_params);
144        tracing::info!(initial_params = ?SystemParamsReader::new(&params), "initialize system parameters");
145        check_missing_params(&params).map_err(|e| anyhow!(e))?;
146        validate_init_system_params(&params).map_err(|e| anyhow!(e))?;
147        let ctl = Self {
148            db,
149            notification_manager,
150            params: RwLock::new(params.clone()),
151            common_handler: CommonHandler::new(params.into()),
152        };
153        // flush to db.
154        ctl.flush_params().await?;
155
156        Ok(ctl)
157    }
158
159    pub async fn get_pb_params(&self) -> PbSystemParams {
160        self.params.read().await.clone()
161    }
162
163    pub async fn get_params(&self) -> SystemParamsReader {
164        self.params.read().await.clone().into()
165    }
166
167    async fn flush_params(&self) -> MetaResult<()> {
168        let params = self.params.read().await;
169        let models = system_params_to_model(&params)?;
170        let txn = self.db.begin().await?;
171        // delete all params first and then insert all params. It follows the same logic
172        // as the old code, we'd better change it to another way later to keep consistency.
173        SystemParameter::delete_many().exec(&txn).await?;
174        SystemParameter::insert_many(models).exec(&txn).await?;
175        txn.commit().await?;
176        Ok(())
177    }
178
179    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
180        let mut params_guard = self.params.write().await;
181
182        let Some(param) = SystemParameter::find_by_id(name.to_owned())
183            .one(&self.db)
184            .await?
185        else {
186            return Err(MetaError::system_params(format!(
187                "unrecognized system parameter {:?}",
188                name
189            )));
190        };
191        let mut params = params_guard.clone();
192        let mut param: system_parameter::ActiveModel = param.into();
193        let Some((new_value, diff)) =
194            set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
195        else {
196            // No changes on the parameter.
197            return Ok(params);
198        };
199
200        param.value = Set(new_value);
201        param.update(&self.db).await?;
202        *params_guard = params.clone();
203
204        // Run common handler.
205        self.common_handler.handle_change(&diff);
206
207        // TODO: notify the diff instead of the snapshot.
208
209        // Sync params to other managers on the meta node only once, since it's infallible.
210        self.notification_manager
211            .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()))
212            .await;
213
214        // Sync params to worker nodes.
215        self.notify_workers(&params);
216
217        Ok(params)
218    }
219
220    // Periodically sync params to worker nodes.
221    pub fn start_params_notifier(
222        system_params_controller: Arc<Self>,
223    ) -> (JoinHandle<()>, Sender<()>) {
224        const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
225
226        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
227        let join_handle = tokio::spawn(async move {
228            let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
229            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
230            loop {
231                tokio::select! {
232                    _ = interval.tick() => {},
233                    _ = &mut shutdown_rx => {
234                        tracing::info!("System params notifier is stopped");
235                        return;
236                    }
237                }
238                system_params_controller
239                    .notify_workers(&*system_params_controller.params.read().await);
240            }
241        });
242
243        (join_handle, shutdown_tx)
244    }
245
246    // Notify workers of parameter change.
247    // TODO: add system params into snapshot to avoid periodically sync.
248    fn notify_workers(&self, params: &PbSystemParams) {
249        self.notification_manager
250            .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
251        self.notification_manager
252            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
253        self.notification_manager
254            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
255    }
256}
257
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::system_params_for_test;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Exercises a first launch, a parameter update, and a relaunch over the same store
    /// with an extra unknown row inserted in between.
    #[tokio::test]
    async fn test_system_params() {
        let env = MetaSrvEnv::for_test().await;
        let store = env.meta_store();
        let bootstrap_params = system_params_for_test();

        // First launch: the controller should expose exactly the test defaults.
        let first_ctl = SystemParamsController::new(
            store.clone(),
            env.notification_manager_ref(),
            bootstrap_params.clone(),
        )
        .await
        .unwrap();
        assert_eq!(first_ctl.get_pb_params().await, system_params_for_test());

        // Change one parameter and keep the resulting snapshot for later comparison.
        let updated_params = first_ctl
            .set_param("pause_on_next_bootstrap", Some("true".into()))
            .await
            .unwrap();

        // Persist a row whose name no parameter recognizes.
        let stale_row = system_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            is_mutable: Set(true),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Relaunch over the same store with the same init params.
        let relaunched_ctl = SystemParamsController::new(
            store,
            env.notification_manager_ref(),
            bootstrap_params.clone(),
        )
        .await
        .unwrap();

        // The unknown row must have been cleaned up during initialization.
        let stale_lookup = SystemParameter::find_by_id("deprecated_param".to_owned())
            .one(&relaunched_ctl.db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The cached params must match the snapshot returned by `set_param`.
        assert_eq!(relaunched_ctl.get_pb_params().await, updated_params);

        // The database content must decode to the same snapshot.
        let persisted_rows = SystemParameter::find()
            .all(&relaunched_ctl.db)
            .await
            .unwrap();
        assert_eq!(system_params_from_db(persisted_rows).unwrap(), updated_params);
    }
}
324}