risingwave_meta/controller/
system_param.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use risingwave_common::system_param::common::CommonHandler;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::system_param::{
22    check_missing_params, derive_missing_fields, set_system_param,
23};
24use risingwave_common::{for_all_params, key_of};
25use risingwave_meta_model::prelude::SystemParameter;
26use risingwave_meta_model::system_parameter;
27use risingwave_pb::meta::PbSystemParams;
28use risingwave_pb::meta::subscribe_response::{Info, Operation};
29use sea_orm::ActiveValue::Set;
30use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
31use tokio::sync::RwLock;
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35use crate::controller::SqlMetaStore;
36use crate::manager::{LocalNotification, NotificationManagerRef};
37use crate::{MetaError, MetaResult};
38
/// Shared, reference-counted handle to a [`SystemParamsController`].
pub type SystemParamsControllerRef = Arc<SystemParamsController>;
40
/// Owns the cluster's system parameters on the meta node.
///
/// The parameters are persisted through `db`, cached in memory in `params`, and changes are
/// pushed to local subscribers and worker nodes through `notification_manager`.
pub struct SystemParamsController {
    /// Connection to the SQL meta store that holds the persisted `system_parameter` rows.
    db: DatabaseConnection,
    // Notify workers and local subscribers of parameter change.
    notification_manager: NotificationManagerRef,
    // Cached parameters.
    params: RwLock<PbSystemParams>,
    /// Common handler for system params.
    common_handler: CommonHandler,
}
50
/// Derive system params from db models.
///
/// When passed to `for_all_params!`, this expands to `system_params_from_db`, which contains one
/// `match` arm per known parameter.
macro_rules! impl_system_params_from_db {
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        /// Builds a [`PbSystemParams`] from the rows persisted in the meta store.
        ///
        /// Try to deserialize deprecated fields as well.
        /// Warn if there are unrecognized fields.
        ///
        /// # Errors
        ///
        /// Returns an error if the persisted value of a recognized parameter cannot be parsed
        /// into that parameter's declared type. A malformed row is reported to the caller
        /// instead of panicking.
        pub fn system_params_from_db(models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
            let mut params = PbSystemParams::default();
            // Names of persisted rows that do not correspond to any known parameter.
            let mut unrecognized_params = Vec::new();
            for model in models {
                match model.name.as_str() {
                    $(
                        key_of!($field) => {
                            let value = model.value.parse::<$type>().map_err(|_| {
                                anyhow!(
                                    "failed to parse persisted system parameter {:?} from value {:?}",
                                    model.name,
                                    model.value
                                )
                            })?;
                            params.$field = Some(value.into());
                        }
                    )*
                    _ => unrecognized_params.push(model.name),
                }
            }
            derive_missing_fields(&mut params);
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(params)
        }
    };
}
78
/// Derive serialization to db models.
///
/// When passed to `for_all_params!`, this expands to `system_params_to_model`, which emits one
/// `system_parameter::ActiveModel` per known parameter, in declaration order.
macro_rules! impl_system_params_to_models {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Converts `params` into the rows to be stored in the `system_parameter` table.
        ///
        /// Every row carries the parameter's key, its value rendered with `to_string`, its
        /// mutability flag, and no description.
        ///
        /// # Errors
        ///
        /// Returns an error if `check_missing_params` rejects `params`.
        pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
            check_missing_params(params).map_err(|e| anyhow!(e))?;
            Ok(vec![
                $(
                    system_parameter::ActiveModel {
                        name: Set(key_of!($field).to_string()),
                        // Unwrapping mirrors the check above, which runs before any field is read.
                        value: Set(params.$field.as_ref().unwrap().to_string()),
                        is_mutable: Set($is_mutable),
                        description: Set(None),
                    },
                )*
            ])
        }
    };
}
99
// Generates `merge_params`, which reconciles the persisted parameters with the initial ones.
//
// For each field, given the (`persisted`, `init`) pair:
// 1. (Some, None): the persisted field is deprecated, so it is left untouched.
// 2. (Some, Some): the persisted value wins; a warning is logged if the two differ.
// 3. (None, Some): a new version of the RW cluster is launched for the first time and the newly
//    introduced param has not been persisted yet, so the init value is used.
// 4. (None, None): same situation as 3, but the new field is not initialized either, so it
//    stays `None`.
macro_rules! impl_merge_params {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
            $(
                // Without an init value there is nothing to compare against or to fall back to.
                if let Some(init_value) = init.$field {
                    match persisted.$field.as_ref() {
                        Some(persisted_value) => {
                            if persisted_value != &init_value {
                                tracing::warn!(
                                    "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
                                    key_of!($field),
                                    init_value,
                                    persisted_value
                                );
                            }
                        }
                        None => persisted.$field = Some(init_value),
                    }
                }
            )*
            persisted
        }
    };
}
130
// Expand the three macros defined above over the full parameter list, generating
// `system_params_from_db`, `merge_params` and `system_params_to_model` respectively.
for_all_params!(impl_system_params_from_db);
for_all_params!(impl_merge_params);
for_all_params!(impl_system_params_to_models);
134
135impl SystemParamsController {
136    pub async fn new(
137        sql_meta_store: SqlMetaStore,
138        notification_manager: NotificationManagerRef,
139        init_params: PbSystemParams,
140    ) -> MetaResult<Self> {
141        let db = sql_meta_store.conn;
142        let params = SystemParameter::find().all(&db).await?;
143        let params = merge_params(system_params_from_db(params)?, init_params);
144        tracing::info!(initial_params = ?SystemParamsReader::new(&params), "initialize system parameters");
145        check_missing_params(&params).map_err(|e| anyhow!(e))?;
146        let ctl = Self {
147            db,
148            notification_manager,
149            params: RwLock::new(params.clone()),
150            common_handler: CommonHandler::new(params.into()),
151        };
152        // flush to db.
153        ctl.flush_params().await?;
154
155        Ok(ctl)
156    }
157
158    pub async fn get_pb_params(&self) -> PbSystemParams {
159        self.params.read().await.clone()
160    }
161
162    pub async fn get_params(&self) -> SystemParamsReader {
163        self.params.read().await.clone().into()
164    }
165
166    async fn flush_params(&self) -> MetaResult<()> {
167        let params = self.params.read().await;
168        let models = system_params_to_model(&params)?;
169        let txn = self.db.begin().await?;
170        // delete all params first and then insert all params. It follows the same logic
171        // as the old code, we'd better change it to another way later to keep consistency.
172        SystemParameter::delete_many().exec(&txn).await?;
173        SystemParameter::insert_many(models).exec(&txn).await?;
174        txn.commit().await?;
175        Ok(())
176    }
177
178    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
179        let mut params_guard = self.params.write().await;
180
181        let Some(param) = SystemParameter::find_by_id(name.to_owned())
182            .one(&self.db)
183            .await?
184        else {
185            return Err(MetaError::system_params(format!(
186                "unrecognized system parameter {:?}",
187                name
188            )));
189        };
190        let mut params = params_guard.clone();
191        let mut param: system_parameter::ActiveModel = param.into();
192        let Some((new_value, diff)) =
193            set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
194        else {
195            // No changes on the parameter.
196            return Ok(params);
197        };
198
199        param.value = Set(new_value);
200        param.update(&self.db).await?;
201        *params_guard = params.clone();
202
203        // Run common handler.
204        self.common_handler.handle_change(&diff);
205
206        // TODO: notify the diff instead of the snapshot.
207
208        // Sync params to other managers on the meta node only once, since it's infallible.
209        self.notification_manager
210            .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()))
211            .await;
212
213        // Sync params to worker nodes.
214        self.notify_workers(&params);
215
216        Ok(params)
217    }
218
219    // Periodically sync params to worker nodes.
220    pub fn start_params_notifier(
221        system_params_controller: Arc<Self>,
222    ) -> (JoinHandle<()>, Sender<()>) {
223        const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
224
225        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
226        let join_handle = tokio::spawn(async move {
227            let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
228            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
229            loop {
230                tokio::select! {
231                    _ = interval.tick() => {},
232                    _ = &mut shutdown_rx => {
233                        tracing::info!("System params notifier is stopped");
234                        return;
235                    }
236                }
237                system_params_controller
238                    .notify_workers(&*system_params_controller.params.read().await);
239            }
240        });
241
242        (join_handle, shutdown_tx)
243    }
244
245    // Notify workers of parameter change.
246    // TODO: add system params into snapshot to avoid periodically sync.
247    fn notify_workers(&self, params: &PbSystemParams) {
248        self.notification_manager
249            .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
250        self.notification_manager
251            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
252        self.notification_manager
253            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
254    }
255}
256
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::system_params_for_test;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// End-to-end check of the controller against a test meta store: first launch, changing a
    /// parameter, and a second launch that must keep the change and drop unknown rows.
    #[tokio::test]
    async fn test_system_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store();
        let init_params = system_params_for_test();

        // init system parameter controller as first launch.
        let system_param_ctl = SystemParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        // Nothing was persisted before, so the init params are used as is.
        let params = system_param_ctl.get_pb_params().await;
        assert_eq!(params, system_params_for_test());

        // set parameter.
        let new_params = system_param_ctl
            .set_param("pause_on_next_bootstrap", Some("true".into()))
            .await
            .unwrap();

        // insert deprecated params.
        let deprecated_param = system_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            is_mutable: Set(true),
            description: Set(None),
        };
        deprecated_param.insert(&system_param_ctl.db).await.unwrap();

        // init system parameter controller as not first launch.
        // The same init params are passed again; the persisted values must take precedence.
        let system_param_ctl = SystemParamsController::new(
            meta_store,
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        // check deprecated params are cleaned up.
        assert!(
            SystemParameter::find_by_id("deprecated_param".to_owned())
                .one(&system_param_ctl.db)
                .await
                .unwrap()
                .is_none()
        );
        // check new params are set.
        let params = system_param_ctl.get_pb_params().await;
        assert_eq!(params, new_params);
        // check db consistency.
        let models = SystemParameter::find()
            .all(&system_param_ctl.db)
            .await
            .unwrap();
        let db_params = system_params_from_db(models).unwrap();
        assert_eq!(db_params, new_params);
    }
}