risingwave_meta/controller/
session_params.rs1use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::session_config::{SessionConfig, SessionConfigError};
19use risingwave_meta_model::prelude::SessionParameter;
20use risingwave_meta_model::session_parameter;
21use risingwave_pb::meta::SetSessionParamRequest;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use sea_orm::ActiveValue::Set;
24use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
25use thiserror_ext::AsReport;
26use tokio::sync::RwLock;
27use tracing::info;
28
29use crate::controller::SqlMetaStore;
30use crate::manager::{LocalNotification, NotificationManagerRef};
31use crate::{MetaError, MetaResult};
32
33pub type SessionParamsControllerRef = Arc<SessionParamsController>;
34
35pub struct SessionParamsController {
38 db: DatabaseConnection,
39 params: RwLock<SessionConfig>,
41 notification_manager: NotificationManagerRef,
42}
43
44impl SessionParamsController {
45 pub async fn new(
46 sql_meta_store: SqlMetaStore,
47 notification_manager: NotificationManagerRef,
48 mut init_params: SessionConfig,
49 ) -> MetaResult<Self> {
50 let db = sql_meta_store.conn;
51 let params = SessionParameter::find().all(&db).await?;
52 for param in params {
53 if let Err(e) = init_params.set(¶m.name, param.value, &mut ()) {
54 match e {
55 SessionConfigError::InvalidValue { .. } => {
56 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(¶m.name)?)
57 }
58 SessionConfigError::UnrecognizedEntry(_) => {
59 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
60 }
61 }
62 }
63 }
64
65 info!(?init_params, "session parameters");
66
67 let ctl = Self {
68 db,
69 params: RwLock::new(init_params.clone()),
70 notification_manager,
71 };
72 ctl.flush_params().await?;
74
75 Ok(ctl)
76 }
77
78 pub async fn get_params(&self) -> SessionConfig {
79 self.params.read().await.clone()
80 }
81
82 async fn flush_params(&self) -> MetaResult<()> {
83 let params = self.params.read().await.list_all();
84 let models = params
85 .into_iter()
86 .map(|param| session_parameter::ActiveModel {
87 name: Set(param.name),
88 value: Set(param.setting),
89 description: Set(Some(param.description)),
90 })
91 .collect_vec();
92 let txn = self.db.begin().await?;
93 SessionParameter::delete_many().exec(&txn).await?;
96 SessionParameter::insert_many(models).exec(&txn).await?;
97 txn.commit().await?;
98 Ok(())
99 }
100
101 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
102 let mut params_guard = self.params.write().await;
103 let name = SessionConfig::alias_to_entry_name(name);
104 let Some(param) = SessionParameter::find_by_id(name.clone())
105 .one(&self.db)
106 .await?
107 else {
108 return Err(MetaError::system_params(format!(
109 "unrecognized session parameter {:?}",
110 name
111 )));
112 };
113 let old_batch_parallelism = params_guard.batch_parallelism();
114 let reporter = &mut ();
116 let new_param = if let Some(value) = value {
117 params_guard.set(&name, value, reporter)?
118 } else {
119 params_guard.reset(&name, reporter)?
120 };
121
122 let mut param: session_parameter::ActiveModel = param.into();
123 param.value = Set(new_param.clone());
124 param.update(&self.db).await?;
125 let new_batch_parallelism = params_guard.batch_parallelism();
126 self.notify_workers(name.clone(), new_param.clone());
127 if old_batch_parallelism != new_batch_parallelism {
128 self.notification_manager
129 .notify_local_subscribers(LocalNotification::BatchParallelismChange);
130 }
131
132 Ok(new_param)
133 }
134
135 pub fn notify_workers(&self, name: String, value: String) {
136 self.notification_manager.notify_frontend_without_version(
137 Operation::Update,
138 Info::SessionParam(SetSessionParamRequest {
139 param: name,
140 value: Some(value),
141 }),
142 );
143 }
144}
145
#[cfg(test)]
mod tests {
    use sea_orm::ColumnTrait;

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Checks initialization, updating a parameter through its alias, removal of
    /// unknown rows on restart, and storage under the entry name.
    #[tokio::test]
    async fn test_session_params() {
        use sea_orm::QueryFilter;

        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref();
        let init_params = SessionConfig::default();

        // A fresh controller exposes exactly the initial parameters.
        let controller = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();
        assert_eq!(controller.get_params().await, init_params);

        // Update a parameter through its alias.
        let updated_value = controller
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Insert a row whose name the session config does not recognize.
        session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        }
        .insert(&controller.db)
        .await
        .unwrap();

        // Re-create the controller on the same meta store.
        let controller = SessionParamsController::new(
            meta_store.clone(),
            env.notification_manager_ref(),
            init_params.clone(),
        )
        .await
        .unwrap();

        // The unrecognized row is gone after initialization.
        let deprecated_row = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(&controller.db)
            .await
            .unwrap();
        assert!(deprecated_row.is_none());

        // The earlier update is still visible, under both the alias and the entry name.
        let params = controller.get_params().await;
        assert_eq!(params.get("rw_implicit_flush").unwrap(), updated_value);
        assert_eq!(
            params.get("rw_implicit_flush").unwrap(),
            params.get("implicit_flush").unwrap()
        );

        // No row is stored under the alias.
        let alias_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(&controller.db)
            .await
            .unwrap();
        assert!(alias_row.is_none());

        // The row stored under the entry name holds the updated value.
        let entry_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(&controller.db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(entry_row.value, params.get("rw_implicit_flush").unwrap());
    }
}