risingwave_meta/controller/
session_params.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::SessionInitConfig;
20use risingwave_common::session_config::{SessionConfig, SessionConfigError};
21use risingwave_meta_model::prelude::SessionParameter;
22use risingwave_meta_model::session_parameter;
23use risingwave_pb::meta::SetSessionParamRequest;
24use risingwave_pb::meta::subscribe_response::{Info, Operation};
25use sea_orm::ActiveValue::Set;
26use sea_orm::{DatabaseConnection, EntityTrait, TransactionTrait};
27use thiserror_ext::AsReport;
28use tokio::sync::RwLock;
29use tracing::info;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::{LocalNotification, NotificationManagerRef};
33use crate::{MetaError, MetaResult};
34
35pub type SessionParamsControllerRef = Arc<SessionParamsController>;
36
37pub struct SessionParamsController {
40 db: DatabaseConnection,
41 params: RwLock<SessionConfig>,
43 notification_manager: NotificationManagerRef,
44}
45
46impl SessionParamsController {
47 pub async fn new(
48 sql_meta_store: SqlMetaStore,
49 notification_manager: NotificationManagerRef,
50 session_init: SessionInitConfig,
51 ) -> MetaResult<Self> {
52 let db = sql_meta_store.conn;
53
54 let mut init_params = SessionConfig::default();
59
60 let mut session_init_values: HashMap<String, String> = HashMap::new();
64 for (name, value) in session_init.entries() {
65 let normalized = init_params.set(name, value.to_owned(), &mut ())?;
66 session_init_values.insert(name.to_owned(), normalized);
67 }
68 if !session_init_values.is_empty() {
69 info!(
70 "[session_init] seeds session parameters during cluster bootstrap only; \
71 persisted values in the meta store take precedence on existing clusters"
72 );
73 }
74
75 let params = SessionParameter::find().all(&db).await?;
77 for param in params {
78 if let Some(configured) = session_init_values.get(¶m.name)
79 && *configured != param.value
80 {
81 tracing::warn!(
82 parameter = %param.name,
83 session_init_value = %configured,
84 persisted_value = %param.value,
85 "session_init value differs from persisted session parameter, using persisted value"
86 );
87 }
88 if let Err(e) = init_params.set(¶m.name, param.value, &mut ()) {
89 match e {
90 SessionConfigError::InvalidValue { .. } => {
91 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(¶m.name)?)
92 }
93 SessionConfigError::UnrecognizedEntry(_) => {
94 tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
95 }
96 }
97 }
98 }
99
100 info!(?init_params, "session parameters");
101
102 let ctl = Self {
103 db,
104 params: RwLock::new(init_params.clone()),
105 notification_manager,
106 };
107 ctl.flush_params().await?;
109
110 Ok(ctl)
111 }
112
113 pub async fn get_params(&self) -> SessionConfig {
114 self.params.read().await.clone()
115 }
116
117 async fn flush_params(&self) -> MetaResult<()> {
118 let params = self.params.read().await.list_all();
119 let models = params
120 .into_iter()
121 .map(|param| session_parameter::ActiveModel {
122 name: Set(param.name),
123 value: Set(param.setting),
124 description: Set(Some(param.description)),
125 })
126 .collect_vec();
127 let txn = self.db.begin().await?;
128 SessionParameter::delete_many().exec(&txn).await?;
131 SessionParameter::insert_many(models).exec(&txn).await?;
132 txn.commit().await?;
133 Ok(())
134 }
135
136 pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
137 let mut params_guard = self.params.write().await;
138 let name = SessionConfig::alias_to_entry_name(name);
139 let Some(param) = SessionParameter::find_by_id(name.clone())
140 .one(&self.db)
141 .await?
142 else {
143 return Err(MetaError::system_params(format!(
144 "unrecognized session parameter {:?}",
145 name
146 )));
147 };
148 let old_batch_parallelism = params_guard.batch_parallelism();
149 let reporter = &mut ();
151 let new_param = if let Some(value) = value {
152 params_guard.set(&name, value, reporter)?
153 } else {
154 params_guard.reset(&name, reporter)?
155 };
156
157 let mut param: session_parameter::ActiveModel = param.into();
158 param.value = Set(new_param.clone());
159 SessionParameter::update(param).exec(&self.db).await?;
160 let new_batch_parallelism = params_guard.batch_parallelism();
161 self.notify_workers(name.clone(), new_param.clone());
162 if old_batch_parallelism != new_batch_parallelism {
163 self.notification_manager
164 .notify_local_subscribers(LocalNotification::BatchParallelismChange);
165 }
166
167 Ok(new_param)
168 }
169
170 pub fn notify_workers(&self, name: String, value: String) {
171 self.notification_manager.notify_frontend_without_version(
172 Operation::Update,
173 Info::SessionParam(SetSessionParamRequest {
174 param: name,
175 value: Some(value),
176 }),
177 );
178 }
179}
180
#[cfg(test)]
mod tests {
    use sea_orm::{ColumnTrait, QueryFilter};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Constructs a controller on top of `env`'s meta store using the given
    /// `[session_init]` seed, returning whatever `SessionParamsController::new` returns.
    async fn build_controller(
        env: &MetaSrvEnv,
        session_init: SessionInitConfig,
    ) -> MetaResult<SessionParamsController> {
        SessionParamsController::new(
            env.meta_store_ref().clone(),
            env.notification_manager_ref(),
            session_init,
        )
        .await
    }

