risingwave_frontend/handler/
create_database.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
17use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
18
19use super::RwPgResponse;
20use crate::binder::Binder;
21use crate::catalog::CatalogError;
22use crate::error::ErrorCode::PermissionDenied;
23use crate::error::Result;
24use crate::handler::HandlerArgs;
25use crate::handler::alter_resource_group::resolve_resource_group;
26
27pub async fn handle_create_database(
28 handler_args: HandlerArgs,
29 database_name: ObjectName,
30 if_not_exist: bool,
31 owner: Option<ObjectName>,
32 resource_group: Option<SetVariableValue>,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let database_name = Binder::resolve_database_name(database_name)?;
36
37 {
38 let user_reader = session.env().user_info_reader();
39 let reader = user_reader.read_guard();
40 if let Some(info) = reader.get_user_by_name(&session.user_name()) {
41 if !info.can_create_db && !info.is_super {
42 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
43 }
44 } else {
45 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
46 }
47 }
48
49 {
50 let catalog_reader = session.env().catalog_reader();
51 let reader = catalog_reader.read_guard();
52 if reader.get_database_by_name(&database_name).is_ok() {
53 return if if_not_exist {
55 Ok(PgResponse::builder(StatementType::CREATE_DATABASE)
56 .notice(format!("database \"{}\" exists, skipping", database_name))
57 .into())
58 } else {
59 Err(CatalogError::Duplicated("database", database_name, false).into())
60 };
61 }
62 }
63
64 let database_owner = if let Some(owner) = owner {
65 let owner = Binder::resolve_user_name(owner)?;
66 session
67 .env()
68 .user_info_reader()
69 .read_guard()
70 .get_user_by_name(&owner)
71 .map(|u| u.id)
72 .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
73 } else {
74 session.user_id()
75 };
76
77 let resource_group = resource_group.map(resolve_resource_group).transpose()?;
78
79 let resource_group = resource_group.as_deref().unwrap_or(DEFAULT_RESOURCE_GROUP);
80
81 let catalog_writer = session.catalog_writer()?;
82 catalog_writer
83 .create_database(&database_name, database_owner, resource_group)
84 .await?;
85
86 Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
87}
88
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Exercises `CREATE DATABASE` in three situations:
    /// * issued through `run_sql`, where the database must appear in the catalog;
    /// * issued as a `NOSUPERUSER NOCREATEDB` user, where the statement must fail;
    /// * issued as a `NOSUPERUSER CREATEDB` user, where the database must appear.
    #[tokio::test]
    async fn test_create_database() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Each call takes a fresh read guard and releases it before returning.
        let database_exists = |name: &str| {
            catalog_reader
                .read_guard()
                .get_database_by_name(name)
                .is_ok()
        };
        // Looks up the id of a user that is expected to exist; panics otherwise.
        let user_id_of = |name: &str| {
            session
                .env()
                .user_info_reader()
                .read_guard()
                .get_user_by_name(name)
                .unwrap()
                .id
        };

        frontend.run_sql("CREATE DATABASE database").await.unwrap();
        assert!(database_exists("database"));

        // A user without CREATEDB must be rejected.
        frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let no_createdb_id = user_id_of("user");
        let denied = frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user".to_owned(),
                no_createdb_id,
            )
            .await;
        assert!(denied.is_err());

        // A user with CREATEDB may create the same database.
        frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let createdb_id = user_id_of("user2");
        frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user2".to_owned(),
                createdb_id,
            )
            .await
            .unwrap();
        assert!(database_exists("database2"));
    }
}