risingwave_meta/controller/
session_params.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::SessionInitConfig;
20use risingwave_common::session_config::{SessionConfig, SessionConfigError};
21use risingwave_meta_model::prelude::SessionParameter;
22use risingwave_meta_model::session_parameter;
23use risingwave_pb::meta::SetSessionParamRequest;
24use risingwave_pb::meta::subscribe_response::{Info, Operation};
25use sea_orm::ActiveValue::Set;
26use sea_orm::{DatabaseConnection, EntityTrait, TransactionTrait};
27use thiserror_ext::AsReport;
28use tokio::sync::RwLock;
29use tracing::info;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::{LocalNotification, NotificationManagerRef};
33use crate::{MetaError, MetaResult};
34
35pub type SessionParamsControllerRef = Arc<SessionParamsController>;
36
37/// Manages the global default session params on meta.
38/// Note that the session params in each session will be initialized from the default value here.
39pub struct SessionParamsController {
40    db: DatabaseConnection,
41    // Cached parameters.
42    params: RwLock<SessionConfig>,
43    notification_manager: NotificationManagerRef,
44}
45
46impl SessionParamsController {
47    pub async fn new(
48        sql_meta_store: SqlMetaStore,
49        notification_manager: NotificationManagerRef,
50        session_init: SessionInitConfig,
51    ) -> MetaResult<Self> {
52        let db = sql_meta_store.conn;
53
54        // Precedence (high to low):
55        // 1. Persisted value in Meta store (`session_parameter`)
56        // 2. Explicit value in `[session_init]`
57        // 3. Built-in `SessionConfig::default()`
58        let mut init_params = SessionConfig::default();
59
60        // Apply the explicitly-configured `[session_init]` values onto the built-in defaults.
61        // Record the normalized value of each so we can later detect when a persisted value
62        // takes precedence and warn the operator.
63        let mut session_init_values: HashMap<String, String> = HashMap::new();
64        for (name, value) in session_init.entries() {
65            let normalized = init_params.set(name, value.to_owned(), &mut ())?;
66            session_init_values.insert(name.to_owned(), normalized);
67        }
68        if !session_init_values.is_empty() {
69            info!(
70                "[session_init] seeds session parameters during cluster bootstrap only; \
71                 persisted values in the meta store take precedence on existing clusters"
72            );
73        }
74
75        // Persisted values take precedence over `[session_init]`.
76        let params = SessionParameter::find().all(&db).await?;
77        for param in params {
78            if let Some(configured) = session_init_values.get(&param.name)
79                && *configured != param.value
80            {
81                tracing::warn!(
82                    parameter = %param.name,
83                    session_init_value = %configured,
84                    persisted_value = %param.value,
85                    "session_init value differs from persisted session parameter, using persisted value"
86                );
87            }
88            if let Err(e) = init_params.set(&param.name, param.value, &mut ()) {
89                match e {
90                    SessionConfigError::InvalidValue { .. } => {
91                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database, using default value {}", init_params.get(&param.name)?)
92                    }
93                    SessionConfigError::UnrecognizedEntry(_) => {
94                        tracing::error!(error = %e.as_report(), "failed to set parameter from meta database")
95                    }
96                }
97            }
98        }
99
100        info!(?init_params, "session parameters");
101
102        let ctl = Self {
103            db,
104            params: RwLock::new(init_params.clone()),
105            notification_manager,
106        };
107        // flush to db.
108        ctl.flush_params().await?;
109
110        Ok(ctl)
111    }
112
113    pub async fn get_params(&self) -> SessionConfig {
114        self.params.read().await.clone()
115    }
116
117    async fn flush_params(&self) -> MetaResult<()> {
118        let params = self.params.read().await.list_all();
119        let models = params
120            .into_iter()
121            .map(|param| session_parameter::ActiveModel {
122                name: Set(param.name),
123                value: Set(param.setting),
124                description: Set(Some(param.description)),
125            })
126            .collect_vec();
127        let txn = self.db.begin().await?;
128        // delete all params first and then insert all params. It follows the same logic
129        // as the old code, we'd better change it to another way later to keep consistency.
130        SessionParameter::delete_many().exec(&txn).await?;
131        SessionParameter::insert_many(models).exec(&txn).await?;
132        txn.commit().await?;
133        Ok(())
134    }
135
136    pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<String> {
137        let mut params_guard = self.params.write().await;
138        let name = SessionConfig::alias_to_entry_name(name);
139        let Some(param) = SessionParameter::find_by_id(name.clone())
140            .one(&self.db)
141            .await?
142        else {
143            return Err(MetaError::system_params(format!(
144                "unrecognized session parameter {:?}",
145                name
146            )));
147        };
148        let old_batch_parallelism = params_guard.batch_parallelism();
149        // FIXME: use a real reporter
150        let reporter = &mut ();
151        let new_param = if let Some(value) = value {
152            params_guard.set(&name, value, reporter)?
153        } else {
154            params_guard.reset(&name, reporter)?
155        };
156
157        let mut param: session_parameter::ActiveModel = param.into();
158        param.value = Set(new_param.clone());
159        SessionParameter::update(param).exec(&self.db).await?;
160        let new_batch_parallelism = params_guard.batch_parallelism();
161        self.notify_workers(name.clone(), new_param.clone());
162        if old_batch_parallelism != new_batch_parallelism {
163            self.notification_manager
164                .notify_local_subscribers(LocalNotification::BatchParallelismChange);
165        }
166
167        Ok(new_param)
168    }
169
170    pub fn notify_workers(&self, name: String, value: String) {
171        self.notification_manager.notify_frontend_without_version(
172            Operation::Update,
173            Info::SessionParam(SetSessionParamRequest {
174                param: name,
175                value: Some(value),
176            }),
177        );
178    }
179}
180
#[cfg(test)]
mod tests {
    use sea_orm::{ColumnTrait, QueryFilter};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Starts a controller on the test environment's meta store using the given
    /// `[session_init]` section.
    async fn launch(
        env: &MetaSrvEnv,
        session_init: SessionInitConfig,
    ) -> MetaResult<SessionParamsController> {
        SessionParamsController::new(
            env.meta_store_ref().clone(),
            env.notification_manager_ref(),
            session_init,
        )
        .await
    }