    /// Covers defaults on first construction, setting a parameter through an alias,
    /// and dropping of unrecognized rows when the controller is rebuilt.
    #[tokio::test]
    async fn test_session_params() {
        let env = MetaSrvEnv::for_test().await;
        let defaults = SessionConfig::default();

        // First construction without any seed yields the defaults.
        let first_ctl = build_controller(&env, SessionInitConfig::default())
            .await
            .unwrap();
        assert_eq!(first_ctl.get_params().await, defaults);

        // Update a parameter through its alias.
        let updated_value = first_ctl
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Plant a row whose name the session config does not know.
        let unknown_row = session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        };
        SessionParameter::insert(unknown_row)
            .exec(&first_ctl.db)
            .await
            .unwrap();

        // Rebuilding the controller must remove the unknown row...
        let second_ctl = build_controller(&env, SessionInitConfig::default())
            .await
            .unwrap();
        let unknown_lookup = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(unknown_lookup.is_none());

        // ...while keeping the previously set value, reachable via alias and entry name.
        let reloaded = second_ctl.get_params().await;
        assert_eq!(reloaded.get("rw_implicit_flush").unwrap(), updated_value);
        assert_eq!(
            reloaded.get("rw_implicit_flush").unwrap(),
            reloaded.get("implicit_flush").unwrap()
        );

        // The row is stored under the entry name, never under the alias.
        let by_alias = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(by_alias.is_none());
        let by_entry_name = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            by_entry_name.value,
            reloaded.get("rw_implicit_flush").unwrap()
        );
    }

    /// With an empty `SessionParameter` table, `[session_init]` values are applied
    /// and written to the meta store.
    #[tokio::test]
    async fn test_session_init_bootstrap() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref().clone();
        // Start from an empty table so nothing persisted can override the seed.
        SessionParameter::delete_many()
            .exec(&meta_store.conn)
            .await
            .unwrap();

        let seed = SessionInitConfig {
            streaming_parallelism: Some("bounded(8)".to_owned()),
            streaming_parallelism_for_table: Some("default".to_owned()),
            ..Default::default()
        };
        let ctl = build_controller(&env, seed).await.unwrap();

        let resolved = ctl.get_params().await;
        assert_eq!(resolved.get("streaming_parallelism").unwrap(), "bounded(8)");
        assert_eq!(
            resolved.get("streaming_parallelism_for_table").unwrap(),
            "default"
        );

        let stored = SessionParameter::find_by_id("streaming_parallelism".to_owned())
            .one(&meta_store.conn)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(stored.value, "bounded(8)");
    }

    /// A value already persisted in the meta store wins over a conflicting
    /// `[session_init]` entry.
    #[tokio::test]
    async fn test_session_init_persisted_takes_precedence() {
        let env = MetaSrvEnv::for_test().await;

        // Persist an explicit value through a first controller.
        let first_ctl = build_controller(&env, SessionInitConfig::default())
            .await
            .unwrap();
        first_ctl
            .set_param("streaming_parallelism", Some("ratio(0.5)".to_owned()))
            .await
            .unwrap();

        // Rebuild with a conflicting seed; the persisted value must survive.
        let seed = SessionInitConfig {
            streaming_parallelism: Some("bounded(8)".to_owned()),
            ..Default::default()
        };
        let second_ctl = build_controller(&env, seed).await.unwrap();
        let resolved = second_ctl.get_params().await;
        assert_eq!(resolved.get("streaming_parallelism").unwrap(), "ratio(0.5)");
    }

    /// A `[session_init]` entry is used for a parameter whose row is missing from
    /// an otherwise populated table.
    #[tokio::test]
    async fn test_session_init_seeds_missing_field() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref().clone();

        // Populate the table, then remove exactly one row.
        build_controller(&env, SessionInitConfig::default())
            .await
            .unwrap();
        SessionParameter::delete_by_id("streaming_parallelism_for_materialized_view".to_owned())
            .exec(&meta_store.conn)
            .await
            .unwrap();

        let seed = SessionInitConfig {
            streaming_parallelism_for_materialized_view: Some("bounded(4)".to_owned()),
            ..Default::default()
        };
        let ctl = build_controller(&env, seed).await.unwrap();
        let resolved = ctl.get_params().await;
        assert_eq!(
            resolved
                .get("streaming_parallelism_for_materialized_view")
                .unwrap(),
            "bounded(4)"
        );
    }

    /// A rejected `[session_init]` value makes construction fail and leaves the
    /// `SessionParameter` table empty.
    #[tokio::test]
    async fn test_session_init_invalid_value_fails_without_persisting() {
        let env = MetaSrvEnv::for_test().await;
        let meta_store = env.meta_store_ref().clone();
        SessionParameter::delete_many()
            .exec(&meta_store.conn)
            .await
            .unwrap();

        let seed = SessionInitConfig {
            streaming_parallelism_for_backfill: Some("bounded(2)".to_owned()),
            ..Default::default()
        };
        let Err(err) = build_controller(&env, seed).await else {
            panic!("invalid [session_init] should fail")
        };
        let message = err.to_string();
        assert!(message.contains(
            "SessionParams error: Invalid value `bounded(2)` for `streaming_parallelism_for_backfill`"
        ));

        // Construction failed before anything was flushed to the meta store.
        let stored_rows = SessionParameter::find()
            .all(&meta_store.conn)
            .await
            .unwrap();
        assert!(stored_rows.is_empty());
    }
}