risingwave_frontend/handler/
alter_database_param.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::catalog::AlterDatabaseParam;
17use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::error::Result;
23
24pub async fn handle_alter_database_param(
25 handler_args: HandlerArgs,
26 database_name: ObjectName,
27 param: AlterDatabaseParam,
28) -> Result<RwPgResponse> {
29 let mut builder = RwPgResponse::builder(StatementType::ALTER_DATABASE);
30
31 let session = handler_args.session;
32
33 let database_name = Binder::resolve_database_name(database_name)?;
34 let database_id = {
35 let catalog_reader = session.env().catalog_reader().read_guard();
36 let database = catalog_reader.get_database_by_name(&database_name)?;
37
38 session.check_privilege_for_drop_alter_db_schema(database)?;
40
41 database.id()
42 };
43
44 match param {
45 AlterDatabaseParam::BarrierIntervalMs(Some(interval)) => {
46 if interval >= NOTICE_BARRIER_INTERVAL_MS {
47 builder = builder.notice(
48 format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
49 interval, NOTICE_BARRIER_INTERVAL_MS));
50 }
51 }
52 AlterDatabaseParam::CheckpointFrequency(Some(frequency)) => {
53 if frequency >= NOTICE_CHECKPOINT_FREQUENCY {
54 builder = builder.notice(
55 format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
56 frequency, NOTICE_CHECKPOINT_FREQUENCY));
57 }
58 }
59 _ => {}
60 }
61
62 let catalog_writer = session.catalog_writer()?;
63 catalog_writer
64 .alter_database_param(database_id, param)
65 .await?;
66
67 Ok(builder.into())
68}
69
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Creates `test_db`, then sets and resets `BARRIER_INTERVAL_MS` and
    /// `CHECKPOINT_FREQUENCY` one at a time, checking after every statement
    /// that the catalog entry reflects exactly the expected pair of values.
    #[tokio::test]
    async fn test_alter_barrier() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Reads `test_db` from the catalog and compares both parameters with the
        // expected values. The read guard lives only for the duration of the check,
        // so it is never held across an `.await`.
        macro_rules! assert_db_params {
            ($barrier:expr, $checkpoint:expr) => {{
                let guard = catalog_reader.read_guard();
                let database = guard.get_database_by_name("test_db").unwrap();
                assert_eq!(database.barrier_interval_ms, $barrier);
                assert_eq!(database.checkpoint_frequency, $checkpoint);
            }};
        }

        // A freshly created database carries neither parameter.
        frontend.run_sql("CREATE DATABASE test_db").await.unwrap();
        assert_db_params!(None, None);

        // Setting the barrier interval leaves the checkpoint frequency unset.
        frontend
            .run_sql("ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = 1000")
            .await
            .unwrap();
        assert_db_params!(Some(1000), None);

        // Setting the checkpoint frequency keeps the barrier interval.
        frontend
            .run_sql("ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = 10")
            .await
            .unwrap();
        assert_db_params!(Some(1000), Some(10));

        // `DEFAULT` clears the barrier interval only.
        frontend
            .run_sql("ALTER DATABASE test_db SET BARRIER_INTERVAL_MS = DEFAULT")
            .await
            .unwrap();
        assert_db_params!(None, Some(10));

        // `DEFAULT` clears the checkpoint frequency as well.
        frontend
            .run_sql("ALTER DATABASE test_db SET CHECKPOINT_FREQUENCY = DEFAULT")
            .await
            .unwrap();
        assert_db_params!(None, None);
    }
}