    /// Basic lifecycle: defaults on first launch, a value set through an alias survives a
    /// relaunch, and rows for unknown parameters are removed on relaunch.
    #[tokio::test]
    async fn test_session_params() {
        let env = MetaSrvEnv::for_test().await;
        let defaults = SessionConfig::default();

        // First launch of the session parameter controller: only defaults are expected.
        let first_ctl = launch(&env, SessionInitConfig::default()).await.unwrap();
        assert_eq!(first_ctl.get_params().await, defaults);

        // Change a parameter, addressing it by its alias.
        let flush_setting = first_ctl
            .set_param("rw_implicit_flush", Some("true".into()))
            .await
            .unwrap();

        // Plant a row for a parameter the session config does not know.
        let stale_row = session_parameter::ActiveModel {
            name: Set("deprecated_param".into()),
            value: Set("foo".into()),
            description: Set(None),
        };
        SessionParameter::insert(stale_row)
            .exec(&first_ctl.db)
            .await
            .unwrap();

        // Launch the session parameter controller again, now on a populated store.
        let second_ctl = launch(&env, SessionInitConfig::default()).await.unwrap();

        // The unknown row must be gone.
        let stale_lookup = SessionParameter::find_by_id("deprecated_param".to_owned())
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(stale_lookup.is_none());

        // The value set before the relaunch is still in effect, under both names.
        let params = second_ctl.get_params().await;
        assert_eq!(params.get("rw_implicit_flush").unwrap(), flush_setting);
        assert_eq!(
            params.get("rw_implicit_flush").unwrap(),
            params.get("implicit_flush").unwrap()
        );

        // The table stores the canonical name only.
        // rw_implicit_flush is alias to implicit_flush <https://github.com/risingwavelabs/risingwave/pull/18769>
        let alias_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("rw_implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap();
        assert!(alias_row.is_none());
        let canonical_row = SessionParameter::find()
            .filter(session_parameter::Column::Name.eq("implicit_flush"))
            .one(&second_ctl.db)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            canonical_row.value,
            params.get("rw_implicit_flush").unwrap()
        );
    }

