risingwave_meta/controller/
session_params.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::session_config::{SessionConfig, SessionConfigError};
19use risingwave_meta_model::prelude::SessionParameter;
20use risingwave_meta_model::session_parameter;
21use risingwave_pb::meta::SetSessionParamRequest;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use sea_orm::ActiveValue::Set;
24use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
25use thiserror_ext::AsReport;
26use tokio::sync::RwLock;
27use tracing::info;
28
29use crate::controller::SqlMetaStore;
30use crate::manager::{LocalNotification, NotificationManagerRef};
31use crate::{MetaError, MetaResult};
32
/// Shared, reference-counted handle to a [`SessionParamsController`].
pub type SessionParamsControllerRef = Arc<SessionParamsController>;
34
/// Manages the global default session params on meta.
/// Note that the session params in each session will be initialized from the default value here.
pub struct SessionParamsController {
    /// Connection to the meta store database in which the parameters are persisted.
    db: DatabaseConnection,
    // Cached parameters.
    // Guarded by an async `RwLock`: readers take a snapshot, `set_param` takes the write lock.
    params: RwLock<SessionConfig>,
    /// Used to notify frontends and local subscribers after a parameter changes.
    notification_manager: NotificationManagerRef,
}
43
44impl SessionParamsController {
45    pub async fn new(
46        sql_meta_store: SqlMetaStore,
47        notification_manager: NotificationManagerRef,
48        mut init_params: SessionConfig,
49    ) -> MetaResult<Self> {
50        let db = sql_meta_store.conn;
51        let params = SessionParameter::find().all(&db).await?;
52        for param in params {
53            if let Err(e) = init_params.set(&param.name, param.value, &mut ()) {
54                match e {
55                    SessionConfigError::InvalidValue { .. } => {
56                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(&param.name)?)
57                    }
58                    SessionConfigError::UnrecognizedEntry(_) => {
59                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
60                    }
61                }
62            }
63        }
64
65        info!(?init_params, "session parameters");
66
67        let ctl = Self {
68            db,
69            params: RwLock::new(init_params.clone()),
70            notification_manager,
71        };
72        // flush to db.
73        ctl.flush_params().await?;
74
75        Ok(ctl)
76    }
77
78    pub async fn get_params(&self) -> SessionConfig {
79        self.params.read().await.clone()
80    }
81
82    async fn flush_params(&self) -> MetaResult<()> {
83        let params = self.params.read().await.list_all();
84        let models = params
85            .into_iter()
86            .map(|param| session_parameter::ActiveModel {
87                name: Set(param.name),
88                value: Set(param.setting),
89                description: Set(Some(param.description)),
90            })
91            .collect_vec();
92        let txn = self.db.begin().await?;
93        // delete all params first and then insert all params. It follows the same logic
94        // as the old code, we'd better change it to another way later to keep consistency.
95        SessionParameter::delete_many().exec(&txn).await?;
96        SessionParameter::insert_many(models).exec(&txn).await?;
97        txn.commit().await?;
98        Ok(())
99    }
100
101    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
102        let mut params_guard = self.params.write().await;
103        let name = SessionConfig::alias_to_entry_name(name);
104        let Some(param) = SessionParameter::find_by_id(name.clone())
105            .one(&self.db)
106            .await?
107        else {
108            return Err(MetaError::system_params(format!(
109                "unrecognized session parameter {:?}",
110                name
111            )));
112        };
113        let old_batch_parallelism = params_guard.batch_parallelism();
114        // FIXME: use a real reporter
115        let reporter = &mut ();
116        let new_param = if let Some(value) = value {
117            params_guard.set(&name, value, reporter)?
118        } else {
119            params_guard.reset(&name, reporter)?
120        };
121
122        let mut param: session_parameter::ActiveModel = param.into();
123        param.value = Set(new_param.clone());
124        param.update(&self.db).await?;
125        let new_batch_parallelism = params_guard.batch_parallelism();
126        self.notify_workers(name.clone(), new_param.clone());
127        if old_batch_parallelism != new_batch_parallelism {
128            self.notification_manager
129                .notify_local_subscribers(LocalNotification::BatchParallelismChange)
130                .await;
131        }
132
133        Ok(new_param)
134    }
135
136    pub fn notify_workers(&self, name: String, value: String) {
137        self.notification_manager.notify_frontend_without_version(
138            Operation::Update,
139            Info::SessionParam(SetSessionParamRequest {
140                param: name,
141                value: Some(value),
142            }),
143        );
144    }
145}
146
#[cfg(test)]
mod tests {
    use sea_orm::{ColumnTrait, QueryFilter};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Exercises the controller across a simulated restart: parameters set before the
    /// restart must survive it, rows for unknown parameters must be removed, and
    /// aliases must be stored under their canonical entry name.
    #[tokio::test]
    async fn test_session_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref();
        let defaults = SessionConfig::default();

        // First launch of the session parameter controller: the store is empty,
        // so the cache must equal the initial parameters.
        let first_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            defaults.clone(),
        )
        .await
        .unwrap();
        let cached = first_ctl.get_params().await;
        assert_eq!(cached, defaults);

        // Change a parameter through its alias.
        let updated_value = first_ctl
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Plant a row for a parameter that the session config does not know.
        let stale_row = session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Subsequent launch of the session parameter controller on the same store.
        let restarted_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            defaults.clone(),
        )
        .await
        .unwrap();
        let db = &restarted_ctl.db;

        // The unknown row must have been cleaned up by the flush on startup.
        let stale_lookup = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The value set before the restart is visible through both the alias and
        // the canonical name.
        let reloaded = restarted_ctl.get_params().await;
        let implicit_flush = reloaded.get("rw_implicit_flush").unwrap();
        assert_eq!(implicit_flush, updated_value);
        assert_eq!(implicit_flush, reloaded.get("implicit_flush").unwrap());

        // Database consistency: only the canonical name is stored.
        // rw_implicit_flush is alias to implicit_flush <https://github.com/risingwavelabs/risingwave/pull/18769>
        let alias_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(db)
            .await
            .unwrap();
        assert!(alias_row.is_none());

        let canonical_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(canonical_row.value, implicit_flush);
    }
}
229}