Skip to main content

risingwave_frontend/handler/
alter_database_param.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::AlterDatabaseParam;
17use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
18use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::error::Result;
23use crate::handler::alter_resource_group::resolve_resource_group;
24
25pub async fn handle_alter_database_param(
26    handler_args: HandlerArgs,
27    database_name: ObjectName,
28    param: AlterDatabaseParam,
29) -> Result<RwPgResponse> {
30    let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
31
32    let session = handler_args.session;
33
34    let database_name = Binder::resolve_database_name(database_name)?;
35    let database_id = {
36        let catalog_reader = session.env().catalog_reader().read_guard();
37        let database = catalog_reader.get_database_by_name(&database_name)?;
38
39        // The user should be super user or owner to alter the database.
40        session.check_privilege_for_drop_alter_db_schema(database)?;
41
42        database.id()
43    };
44
45    match param {
46        AlterDatabaseParam::BarrierIntervalMs(Some(interval)) => {
47            if !cfg!(test) {
48                risingwave_common::license::Feature::ResourceGroup.check_available()?;
49            }
50            if interval >= NOTICE_BARRIER_INTERVAL_MS {
51                builder = builder.notice(
52                    format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
53                             interval, NOTICE_BARRIER_INTERVAL_MS));
54            }
55        }
56        AlterDatabaseParam::CheckpointFrequency(Some(frequency)) => {
57            if !cfg!(test) {
58                risingwave_common::license::Feature::ResourceGroup.check_available()?;
59            }
60            if frequency >= NOTICE_CHECKPOINT_FREQUENCY {
61                builder = builder.notice(
62                    format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
63                             frequency, NOTICE_CHECKPOINT_FREQUENCY));
64            }
65        }
66        _ => {}
67    }
68
69    let catalog_writer = session.catalog_writer()?;
70    catalog_writer
71        .alter_database_param(database_id, param)
72        .await?;
73
74    Ok(builder.into())
75}
76
77pub async fn handle_alter_database_resource_group(
78    handler_args: HandlerArgs,
79    database_name: ObjectName,
80    resource_group: Option<SetVariableValue>,
81    _deferred: bool,
82) -> Result<RwPgResponse> {
83    let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
84
85    let session = handler_args.session;
86
87    let database_name = Binder::resolve_database_name(database_name)?;
88    let database_id = {
89        let catalog_reader = session.env().catalog_reader().read_guard();
90        let database = catalog_reader.get_database_by_name(&database_name)?;
91
92        // The user should be super user or owner to alter the database.
93        session.check_privilege_for_drop_alter_db_schema(database)?;
94
95        database.id()
96    };
97
98    let resource_group = resource_group
99        .map(resolve_resource_group)
100        .transpose()?
101        .flatten();
102
103    if resource_group.is_some() {
104        risingwave_common::license::Feature::ResourceGroup.check_available()?;
105    }
106
107    let catalog_writer = session.catalog_writer()?;
108    catalog_writer
109        .alter_database_resource_group(database_id, resource_group, _deferred)
110        .await?;
111
112    builder = builder.notice(
113        "The database resource group metadata has been updated, but it will not take effect immediately. Manually trigger recovery for inherited streaming jobs to use the new database resource group."
114            .to_owned(),
115    );
116
117    Ok(builder.into())
118}
119
120#[cfg(test)]
121mod tests {
122    use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
123
124    use crate::test_utils::LocalFrontend;
125
126    #[tokio::test]
127    async fn test_alter_barrier() {
128        let frontend = LocalFrontend::new(Default::default()).await;
129        let session = frontend.session_ref();
130        let catalog_reader = session.env().catalog_reader();
131
132        frontend.run_sql("CREATE DATABASE test_db").await.unwrap();
133        {
134            let reader = catalog_reader.read_guard();
135            let db = reader.get_database_by_name("test_db").unwrap();
136            assert!(db.barrier_interval_ms.is_none());
137            assert!(db.checkpoint_frequency.is_none());
138        }
139
140        frontend
141            .run_sql("ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = 1000")
142            .await
143            .unwrap();
144        {
145            let reader = catalog_reader.read_guard();
146            let db = reader.get_database_by_name("test_db").unwrap();
147            assert_eq!(db.barrier_interval_ms, Some(1000));
148            assert!(db.checkpoint_frequency.is_none());
149        }
150
151        frontend
152            .run_sql("ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = 10")
153            .await
154            .unwrap();
155        {
156            let reader = catalog_reader.read_guard();
157            let db = reader.get_database_by_name("test_db").unwrap();
158            assert_eq!(db.barrier_interval_ms, Some(1000));
159            assert_eq!(db.checkpoint_frequency, Some(10));
160        }
161
162        frontend
163            .run_sql("ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = DEFAULT")
164            .await
165            .unwrap();
166        {
167            let reader = catalog_reader.read_guard();
168            let db = reader.get_database_by_name("test_db").unwrap();
169            assert!(db.barrier_interval_ms.is_none());
170            assert_eq!(db.checkpoint_frequency, Some(10));
171        }
172
173        frontend
174            .run_sql("ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = DEFAULT")
175            .await
176            .unwrap();
177        {
178            let reader = catalog_reader.read_guard();
179            let db = reader.get_database_by_name("test_db").unwrap();
180            assert!(db.barrier_interval_ms.is_none());
181            assert!(db.checkpoint_frequency.is_none());
182        }
183    }
184
185    #[tokio::test]
186    async fn test_alter_resource_group() {
187        let frontend = LocalFrontend::new(Default::default()).await;
188        let session = frontend.session_ref();
189        let catalog_reader = session.env().catalog_reader();
190
191        frontend.run_sql("CREATE DATABASE test_db").await.unwrap();
192
193        frontend
194            .run_sql("ALTER DATABASE test_db SET RESOURCE_GROUP = DEFAULT DEFERRED")
195            .await
196            .unwrap();
197        {
198            let reader = catalog_reader.read_guard();
199            let db = reader.get_database_by_name("test_db").unwrap();
200            assert_eq!(db.resource_group, DEFAULT_RESOURCE_GROUP);
201        }
202
203        frontend
204            .run_sql("ALTER DATABASE test_db RESET RESOURCE_GROUP DEFERRED")
205            .await
206            .unwrap();
207        {
208            let reader = catalog_reader.read_guard();
209            let db = reader.get_database_by_name("test_db").unwrap();
210            assert_eq!(db.resource_group, DEFAULT_RESOURCE_GROUP);
211        }
212
213        let err = frontend
214            .run_sql("ALTER DATABASE test_db SET RESOURCE_GROUP = 1 DEFERRED")
215            .await
216            .unwrap_err();
217        assert!(
218            err.to_string()
219                .contains("target resource group must be a valid string or default")
220        );
221    }
222}