    /// Scenario 1: on a new cluster, `[session_init]` seeds values into the meta store, including
    /// the `default` placeholder which must be persisted verbatim.
    #[tokio::test]
    async fn test_session_init_bootstrap() {
        let env = MetaSrvEnv::for_test().await;
        let store = env.meta_store_ref().clone();
        // Simulate an empty store: drop the rows seeded while constructing the test env.
        SessionParameter::delete_many()
            .exec(&store.conn)
            .await
            .unwrap();

        let seeded = SessionInitConfig {
            streaming_parallelism: Some("bounded(8)".to_owned()),
            streaming_parallelism_for_table: Some("default".to_owned()),
            ..Default::default()
        };
        let ctl = launch(&env, seeded).await.unwrap();

        let params = ctl.get_params().await;
        assert_eq!(params.get("streaming_parallelism").unwrap(), "bounded(8)");
        // `default` must be kept as-is, not materialized into a concrete value.
        assert_eq!(
            params.get("streaming_parallelism_for_table").unwrap(),
            "default"
        );

        // The seeded value has reached the meta store.
        let row = SessionParameter::find_by_id("streaming_parallelism".to_owned())
            .one(&store.conn)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(row.value, "bounded(8)");
    }

    /// Scenario 2: on an existing cluster, a persisted value takes precedence over `[session_init]`.
    #[tokio::test]
    async fn test_session_init_persisted_takes_precedence() {
        let env = MetaSrvEnv::for_test().await;

        // First launch: seed the store and then mimic an `ALTER SYSTEM SET`.
        let first_ctl = launch(&env, SessionInitConfig::default()).await.unwrap();
        first_ctl
            .set_param("streaming_parallelism", Some("ratio(0.5)".to_owned()))
            .await
            .unwrap();

        // Restart with a conflicting `[session_init]`: the persisted value must win.
        let conflicting = SessionInitConfig {
            streaming_parallelism: Some("bounded(8)".to_owned()),
            ..Default::default()
        };
        let second_ctl = launch(&env, conflicting).await.unwrap();
        let params = second_ctl.get_params().await;
        assert_eq!(params.get("streaming_parallelism").unwrap(), "ratio(0.5)");
    }

    /// Scenario 3: a newly supported field missing from the persisted `session_parameter` table is
    /// seeded from `[session_init]` on restart.
    #[tokio::test]
    async fn test_session_init_seeds_missing_field() {
        let env = MetaSrvEnv::for_test().await;
        let store = env.meta_store_ref().clone();

        // First launch seeds and persists defaults for all fields.
        launch(&env, SessionInitConfig::default()).await.unwrap();
        // Simulate an older cluster that has no row for this field.
        SessionParameter::delete_by_id("streaming_parallelism_for_materialized_view".to_owned())
            .exec(&store.conn)
            .await
            .unwrap();

        let seeded = SessionInitConfig {
            streaming_parallelism_for_materialized_view: Some("bounded(4)".to_owned()),
            ..Default::default()
        };
        let ctl = launch(&env, seeded).await.unwrap();
        let params = ctl.get_params().await;
        let mv_parallelism = params
            .get("streaming_parallelism_for_materialized_view")
            .unwrap();
        assert_eq!(mv_parallelism, "bounded(4)");
    }

    /// A `[session_init]` value rejected by the session config makes construction fail, and
    /// nothing is written to the meta store.
    #[tokio::test]
    async fn test_session_init_invalid_value_fails_without_persisting() {
        let env = MetaSrvEnv::for_test().await;
        let store = env.meta_store_ref().clone();
        // Simulate an empty store: drop the rows seeded while constructing the test env.
        SessionParameter::delete_many()
            .exec(&store.conn)
            .await
            .unwrap();

        let rejected = SessionInitConfig {
            streaming_parallelism_for_backfill: Some("bounded(2)".to_owned()),
            ..Default::default()
        };
        // `let … else` rather than `unwrap_err`, which would need the `Ok` type to be `Debug`.
        let Err(err) = launch(&env, rejected).await else {
            panic!("invalid [session_init] should fail")
        };
        let message = err.to_string();
        assert!(message.contains(
            "SessionParams error: Invalid value `bounded(2)` for `streaming_parallelism_for_backfill`"
        ));

        let rows = SessionParameter::find().all(&store.conn).await.unwrap();
        assert!(rows.is_empty());
    }
}
415}