risingwave_frontend/handler/
alter_database_param.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::AlterDatabaseParam;
17use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::error::Result;
23
24pub async fn handle_alter_database_param(
25 handler_args: HandlerArgs,
26 database_name: ObjectName,
27 param: AlterDatabaseParam,
28) -> Result<RwPgResponse> {
29 let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
30
31 let session = handler_args.session;
32
33 let database_name = Binder::resolve_database_name(database_name)?;
34 let database_id = {
35 let catalog_reader = session.env().catalog_reader().read_guard();
36 let database = catalog_reader.get_database_by_name(&database_name)?;
37
38 session.check_privilege_for_drop_alter_db_schema(database)?;
40
41 database.id()
42 };
43
44 match param {
45 AlterDatabaseParam::BarrierIntervalMs(Some(interval)) => {
46 if !cfg!(test) {
47 risingwave_common::license::Feature::ResourceGroup.check_available()?;
48 }
49 if interval >= NOTICE_BARRIER_INTERVAL_MS {
50 builder = builder.notice(
51 format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
52 interval, NOTICE_BARRIER_INTERVAL_MS));
53 }
54 }
55 AlterDatabaseParam::CheckpointFrequency(Some(frequency)) => {
56 if !cfg!(test) {
57 risingwave_common::license::Feature::ResourceGroup.check_available()?;
58 }
59 if frequency >= NOTICE_CHECKPOINT_FREQUENCY {
60 builder = builder.notice(
61 format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
62 frequency, NOTICE_CHECKPOINT_FREQUENCY));
63 }
64 }
65 _ => {}
66 }
67
68 let catalog_writer = session.catalog_writer()?;
69 catalog_writer
70 .alter_database_param(database_id, param)
71 .await?;
72
73 Ok(builder.into())
74}
75
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Creates a database, then sets and resets both parameters one statement at a time,
    /// checking the catalog entry after every statement.
    #[tokio::test]
    async fn test_alter_barrier() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // (statement, expected `barrier_interval_ms`, expected `checkpoint_frequency`)
        let steps = [
            ("CREATE DATABASE test_db", None, None),
            ("ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = 1000", Some(1000), None),
            ("ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = 10", Some(1000), Some(10)),
            ("ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = DEFAULT", None, Some(10)),
            ("ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = DEFAULT", None, None),
        ];

        for (sql, expected_barrier, expected_checkpoint) in steps {
            frontend.run_sql(sql).await.unwrap();

            // The guard is dropped at the end of each iteration, before the next `.await`.
            let reader = catalog_reader.read_guard();
            let db = reader.get_database_by_name("test_db").unwrap();
            assert_eq!(db.barrier_interval_ms, expected_barrier);
            assert_eq!(db.checkpoint_frequency, expected_checkpoint);
        }
    }
}