risingwave_frontend/handler/
create_database.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
17use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
18
19use super::RwPgResponse;
20use crate::binder::Binder;
21use crate::catalog::CatalogError;
22use crate::error::ErrorCode::PermissionDenied;
23use crate::error::Result;
24use crate::handler::HandlerArgs;
25use crate::handler::alter_resource_group::resolve_resource_group;
26
27pub async fn handle_create_database(
28 handler_args: HandlerArgs,
29 database_name: ObjectName,
30 if_not_exist: bool,
31 owner: Option<ObjectName>,
32 resource_group: Option<SetVariableValue>,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let database_name = Binder::resolve_database_name(database_name)?;
36
37 {
38 let user_reader = session.env().user_info_reader();
39 let reader = user_reader.read_guard();
40 if let Some(info) = reader.get_user_by_name(&session.user_name()) {
41 if !info.can_create_db && !info.is_super {
42 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
43 }
44 } else {
45 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
46 }
47 }
48
49 {
50 let catalog_reader = session.env().catalog_reader();
51 let reader = catalog_reader.read_guard();
52 if reader.get_database_by_name(&database_name).is_ok() {
53 return if if_not_exist {
55 Ok(PgResponse::builder(StatementType::CREATE_DATABASE)
56 .notice(format!("database \"{}\" exists, skipping", database_name))
57 .into())
58 } else {
59 Err(CatalogError::Duplicated("database", database_name, false).into())
60 };
61 }
62 }
63
64 let database_owner = if let Some(owner) = owner {
65 let owner = Binder::resolve_user_name(owner)?;
66 session
67 .env()
68 .user_info_reader()
69 .read_guard()
70 .get_user_by_name(&owner)
71 .map(|u| u.id)
72 .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
73 } else {
74 session.user_id()
75 };
76
77 let resource_group = resource_group
78 .map(resolve_resource_group)
79 .transpose()?
80 .flatten();
81
82 if resource_group.is_some() {
83 risingwave_common::license::Feature::ResourceGroup
84 .check_available()
85 .map_err(|e| anyhow::anyhow!(e))?;
86 }
87
88 let resource_group = resource_group.as_deref().unwrap_or(DEFAULT_RESOURCE_GROUP);
89
90 let catalog_writer = session.catalog_writer()?;
91 catalog_writer
92 .create_database(&database_name, database_owner, resource_group)
93 .await?;
94
95 Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
96}
97
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Exercises `CREATE DATABASE` three ways: through the default `run_sql` session,
    /// as a user created with `NOCREATEDB` (expected to fail), and as a user created
    /// with `CREATEDB` (expected to succeed).
    #[tokio::test]
    async fn test_create_database() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Looks up the id of an existing user; the read guard only lives for the call.
        let lookup_user_id = |name: &str| {
            session
                .env()
                .user_info_reader()
                .read_guard()
                .get_user_by_name(name)
                .unwrap()
                .id
        };

        frontend.run_sql("CREATE DATABASE database").await.unwrap();
        assert!(
            catalog_reader
                .read_guard()
                .get_database_by_name("database")
                .is_ok()
        );

        // A user created with NOCREATEDB must be rejected.
        frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let restricted_id = lookup_user_id("user");
        let denied = frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user".to_owned(),
                restricted_id,
            )
            .await;
        assert!(denied.is_err());

        // A user created with CREATEDB is allowed to create the same database.
        frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let privileged_id = lookup_user_id("user2");
        frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user2".to_owned(),
                privileged_id,
            )
            .await
            .unwrap();
        assert!(
            catalog_reader
                .read_guard()
                .get_database_by_name("database2")
                .is_ok()
        );
    }
}