risingwave_frontend/handler/
create_database.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
17use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
18
19use super::RwPgResponse;
20use crate::binder::Binder;
21use crate::catalog::CatalogError;
22use crate::error::ErrorCode::PermissionDenied;
23use crate::error::Result;
24use crate::handler::HandlerArgs;
25use crate::handler::alter_resource_group::resolve_resource_group;
26
27pub async fn handle_create_database(
28 handler_args: HandlerArgs,
29 database_name: ObjectName,
30 if_not_exist: bool,
31 owner: Option<ObjectName>,
32 resource_group: Option<SetVariableValue>,
33 barrier_interval_ms: Option<u32>,
34 checkpoint_frequency: Option<u64>,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session;
37 let database_name = Binder::resolve_database_name(database_name)?;
38
39 {
40 let user_reader = session.env().user_info_reader();
41 let reader = user_reader.read_guard();
42 if let Some(info) = reader.get_user_by_name(&session.user_name()) {
43 if !info.can_create_db && !info.is_super {
44 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
45 }
46 } else {
47 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
48 }
49 }
50
51 {
52 let catalog_reader = session.env().catalog_reader();
53 let reader = catalog_reader.read_guard();
54 if reader.get_database_by_name(&database_name).is_ok() {
55 return if if_not_exist {
57 Ok(PgResponse::builder(StatementType::CREATE_DATABASE)
58 .notice(format!("database \"{}\" exists, skipping", database_name))
59 .into())
60 } else {
61 Err(CatalogError::duplicated("database", database_name).into())
62 };
63 }
64 }
65
66 let database_owner = if let Some(owner) = owner {
67 let owner = Binder::resolve_user_name(owner)?;
68 session
69 .env()
70 .user_info_reader()
71 .read_guard()
72 .get_user_by_name(&owner)
73 .map(|u| u.id)
74 .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
75 } else {
76 session.user_id()
77 };
78
79 let resource_group = resource_group
80 .map(resolve_resource_group)
81 .transpose()?
82 .flatten();
83
84 if resource_group.is_some() {
85 risingwave_common::license::Feature::ResourceGroup
86 .check_available()
87 .map_err(|e| anyhow::anyhow!(e))?;
88 }
89
90 let resource_group = resource_group.as_deref().unwrap_or(DEFAULT_RESOURCE_GROUP);
91
92 let catalog_writer = session.catalog_writer()?;
93 catalog_writer
94 .create_database(
95 &database_name,
96 database_owner,
97 resource_group,
98 barrier_interval_ms,
99 checkpoint_frequency,
100 )
101 .await?;
102
103 Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
104}
105
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Exercises `CREATE DATABASE` for three callers: the default session
    /// user, a user created with `NOCREATEDB` (expected to fail), and a user
    /// created with `CREATEDB` (expected to succeed).
    #[tokio::test]
    async fn test_create_database() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Each helper takes its read guard only for the duration of one call,
        // so no guard is held across an `.await`.
        let database_exists =
            |name: &str| catalog_reader.read_guard().get_database_by_name(name).is_ok();
        let user_id_of = |name: &str| {
            let users = session.env().user_info_reader();
            users.read_guard().get_user_by_name(name).unwrap().id
        };

        // The default session user can create a database.
        frontend.run_sql("CREATE DATABASE database").await.unwrap();
        assert!(database_exists("database"));

        // A user created with NOCREATEDB is rejected.
        frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let denied_user_id = user_id_of("user");
        let denied = frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user".to_owned(),
                denied_user_id,
            )
            .await;
        assert!(denied.is_err());

        // A user created with CREATEDB succeeds.
        frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let allowed_user_id = user_id_of("user2");
        frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user2".to_owned(),
                allowed_user_id,
            )
            .await
            .unwrap();
        assert!(database_exists("database2"));
    }
}