risingwave_frontend/handler/
create_database.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
17use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
18use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::error::ErrorCode::PermissionDenied;
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::handler::alter_resource_group::resolve_resource_group;
27
28pub async fn handle_create_database(
29 handler_args: HandlerArgs,
30 database_name: ObjectName,
31 if_not_exist: bool,
32 owner: Option<ObjectName>,
33 resource_group: Option<SetVariableValue>,
34 barrier_interval_ms: Option<u32>,
35 checkpoint_frequency: Option<u64>,
36) -> Result<RwPgResponse> {
37 let mut builder = RwPgResponse::builder(StatementType::CREATE_DATABASE);
38
39 let session = handler_args.session;
40 let database_name = Binder::resolve_database_name(database_name)?;
41
42 {
43 let user_reader = session.env().user_info_reader();
44 let reader = user_reader.read_guard();
45 if let Some(info) = reader.get_user_by_name(&session.user_name()) {
46 if !info.can_create_db && !info.is_super {
47 return Err(
48 PermissionDenied("permission denied to create database".to_owned()).into(),
49 );
50 }
51 } else {
52 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
53 }
54 }
55
56 {
57 let catalog_reader = session.env().catalog_reader();
58 let reader = catalog_reader.read_guard();
59 if reader.get_database_by_name(&database_name).is_ok() {
60 return if if_not_exist {
62 Ok(builder
63 .notice(format!("database \"{}\" exists, skipping", database_name))
64 .into())
65 } else {
66 Err(CatalogError::duplicated("database", database_name).into())
67 };
68 }
69 }
70
71 let database_owner = if let Some(owner) = owner {
72 let owner = Binder::resolve_user_name(owner)?;
73 session
74 .env()
75 .user_info_reader()
76 .read_guard()
77 .get_user_by_name(&owner)
78 .map(|u| u.id)
79 .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
80 } else {
81 session.user_id()
82 };
83
84 let resource_group = resource_group
85 .map(resolve_resource_group)
86 .transpose()?
87 .flatten();
88
89 if resource_group.is_some() {
90 risingwave_common::license::Feature::ResourceGroup.check_available()?;
91 }
92
93 let resource_group = resource_group.as_deref().unwrap_or(DEFAULT_RESOURCE_GROUP);
94
95 if barrier_interval_ms.is_some() || checkpoint_frequency.is_some() {
96 risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()?;
97 }
98 if let Some(interval) = barrier_interval_ms
99 && interval >= NOTICE_BARRIER_INTERVAL_MS
100 {
101 builder = builder.notice(
102 format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
103 interval, NOTICE_BARRIER_INTERVAL_MS));
104 }
105 if let Some(frequency) = checkpoint_frequency
106 && frequency >= NOTICE_CHECKPOINT_FREQUENCY
107 {
108 builder = builder.notice(
109 format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
110 frequency, NOTICE_CHECKPOINT_FREQUENCY));
111 }
112
113 let catalog_writer = session.catalog_writer()?;
114 catalog_writer
115 .create_database(
116 &database_name,
117 database_owner,
118 resource_group,
119 barrier_interval_ms,
120 checkpoint_frequency,
121 )
122 .await?;
123
124 Ok(builder.into())
125}
126
127#[cfg(test)]
128mod tests {
129 use crate::test_utils::LocalFrontend;
130
131 #[tokio::test]
132 async fn test_create_database() {
133 let frontend = LocalFrontend::new(Default::default()).await;
134 let session = frontend.session_ref();
135 let catalog_reader = session.env().catalog_reader();
136
137 frontend.run_sql("CREATE DATABASE database").await.unwrap();
138 {
139 let reader = catalog_reader.read_guard();
140 assert!(reader.get_database_by_name("database").is_ok());
141 }
142
143 frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
144 let user_id = {
145 let user_reader = session.env().user_info_reader();
146 user_reader
147 .read_guard()
148 .get_user_by_name("user")
149 .unwrap()
150 .id
151 };
152 let res = frontend
153 .run_user_sql(
154 "CREATE DATABASE database2",
155 "dev".to_owned(),
156 "user".to_owned(),
157 user_id,
158 )
159 .await;
160 assert!(res.is_err());
161
162 frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
163 let user_id = {
164 let user_reader = session.env().user_info_reader();
165 user_reader
166 .read_guard()
167 .get_user_by_name("user2")
168 .unwrap()
169 .id
170 };
171 frontend
172 .run_user_sql(
173 "CREATE DATABASE database2",
174 "dev".to_owned(),
175 "user2".to_owned(),
176 user_id,
177 )
178 .await
179 .unwrap();
180 {
181 let reader = catalog_reader.read_guard();
182 assert!(reader.get_database_by_name("database2").is_ok());
183 }
184 }
185}