risingwave_frontend/handler/
alter_database_param.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::AlterDatabaseParam;
17use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
18use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::error::Result;
23use crate::handler::alter_resource_group::resolve_resource_group;
24
25pub async fn handle_alter_database_param(
26 handler_args: HandlerArgs,
27 database_name: ObjectName,
28 param: AlterDatabaseParam,
29) -> Result<RwPgResponse> {
30 let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
31
32 let session = handler_args.session;
33
34 let database_name = Binder::resolve_database_name(database_name)?;
35 let database_id = {
36 let catalog_reader = session.env().catalog_reader().read_guard();
37 let database = catalog_reader.get_database_by_name(&database_name)?;
38
39 session.check_privilege_for_drop_alter_db_schema(database)?;
41
42 database.id()
43 };
44
45 match param {
46 AlterDatabaseParam::BarrierIntervalMs(Some(interval)) => {
47 if !cfg!(test) {
48 risingwave_common::license::Feature::ResourceGroup.check_available()?;
49 }
50 if interval >= NOTICE_BARRIER_INTERVAL_MS {
51 builder = builder.notice(
52 format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
53 interval, NOTICE_BARRIER_INTERVAL_MS));
54 }
55 }
56 AlterDatabaseParam::CheckpointFrequency(Some(frequency)) => {
57 if !cfg!(test) {
58 risingwave_common::license::Feature::ResourceGroup.check_available()?;
59 }
60 if frequency >= NOTICE_CHECKPOINT_FREQUENCY {
61 builder = builder.notice(
62 format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
63 frequency, NOTICE_CHECKPOINT_FREQUENCY));
64 }
65 }
66 _ => {}
67 }
68
69 let catalog_writer = session.catalog_writer()?;
70 catalog_writer
71 .alter_database_param(database_id, param)
72 .await?;
73
74 Ok(builder.into())
75}
76
77pub async fn handle_alter_database_resource_group(
78 handler_args: HandlerArgs,
79 database_name: ObjectName,
80 resource_group: Option<SetVariableValue>,
81 _deferred: bool,
82) -> Result<RwPgResponse> {
83 let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
84
85 let session = handler_args.session;
86
87 let database_name = Binder::resolve_database_name(database_name)?;
88 let database_id = {
89 let catalog_reader = session.env().catalog_reader().read_guard();
90 let database = catalog_reader.get_database_by_name(&database_name)?;
91
92 session.check_privilege_for_drop_alter_db_schema(database)?;
94
95 database.id()
96 };
97
98 let resource_group = resource_group
99 .map(resolve_resource_group)
100 .transpose()?
101 .flatten();
102
103 if resource_group.is_some() {
104 risingwave_common::license::Feature::ResourceGroup.check_available()?;
105 }
106
107 let catalog_writer = session.catalog_writer()?;
108 catalog_writer
109 .alter_database_resource_group(database_id, resource_group, _deferred)
110 .await?;
111
112 builder = builder.notice(
113 "The database resource group metadata has been updated, but it will not take effect immediately. Manually trigger recovery for inherited streaming jobs to use the new database resource group."
114 .to_owned(),
115 );
116
117 Ok(builder.into())
118}
119
#[cfg(test)]
mod tests {
    use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;

    use crate::test_utils::LocalFrontend;

    /// Sets and resets the barrier interval and checkpoint frequency of a
    /// database, checking the catalog entry after every statement.
    #[tokio::test]
    async fn test_alter_barrier() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        frontend.run_sql("CREATE DATABASE test_db").await.unwrap();
        {
            // A freshly created database has neither parameter set.
            let guard = catalog_reader.read_guard();
            let db = guard.get_database_by_name("test_db").unwrap();
            assert!(db.barrier_interval_ms.is_none());
            assert!(db.checkpoint_frequency.is_none());
        }

        // (statement, expected barrier interval, expected checkpoint frequency)
        let steps = [
            (
                "ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = 1000",
                Some(1000),
                None,
            ),
            (
                "ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = 10",
                Some(1000),
                Some(10),
            ),
            (
                "ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = DEFAULT",
                None,
                Some(10),
            ),
            (
                "ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = DEFAULT",
                None,
                None,
            ),
        ];

        for (sql, expected_barrier, expected_checkpoint) in steps {
            frontend.run_sql(sql).await.unwrap();
            {
                let guard = catalog_reader.read_guard();
                let db = guard.get_database_by_name("test_db").unwrap();
                assert_eq!(db.barrier_interval_ms, expected_barrier);
                assert_eq!(db.checkpoint_frequency, expected_checkpoint);
            }
        }
    }

    /// Checks that setting the resource group to `DEFAULT` or resetting it
    /// leaves the default resource group in the catalog, and that a
    /// non-string value is rejected with the expected error message.
    #[tokio::test]
    async fn test_alter_resource_group() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        frontend.run_sql("CREATE DATABASE test_db").await.unwrap();

        // Both statements must leave the database on the default group.
        let default_group_statements = [
            "ALTER DATABASE test_db SET RESOURCE_GROUP = DEFAULT DEFERRED",
            "ALTER DATABASE test_db RESET RESOURCE_GROUP DEFERRED",
        ];
        for sql in default_group_statements {
            frontend.run_sql(sql).await.unwrap();
            {
                let guard = catalog_reader.read_guard();
                let db = guard.get_database_by_name("test_db").unwrap();
                assert_eq!(db.resource_group, DEFAULT_RESOURCE_GROUP);
            }
        }

        let message = frontend
            .run_sql("ALTER DATABASE test_db SET RESOURCE_GROUP = 1 DEFERRED")
            .await
            .unwrap_err()
            .to_string();
        assert!(message.contains("target resource group must be a valid string or default"));
    }
}