risingwave_frontend/handler/
alter_database_param.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::AlterDatabaseParam;
17use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::error::Result;
23
24pub async fn handle_alter_database_param(
25    handler_args: HandlerArgs,
26    database_name: ObjectName,
27    param: AlterDatabaseParam,
28) -> Result<RwPgResponse> {
29    let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
30
31    let session = handler_args.session;
32
33    let database_name = Binder::resolve_database_name(database_name)?;
34    let database_id = {
35        let catalog_reader = session.env().catalog_reader().read_guard();
36        let database = catalog_reader.get_database_by_name(&database_name)?;
37
38        // The user should be super user or owner to alter the database.
39        session.check_privilege_for_drop_alter_db_schema(database)?;
40
41        database.id()
42    };
43
44    match param {
45        AlterDatabaseParam::BarrierIntervalMs(Some(interval)) => {
46            if !cfg!(test) {
47                risingwave_common::license::Feature::ResourceGroup.check_available()?;
48            }
49            if interval >= NOTICE_BARRIER_INTERVAL_MS {
50                builder = builder.notice(
51                    format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
52                             interval, NOTICE_BARRIER_INTERVAL_MS));
53            }
54        }
55        AlterDatabaseParam::CheckpointFrequency(Some(frequency)) => {
56            if !cfg!(test) {
57                risingwave_common::license::Feature::ResourceGroup.check_available()?;
58            }
59            if frequency >= NOTICE_CHECKPOINT_FREQUENCY {
60                builder = builder.notice(
61                    format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
62                             frequency, NOTICE_CHECKPOINT_FREQUENCY));
63            }
64        }
65        _ => {}
66    }
67
68    let catalog_writer = session.catalog_writer()?;
69    catalog_writer
70        .alter_database_param(database_id, param)
71        .await?;
72
73    Ok(builder.into())
74}
75
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Creates a database, then sets and resets both parameters one at a
    /// time, checking the catalog entry after every statement.
    #[tokio::test]
    async fn test_alter_barrier() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Each step is: (statement to run, expected `barrier_interval_ms`,
        // expected `checkpoint_frequency`) as seen in the catalog afterwards.
        let steps = [
            ("CREATE DATABASE test_db", None, None),
            (
                "ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = 1000",
                Some(1000),
                None,
            ),
            (
                "ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = 10",
                Some(1000),
                Some(10),
            ),
            (
                "ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = DEFAULT",
                None,
                Some(10),
            ),
            (
                "ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = DEFAULT",
                None,
                None,
            ),
        ];

        for (sql, expected_barrier, expected_checkpoint) in steps {
            frontend.run_sql(sql).await.unwrap();

            // The guard is taken after the statement completes and is dropped
            // at the end of the iteration, before the next statement runs.
            let reader = catalog_reader.read_guard();
            let db = reader.get_database_by_name("test_db").unwrap();
            assert_eq!(db.barrier_interval_ms, expected_barrier);
            assert_eq!(db.checkpoint_frequency, expected_checkpoint);
        }
    }
}