risingwave_meta/controller/
session_params.rs1use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::session_config::{SessionConfig, SessionConfigError};
19use risingwave_meta_model::prelude::SessionParameter;
20use risingwave_meta_model::session_parameter;
21use risingwave_pb::meta::SetSessionParamRequest;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use sea_orm::ActiveValue::Set;
24use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};
25use thiserror_ext::AsReport;
26use tokio::sync::RwLock;
27use tracing::info;
28
29use crate::controller::SqlMetaStore;
30use crate::manager::{LocalNotification, NotificationManagerRef};
31use crate::{MetaError, MetaResult};
32
33pub type SessionParamsControllerRef = Arc<SessionParamsController>;
34
35pub struct SessionParamsController {
38 db: DatabaseConnection,
39 params: RwLock<SessionConfig>,
41 notification_manager: NotificationManagerRef,
42}
43
44impl SessionParamsController {
45 pub async fn new(
46 sql_meta_store: SqlMetaStore,
47 notification_manager: NotificationManagerRef,
48 mut init_params: SessionConfig,
49 ) -> MetaResult<Self> {
50 let db = sql_meta_store.conn;
51 let params = SessionParameter::find().all(&db).await?;
52 for param in params {
53 if let Err(e) = init_params.set(¶m.name, param.value, &mut ()) {
54 match e {
55 SessionConfigError::InvalidValue { .. } => {
56 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(¶m.name)?)
57 }
58 SessionConfigError::UnrecognizedEntry(_) => {
59 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
60 }
61 }
62 }
63 }
64
65 info!(?init_params, "session parameters");
66
67 let ctl = Self {
68 db,
69 params: RwLock::new(init_params.clone()),
70 notification_manager,
71 };
72 ctl.flush_params().await?;
74
75 Ok(ctl)
76 }
77
78 pub async fn get_params(&self) -> SessionConfig {
79 self.params.read().await.clone()
80 }
81
82 async fn flush_params(&self) -> MetaResult<()> {
83 let params = self.params.read().await.list_all();
84 let models = params
85 .into_iter()
86 .map(|param| session_parameter::ActiveModel {
87 name: Set(param.name),
88 value: Set(param.setting),
89 description: Set(Some(param.description)),
90 })
91 .collect_vec();
92 let txn = self.db.begin().await?;
93 SessionParameter::delete_many().exec(&txn).await?;
96 SessionParameter::insert_many(models).exec(&txn).await?;
97 txn.commit().await?;
98 Ok(())
99 }
100
101 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
102 let mut params_guard = self.params.write().await;
103 let name = SessionConfig::alias_to_entry_name(name);
104 let Some(param) = SessionParameter::find_by_id(name.clone())
105 .one(&self.db)
106 .await?
107 else {
108 return Err(MetaError::system_params(format!(
109 "unrecognized session parameter {:?}",
110 name
111 )));
112 };
113 let old_batch_parallelism = params_guard.batch_parallelism();
114 let reporter = &mut ();
116 let new_param = if let Some(value) = value {
117 params_guard.set(&name, value, reporter)?
118 } else {
119 params_guard.reset(&name, reporter)?
120 };
121
122 let mut param: session_parameter::ActiveModel = param.into();
123 param.value = Set(new_param.clone());
124 param.update(&self.db).await?;
125 let new_batch_parallelism = params_guard.batch_parallelism();
126 self.notify_workers(name.clone(), new_param.clone());
127 if old_batch_parallelism != new_batch_parallelism {
128 self.notification_manager
129 .notify_local_subscribers(LocalNotification::BatchParallelismChange)
130 .await;
131 }
132
133 Ok(new_param)
134 }
135
136 pub fn notify_workers(&self, name: String, value: String) {
137 self.notification_manager.notify_frontend_without_version(
138 Operation::Update,
139 Info::SessionParam(SetSessionParamRequest {
140 param: name,
141 value: Some(value),
142 }),
143 );
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use sea_orm::ColumnTrait;
150
151 use super::*;
152 use crate::manager::MetaSrvEnv;
153
154 #[tokio::test]
155 async fn test_session_params() {
156 use sea_orm::QueryFilter;
157
158 let env = MetaSrvEnv::for_test().await;
159 let meta_store = env.meta_store_ref();
160 let init_params = SessionConfig::default();
161
162 let session_param_ctl = SessionParamsController::new(
164 meta_store.clone(),
165 env.notification_manager_ref(),
166 init_params.clone(),
167 )
168 .await
169 .unwrap();
170 let params = session_param_ctl.get_params().await;
171 assert_eq!(params, init_params);
172
173 let new_params = session_param_ctl
175 .set_param("rw_implicit_flush", Some("true".into()))
176 .await
177 .unwrap();
178
179 let deprecated_param = session_parameter::ActiveModel {
181 name: Set("deprecated_param".into()),
182 value: Set("foo".into()),
183 description: Set(None),
184 };
185 deprecated_param
186 .insert(&session_param_ctl.db)
187 .await
188 .unwrap();
189
190 let session_param_ctl = SessionParamsController::new(
192 meta_store.clone(),
193 env.notification_manager_ref(),
194 init_params.clone(),
195 )
196 .await
197 .unwrap();
198 assert!(
200 SessionParameter::find_by_id("deprecated_param".to_owned())
201 .one(&session_param_ctl.db)
202 .await
203 .unwrap()
204 .is_none()
205 );
206 let params = session_param_ctl.get_params().await;
208 assert_eq!(params.get("rw_implicit_flush").unwrap(), new_params);
209 assert_eq!(
210 params.get("rw_implicit_flush").unwrap(),
211 params.get("implicit_flush").unwrap()
212 );
213 let models = SessionParameter::find()
216 .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
217 .one(&session_param_ctl.db)
218 .await
219 .unwrap();
220 assert!(models.is_none());
221 let models = SessionParameter::find()
222 .filter(session_parameter::Column::Name.eq("implicit_flush"))
223 .one(&session_param_ctl.db)
224 .await
225 .unwrap()
226 .unwrap();
227 assert_eq!(models.value, params.get("rw_implicit_flush").unwrap());
228 }
229}