risingwave_frontend/handler/
alter_database_param.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::AlterDatabaseParam;
17use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::error::Result;
23
24pub async fn handle_alter_database_param(
25    handler_args: HandlerArgs,
26    database_name: ObjectName,
27    param: AlterDatabaseParam,
28) -> Result<RwPgResponse> {
29    let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
30
31    let session = handler_args.session;
32
33    let database_name = Binder::resolve_database_name(database_name)?;
34    let database_id = {
35        let catalog_reader = session.env().catalog_reader().read_guard();
36        let database = catalog_reader.get_database_by_name(&database_name)?;
37
38        // The user should be super user or owner to alter the database.
39        session.check_privilege_for_drop_alter_db_schema(database)?;
40
41        database.id()
42    };
43
44    match param {
45        AlterDatabaseParam::BarrierIntervalMs(Some(interval)) => {
46            if interval >= NOTICE_BARRIER_INTERVAL_MS {
47                builder = builder.notice(
48                    format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
49                             interval, NOTICE_BARRIER_INTERVAL_MS));
50            }
51        }
52        AlterDatabaseParam::CheckpointFrequency(Some(frequency)) => {
53            if frequency >= NOTICE_CHECKPOINT_FREQUENCY {
54                builder = builder.notice(
55                    format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
56                             frequency, NOTICE_CHECKPOINT_FREQUENCY));
57            }
58        }
59        _ => {}
60    }
61
62    let catalog_writer = session.catalog_writer()?;
63    catalog_writer
64        .alter_database_param(database_id, param)
65        .await?;
66
67    Ok(builder.into())
68}
69
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Sets and resets `BARRIER_INTERVAL_MS` and `CHECKPOINT_FREQUENCY` on a
    /// freshly created database, checking both catalog fields after each step.
    #[tokio::test]
    async fn test_alter_barrier() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Reads `(barrier_interval_ms, checkpoint_frequency)` for `test_db`.
        // The read guard is released when the closure returns, so it is never
        // held across an `.await`.
        let current_params = || {
            let guard = catalog_reader.read_guard();
            let database = guard.get_database_by_name("test_db").unwrap();
            (database.barrier_interval_ms, database.checkpoint_frequency)
        };

        frontend.run_sql("CREATE DATABASE test_db").await.unwrap();
        // A new database starts with neither parameter overridden.
        assert_eq!(current_params(), (None, None));

        // Each step is one statement plus the expected
        // `(barrier_interval_ms, checkpoint_frequency)` afterwards.
        let steps = [
            (
                "ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = 1000",
                (Some(1000), None),
            ),
            (
                "ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = 10",
                (Some(1000), Some(10)),
            ),
            (
                "ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = DEFAULT",
                (None, Some(10)),
            ),
            (
                "ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = DEFAULT",
                (None, None),
            ),
        ];

        for (sql, expected) in steps {
            frontend.run_sql(sql).await.unwrap();
            assert_eq!(current_params(), expected);
        }
    }
}