risingwave_meta/controller/
session_params.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::session_config::{SessionConfig, SessionConfigError};
19use risingwave_meta_model::prelude::SessionParameter;
20use risingwave_meta_model::session_parameter;
21use risingwave_pb::meta::SetSessionParamRequest;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use sea_orm::ActiveValue::Set;
24use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
25use thiserror_ext::AsReport;
26use tokio::sync::RwLock;
27use tracing::info;
28
29use crate::controller::SqlMetaStore;
30use crate::manager::NotificationManagerRef;
31use crate::{MetaError, MetaResult};
32
/// Shared, reference-counted handle to a [`SessionParamsController`].
pub type SessionParamsControllerRef = Arc<SessionParamsController>;
34
/// Manages the global default session params on meta.
///
/// Note that the session params in each session will be initialized from the default value here.
pub struct SessionParamsController {
    /// Connection to the SQL meta store in which the parameters are persisted.
    db: DatabaseConnection,
    /// Cached parameters, guarded by an async read-write lock.
    params: RwLock<SessionConfig>,
    /// Used by `notify_workers` to send parameter updates to frontends.
    notification_manager: NotificationManagerRef,
}
43
44impl SessionParamsController {
45    pub async fn new(
46        sql_meta_store: SqlMetaStore,
47        notification_manager: NotificationManagerRef,
48        mut init_params: SessionConfig,
49    ) -> MetaResult<Self> {
50        let db = sql_meta_store.conn;
51        let params = SessionParameter::find().all(&db).await?;
52        for param in params {
53            if let Err(e) = init_params.set(&param.name, param.value, &mut ()) {
54                match e {
55                    SessionConfigError::InvalidValue { .. } => {
56                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(&param.name)?)
57                    }
58                    SessionConfigError::UnrecognizedEntry(_) => {
59                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
60                    }
61                }
62            }
63        }
64
65        info!(?init_params, "session parameters");
66
67        let ctl = Self {
68            db,
69            params: RwLock::new(init_params.clone()),
70            notification_manager,
71        };
72        // flush to db.
73        ctl.flush_params().await?;
74
75        Ok(ctl)
76    }
77
78    pub async fn get_params(&self) -> SessionConfig {
79        self.params.read().await.clone()
80    }
81
82    async fn flush_params(&self) -> MetaResult<()> {
83        let params = self.params.read().await.list_all();
84        let models = params
85            .into_iter()
86            .map(|param| session_parameter::ActiveModel {
87                name: Set(param.name),
88                value: Set(param.setting),
89                description: Set(Some(param.description)),
90            })
91            .collect_vec();
92        let txn = self.db.begin().await?;
93        // delete all params first and then insert all params. It follows the same logic
94        // as the old code, we'd better change it to another way later to keep consistency.
95        SessionParameter::delete_many().exec(&txn).await?;
96        SessionParameter::insert_many(models).exec(&txn).await?;
97        txn.commit().await?;
98        Ok(())
99    }
100
101    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
102        let mut params_guard = self.params.write().await;
103        let name = SessionConfig::alias_to_entry_name(name);
104        let Some(param) = SessionParameter::find_by_id(name.clone())
105            .one(&self.db)
106            .await?
107        else {
108            return Err(MetaError::system_params(format!(
109                "unrecognized session parameter {:?}",
110                name
111            )));
112        };
113        // FIXME: use a real reporter
114        let reporter = &mut ();
115        let new_param = if let Some(value) = value {
116            params_guard.set(&name, value, reporter)?
117        } else {
118            params_guard.reset(&name, reporter)?
119        };
120
121        let mut param: session_parameter::ActiveModel = param.into();
122        param.value = Set(new_param.clone());
123        param.update(&self.db).await?;
124
125        self.notify_workers(name.clone(), new_param.clone());
126
127        Ok(new_param)
128    }
129
130    pub fn notify_workers(&self, name: String, value: String) {
131        self.notification_manager.notify_frontend_without_version(
132            Operation::Update,
133            Info::SessionParam(SetSessionParamRequest {
134                param: name,
135                value: Some(value),
136            }),
137        );
138    }
139}
140
#[cfg(test)]
mod tests {
    use sea_orm::{ColumnTrait, QueryFilter};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Runs the controller through two launches against the same meta store and checks that
    /// updates survive the restart, unknown rows are removed, and aliases resolve to the
    /// canonical entry both in the cache and in the database.
    #[tokio::test]
    async fn test_session_params() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref();
        let defaults = SessionConfig::default();

        // First launch: nothing is stored yet, so the cache must equal the defaults.
        let first_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            defaults.clone(),
        )
        .await
        .unwrap();
        assert_eq!(first_ctl.get_params().await, defaults);

        // Change a parameter, addressing it through its alias.
        let updated_value = first_ctl
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Plant a row for a parameter that the current configuration does not know.
        let stale_row = session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Second launch: the controller now starts from previously persisted state.
        let second_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            defaults.clone(),
        )
        .await
        .unwrap();

        // The unknown row must have been removed during start-up.
        let leftover = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(leftover.is_none());

        // The earlier update must be visible, under the alias as well as the canonical name.
        let params = second_ctl.get_params().await;
        assert_eq!(params.get("rw_implicit_flush").unwrap(), updated_value);
        assert_eq!(
            params.get("rw_implicit_flush").unwrap(),
            params.get("implicit_flush").unwrap()
        );

        // The database stores only the canonical name.
        // rw_implicit_flush is alias to implicit_flush <https://github.com/risingwavelabs/risingwave/pull/18769>
        let alias_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(alias_row.is_none());

        let canonical_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(canonical_row.value, params.get("rw_implicit_flush").unwrap());
    }
}