Skip to main content

risingwave_meta/controller/
system_param.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use risingwave_common::system_param::common::CommonHandler;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_common::system_param::{
22    check_missing_params, derive_missing_fields, set_system_param, validate_init_system_params,
23};
24use risingwave_common::{for_all_params, key_of};
25use risingwave_meta_model::prelude::SystemParameter;
26use risingwave_meta_model::system_parameter;
27use risingwave_pb::meta::PbSystemParams;
28use risingwave_pb::meta::subscribe_response::{Info, Operation};
29use sea_orm::ActiveValue::Set;
30use sea_orm::{DatabaseConnection, EntityTrait, TransactionTrait};
31use tokio::sync::RwLock;
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35use crate::controller::SqlMetaStore;
36use crate::manager::{LocalNotification, NotificationManagerRef};
37use crate::{MetaError, MetaResult};
38
/// Shared, reference-counted handle to a [`SystemParamsController`].
pub type SystemParamsControllerRef = Arc<SystemParamsController>;
40
/// Keeps the system parameters in three places consistent: the rows persisted through `db`,
/// the in-memory cache in `params`, and the subscribers reached through
/// `notification_manager`.
pub struct SystemParamsController {
    // Connection used to load, rewrite and update the persisted `SystemParameter` rows.
    db: DatabaseConnection,
    // Notify workers and local subscribers of parameter change.
    notification_manager: NotificationManagerRef,
    // Cached parameters.
    params: RwLock<PbSystemParams>,
    /// Common handler for system params.
    common_handler: CommonHandler,
}
50
/// Generates `system_params_from_db`, which rebuilds [`PbSystemParams`] from persisted rows.
macro_rules! impl_system_params_from_db {
    ($({ $field:ident, $type:ty, $($rest:tt)* },)*) => {
        /// Rebuild the system params from their persisted models.
        ///
        /// Each row whose name matches a known field (deprecated fields included) is parsed
        /// into that field. Rows with any other name are only reported through a warning.
        ///
        /// # Panics
        ///
        /// Panics if the stored value of a known field cannot be parsed as that field's type.
        #[expect(deprecated)]
        pub fn system_params_from_db(models: Vec<system_parameter::Model>) -> MetaResult<PbSystemParams> {
            let mut params = PbSystemParams::default();
            // Names of rows that do not correspond to any known field, in row order.
            let mut unrecognized_params = Vec::new();
            for model in models {
                match model.name.as_str() {
                    $(
                        key_of!($field) => {
                            let parsed = model.value.parse::<$type>().unwrap();
                            params.$field = Some(parsed.into());
                        }
                    )*
                    _ => unrecognized_params.push(model.name),
                }
            }
            derive_missing_fields(&mut params);
            if !unrecognized_params.is_empty() {
                tracing::warn!("unrecognized system params {:?}", unrecognized_params);
            }
            Ok(params)
        }
    };
}
79
/// Generates `system_params_to_model`, which serializes [`PbSystemParams`] into db models.
macro_rules! impl_system_params_to_models {
    ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
        /// Turn every field of `params` into one `system_parameter::ActiveModel` row.
        ///
        /// The row name is the field's key, the value is the field rendered with `to_string`,
        /// and no description is stored.
        ///
        /// # Errors
        ///
        /// Returns an error when `check_missing_params` rejects `params`.
        #[expect(deprecated)]
        pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
            check_missing_params(params).map_err(|e| anyhow!(e))?;
            Ok(vec![
                $(
                    system_parameter::ActiveModel {
                        name: Set(key_of!($field).to_string()),
                        value: Set(params.$field.as_ref().unwrap().to_string()),
                        is_mutable: Set($is_mutable),
                        description: Set(None),
                    },
                )*
            ])
        }
    };
}
101
// Generates `merge_params`, which reconciles the persisted params with the init params.
//
// For each field, given the pair (`persisted`, `init`):
// 1. (Some, None): the persisted field is deprecated, so it is left untouched.
// 2. (Some, Some): the persisted value wins; a warning is logged if the two differ.
// 3. (None, Some): a new version of the RW cluster is launched for the first time and the newly
//    introduced param has not been persisted yet, so the init value is used.
// 4. (None, None): same situation as 3, but the new field has no init value either, so it
//    stays `None`.
macro_rules! impl_merge_params {
    ($({ $field:ident, $($rest:tt)* },)*) => {
        #[expect(deprecated)]
        fn merge_params(mut persisted: PbSystemParams, init: PbSystemParams) -> PbSystemParams {
            $(
                // Without an init value there is nothing to compare or fill in (cases 1 and 4).
                if let Some(init_value) = init.$field {
                    match persisted.$field.as_ref() {
                        Some(persisted_value) => {
                            if persisted_value != &init_value {
                                tracing::warn!(
                                    "The initializing value of {} ({}) differ from persisted ({}), using persisted value",
                                    key_of!($field),
                                    init_value,
                                    persisted_value
                                );
                            }
                        }
                        None => persisted.$field = Some(init_value),
                    }
                }
            )*
            persisted
        }
    };
}
133
// Expand each helper macro defined above through `for_all_params!`, which is expected to feed
// it the full list of system parameter definitions (TODO confirm against `risingwave_common`).
// These expansions define `system_params_from_db`, `merge_params` and `system_params_to_model`.
for_all_params!(impl_system_params_from_db);
for_all_params!(impl_merge_params);
for_all_params!(impl_system_params_to_models);
137
138impl SystemParamsController {
139    pub async fn new(
140        sql_meta_store: SqlMetaStore,
141        notification_manager: NotificationManagerRef,
142        init_params: PbSystemParams,
143    ) -> MetaResult<Self> {
144        let db = sql_meta_store.conn;
145        let params = SystemParameter::find().all(&db).await?;
146        let params = merge_params(system_params_from_db(params)?, init_params);
147        tracing::info!(initial_params = ?SystemParamsReader::new(&params), "initialize system parameters");
148        check_missing_params(&params).map_err(|e| anyhow!(e))?;
149        validate_init_system_params(&params).map_err(|e| anyhow!(e))?;
150        let ctl = Self {
151            db,
152            notification_manager,
153            params: RwLock::new(params.clone()),
154            common_handler: CommonHandler::new(params.into()),
155        };
156        // flush to db.
157        ctl.flush_params().await?;
158
159        Ok(ctl)
160    }
161
162    pub async fn get_pb_params(&self) -> PbSystemParams {
163        self.params.read().await.clone()
164    }
165
166    pub async fn get_params(&self) -> SystemParamsReader {
167        self.params.read().await.clone().into()
168    }
169
170    async fn flush_params(&self) -> MetaResult<()> {
171        let params = self.params.read().await;
172        let models = system_params_to_model(&params)?;
173        let txn = self.db.begin().await?;
174        // delete all params first and then insert all params. It follows the same logic
175        // as the old code, we'd better change it to another way later to keep consistency.
176        SystemParameter::delete_many().exec(&txn).await?;
177        SystemParameter::insert_many(models).exec(&txn).await?;
178        txn.commit().await?;
179        Ok(())
180    }
181
182    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<PbSystemParams> {
183        let mut params_guard = self.params.write().await;
184
185        let Some(param) = SystemParameter::find_by_id(name.to_owned())
186            .one(&self.db)
187            .await?
188        else {
189            return Err(MetaError::system_params(format!(
190                "unrecognized system parameter {:?}",
191                name
192            )));
193        };
194        let mut params = params_guard.clone();
195        let mut param: system_parameter::ActiveModel = param.into();
196        let Some((new_value, diff)) =
197            set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
198        else {
199            // No changes on the parameter.
200            return Ok(params);
201        };
202
203        param.value = Set(new_value);
204        SystemParameter::update(param).exec(&self.db).await?;
205        *params_guard = params.clone();
206
207        // Run common handler.
208        self.common_handler.handle_change(&diff);
209
210        // TODO: notify the diff instead of the snapshot.
211
212        // Sync params to other managers on the meta node only once, since it's infallible.
213        self.notification_manager
214            .notify_local_subscribers(LocalNotification::SystemParamsChange(params.clone().into()));
215
216        // Sync params to worker nodes.
217        self.notify_workers(&params);
218
219        Ok(params)
220    }
221
222    // Periodically sync params to worker nodes.
223    pub fn start_params_notifier(
224        system_params_controller: Arc<Self>,
225    ) -> (JoinHandle<()>, Sender<()>) {
226        const NOTIFY_INTERVAL: Duration = Duration::from_millis(5000);
227
228        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
229        let join_handle = tokio::spawn(async move {
230            let mut interval = tokio::time::interval(NOTIFY_INTERVAL);
231            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
232            loop {
233                tokio::select! {
234                    _ = interval.tick() => {},
235                    _ = &mut shutdown_rx => {
236                        tracing::info!("System params notifier is stopped");
237                        return;
238                    }
239                }
240                system_params_controller
241                    .notify_workers(&*system_params_controller.params.read().await);
242            }
243        });
244
245        (join_handle, shutdown_tx)
246    }
247
248    // Notify workers of parameter change.
249    // TODO: add system params into snapshot to avoid periodically sync.
250    fn notify_workers(&self, params: &PbSystemParams) {
251        self.notification_manager
252            .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
253        self.notification_manager
254            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
255        self.notification_manager
256            .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
257    }
258}
259
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::system_params_for_test;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Covers a first launch, a parameter update, and a relaunch that must keep the update
    /// while dropping rows that no longer map to a known parameter.
    #[tokio::test]
    async fn test_system_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store();
        let init_params = system_params_for_test();

        // First launch: the cached parameters are exactly the initial ones.
        let first_ctl = SystemParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        assert_eq!(first_ctl.get_pb_params().await, system_params_for_test());

        // Change one parameter and remember the resulting snapshot.
        let updated_params = first_ctl
            .set_param("pause_on_next_bootstrap", Some("true".to_owned()))
            .await
            .unwrap();

        // Plant a row that does not correspond to any known parameter.
        SystemParameter::insert(system_parameter::ActiveModel {
            name: Set("deprecated_param".to_owned()),
            value: Set("foo".into()),
            is_mutable: Set(true),
            description: Set(None),
        })
        .exec(&first_ctl.db)
        .await
        .unwrap();

        // Relaunch on top of the same store.
        let relaunched_ctl =
            SystemParamsController::new(meta_store, env.notification_manager_ref(), init_params)
                .await
                .unwrap();

        // The planted row must have been cleaned up.
        let leftover = SystemParameter::find_by_id("deprecated_param".to_owned())
            .one(&relaunched_ctl.db)
            .await
            .unwrap();
        assert!(leftover.is_none());

        // The earlier update must survive the relaunch in the cache...
        assert_eq!(relaunched_ctl.get_pb_params().await, updated_params);

        // ...and in the database.
        let persisted_models = SystemParameter::find()
            .all(&relaunched_ctl.db)
            .await
            .unwrap();
        assert_eq!(
            system_params_from_db(persisted_models).unwrap(),
            updated_params
        );
    }
}
329}