risingwave_meta/controller/
session_params.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::session_config::{SessionConfig, SessionConfigError};
19use risingwave_meta_model::prelude::SessionParameter;
20use risingwave_meta_model::session_parameter;
21use risingwave_pb::meta::SetSessionParamRequest;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use sea_orm::ActiveValue::Set;
24use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
25use thiserror_ext::AsReport;
26use tokio::sync::RwLock;
27use tracing::info;
28
29use crate::controller::SqlMetaStore;
30use crate::manager::{LocalNotification, NotificationManagerRef};
31use crate::{MetaError, MetaResult};
32
/// Shared, reference-counted handle to a [`SessionParamsController`].
pub type SessionParamsControllerRef = Arc<SessionParamsController>;
34
/// Manages the global default session params on meta.
/// Note that the session params in each session will be initialized from the default value here.
pub struct SessionParamsController {
    // Connection to the meta store database in which the parameters are persisted.
    db: DatabaseConnection,
    // Cached parameters.
    params: RwLock<SessionConfig>,
    // Used to push parameter changes to frontends and to local subscribers.
    notification_manager: NotificationManagerRef,
}
43
44impl SessionParamsController {
45    pub async fn new(
46        sql_meta_store: SqlMetaStore,
47        notification_manager: NotificationManagerRef,
48        mut init_params: SessionConfig,
49    ) -> MetaResult<Self> {
50        let db = sql_meta_store.conn;
51        let params = SessionParameter::find().all(&db).await?;
52        for param in params {
53            if let Err(e) = init_params.set(&param.name, param.value, &mut ()) {
54                match e {
55                    SessionConfigError::InvalidValue { .. } => {
56                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(&param.name)?)
57                    }
58                    SessionConfigError::UnrecognizedEntry(_) => {
59                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
60                    }
61                }
62            }
63        }
64
65        info!(?init_params, "session parameters");
66
67        let ctl = Self {
68            db,
69            params: RwLock::new(init_params.clone()),
70            notification_manager,
71        };
72        // flush to db.
73        ctl.flush_params().await?;
74
75        Ok(ctl)
76    }
77
78    pub async fn get_params(&self) -> SessionConfig {
79        self.params.read().await.clone()
80    }
81
82    async fn flush_params(&self) -> MetaResult<()> {
83        let params = self.params.read().await.list_all();
84        let models = params
85            .into_iter()
86            .map(|param| session_parameter::ActiveModel {
87                name: Set(param.name),
88                value: Set(param.setting),
89                description: Set(Some(param.description)),
90            })
91            .collect_vec();
92        let txn = self.db.begin().await?;
93        // delete all params first and then insert all params. It follows the same logic
94        // as the old code, we'd better change it to another way later to keep consistency.
95        SessionParameter::delete_many().exec(&txn).await?;
96        SessionParameter::insert_many(models).exec(&txn).await?;
97        txn.commit().await?;
98        Ok(())
99    }
100
101    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
102        let mut params_guard = self.params.write().await;
103        let name = SessionConfig::alias_to_entry_name(name);
104        let Some(param) = SessionParameter::find_by_id(name.clone())
105            .one(&self.db)
106            .await?
107        else {
108            return Err(MetaError::system_params(format!(
109                "unrecognized session parameter {:?}",
110                name
111            )));
112        };
113        let old_batch_parallelism = params_guard.batch_parallelism();
114        // FIXME: use a real reporter
115        let reporter = &mut ();
116        let new_param = if let Some(value) = value {
117            params_guard.set(&name, value, reporter)?
118        } else {
119            params_guard.reset(&name, reporter)?
120        };
121
122        let mut param: session_parameter::ActiveModel = param.into();
123        param.value = Set(new_param.clone());
124        param.update(&self.db).await?;
125        let new_batch_parallelism = params_guard.batch_parallelism();
126        self.notify_workers(name.clone(), new_param.clone());
127        if old_batch_parallelism != new_batch_parallelism {
128            self.notification_manager
129                .notify_local_subscribers(LocalNotification::BatchParallelismChange);
130        }
131
132        Ok(new_param)
133    }
134
135    pub fn notify_workers(&self, name: String, value: String) {
136        self.notification_manager.notify_frontend_without_version(
137            Operation::Update,
138            Info::SessionParam(SetSessionParamRequest {
139                param: name,
140                value: Some(value),
141            }),
142        );
143    }
144}
145
#[cfg(test)]
mod tests {
    use sea_orm::{ColumnTrait, QueryFilter};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Exercises the controller across two launches: a first launch with defaults,
    /// a parameter update plus a stale row, then a relaunch that must keep the
    /// update and drop the stale row.
    #[tokio::test]
    async fn test_session_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref();
        let init_params = SessionConfig::default();

        // First launch: the session parameter controller starts from the defaults.
        let first_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        let first_snapshot = first_ctl.get_params().await;
        assert_eq!(first_snapshot, init_params);

        // Update a parameter through its alias.
        let updated_value = first_ctl
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Plant a row for a parameter that no longer exists.
        let stale_row = session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Second launch: the session parameter controller loads from the database.
        let second_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();

        // The deprecated row must have been cleaned up.
        let stale_lookup = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The updated value survives the relaunch and is visible under both names.
        let reloaded = second_ctl.get_params().await;
        assert_eq!(reloaded.get("rw_implicit_flush").unwrap(), updated_value);
        assert_eq!(
            reloaded.get("rw_implicit_flush").unwrap(),
            reloaded.get("implicit_flush").unwrap()
        );

        // Database consistency: only the entry name is stored, never the alias.
        // rw_implicit_flush is alias to implicit_flush <https://github.com/risingwavelabs/risingwave/pull/18769>
        let alias_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(alias_row.is_none());

        let entry_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(entry_row.value, reloaded.get("rw_implicit_flush").unwrap());
    }
}