risingwave_meta/controller/
session_params.rs1use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::session_config::{SessionConfig, SessionConfigError};
19use risingwave_meta_model::prelude::SessionParameter;
20use risingwave_meta_model::session_parameter;
21use risingwave_pb::meta::SetSessionParamRequest;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use sea_orm::ActiveValue::Set;
24use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
25use thiserror_ext::AsReport;
26use tokio::sync::RwLock;
27use tracing::info;
28
29use crate::controller::SqlMetaStore;
30use crate::manager::NotificationManagerRef;
31use crate::{MetaError, MetaResult};
32
33pub type SessionParamsControllerRef = Arc<SessionParamsController>;
34
35pub struct SessionParamsController {
38 db: DatabaseConnection,
39 params: RwLock<SessionConfig>,
41 notification_manager: NotificationManagerRef,
42}
43
44impl SessionParamsController {
45 pub async fn new(
46 sql_meta_store: SqlMetaStore,
47 notification_manager: NotificationManagerRef,
48 mut init_params: SessionConfig,
49 ) -> MetaResult<Self> {
50 let db = sql_meta_store.conn;
51 let params = SessionParameter::find().all(&db).await?;
52 for param in params {
53 if let Err(e) = init_params.set(¶m.name, param.value, &mut ()) {
54 match e {
55 SessionConfigError::InvalidValue { .. } => {
56 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(¶m.name)?)
57 }
58 SessionConfigError::UnrecognizedEntry(_) => {
59 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
60 }
61 }
62 }
63 }
64
65 info!(?init_params, "session parameters");
66
67 let ctl = Self {
68 db,
69 params: RwLock::new(init_params.clone()),
70 notification_manager,
71 };
72 ctl.flush_params().await?;
74
75 Ok(ctl)
76 }
77
78 pub async fn get_params(&self) -> SessionConfig {
79 self.params.read().await.clone()
80 }
81
82 async fn flush_params(&self) -> MetaResult<()> {
83 let params = self.params.read().await.list_all();
84 let models = params
85 .into_iter()
86 .map(|param| session_parameter::ActiveModel {
87 name: Set(param.name),
88 value: Set(param.setting),
89 description: Set(Some(param.description)),
90 })
91 .collect_vec();
92 let txn = self.db.begin().await?;
93 SessionParameter::delete_many().exec(&txn).await?;
96 SessionParameter::insert_many(models).exec(&txn).await?;
97 txn.commit().await?;
98 Ok(())
99 }
100
101 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
102 let mut params_guard = self.params.write().await;
103 let name = SessionConfig::alias_to_entry_name(name);
104 let Some(param) = SessionParameter::find_by_id(name.clone())
105 .one(&self.db)
106 .await?
107 else {
108 return Err(MetaError::system_params(format!(
109 "unrecognized session parameter {:?}",
110 name
111 )));
112 };
113 let reporter = &mut ();
115 let new_param = if let Some(value) = value {
116 params_guard.set(&name, value, reporter)?
117 } else {
118 params_guard.reset(&name, reporter)?
119 };
120
121 let mut param: session_parameter::ActiveModel = param.into();
122 param.value = Set(new_param.clone());
123 param.update(&self.db).await?;
124
125 self.notify_workers(name.clone(), new_param.clone());
126
127 Ok(new_param)
128 }
129
130 pub fn notify_workers(&self, name: String, value: String) {
131 self.notification_manager.notify_frontend_without_version(
132 Operation::Update,
133 Info::SessionParam(SetSessionParamRequest {
134 param: name,
135 value: Some(value),
136 }),
137 );
138 }
139}
140
#[cfg(test)]
mod tests {
    use sea_orm::ColumnTrait;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Covers first launch, setting a parameter through its alias, and a
    /// relaunch that must drop stale rows while keeping the persisted value.
    #[tokio::test]
    async fn test_session_params() {
        use sea_orm::QueryFilter;

        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref();
        let init_params = SessionConfig::default();

        // First launch: the controller should expose exactly the initial parameters.
        let first_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        assert_eq!(first_ctl.get_params().await, init_params);

        // Set a parameter via its alias.
        let updated_value = first_ctl
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Plant a row that no longer corresponds to any known parameter.
        let stale_row = session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        };
        stale_row.insert(&first_ctl.db).await.unwrap();

        // Relaunch on top of the existing database content.
        let second_ctl = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();

        // The stale row must have been removed on relaunch.
        let stale_lookup = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The previously set value survives, under both the alias and the entry name.
        let reloaded = second_ctl.get_params().await;
        assert_eq!(reloaded.get("rw_implicit_flush").unwrap(), updated_value);
        assert_eq!(
            reloaded.get("rw_implicit_flush").unwrap(),
            reloaded.get("implicit_flush").unwrap()
        );

        // Database consistency: the row is stored under the entry name
        // `implicit_flush`, never under the alias `rw_implicit_flush`.
        let alias_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(alias_row.is_none());

        let entry_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(entry_row.value, reloaded.get("rw_implicit_flush").unwrap());
    }
}