risingwave_frontend/handler/
create_database.rsuse pgwire::pg_response::{PgResponse, StatementType};
use risingwave_sqlparser::ast::ObjectName;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::error::ErrorCode::PermissionDenied;
use crate::error::Result;
use crate::handler::HandlerArgs;
pub async fn handle_create_database(
handler_args: HandlerArgs,
database_name: ObjectName,
if_not_exist: bool,
owner: Option<ObjectName>,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let database_name = Binder::resolve_database_name(database_name)?;
{
let user_reader = session.env().user_info_reader();
let reader = user_reader.read_guard();
if let Some(info) = reader.get_user_by_name(session.user_name()) {
if !info.can_create_db && !info.is_super {
return Err(PermissionDenied("Do not have the privilege".to_string()).into());
}
} else {
return Err(PermissionDenied("Session user is invalid".to_string()).into());
}
}
{
let catalog_reader = session.env().catalog_reader();
let reader = catalog_reader.read_guard();
if reader.get_database_by_name(&database_name).is_ok() {
return if if_not_exist {
Ok(PgResponse::builder(StatementType::CREATE_DATABASE)
.notice(format!("database \"{}\" exists, skipping", database_name))
.into())
} else {
Err(CatalogError::Duplicated("database", database_name).into())
};
}
}
let database_owner = if let Some(owner) = owner {
let owner = Binder::resolve_user_name(owner)?;
session
.env()
.user_info_reader()
.read_guard()
.get_user_by_name(&owner)
.map(|u| u.id)
.ok_or_else(|| CatalogError::NotFound("user", owner.to_string()))?
} else {
session.user_id()
};
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_database(&database_name, database_owner)
.await?;
Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
}
#[cfg(test)]
mod tests {
use crate::test_utils::LocalFrontend;
#[tokio::test]
async fn test_create_database() {
let frontend = LocalFrontend::new(Default::default()).await;
let session = frontend.session_ref();
let catalog_reader = session.env().catalog_reader();
frontend.run_sql("CREATE DATABASE database").await.unwrap();
{
let reader = catalog_reader.read_guard();
assert!(reader.get_database_by_name("database").is_ok());
}
frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
let user_id = {
let user_reader = session.env().user_info_reader();
user_reader
.read_guard()
.get_user_by_name("user")
.unwrap()
.id
};
let res = frontend
.run_user_sql(
"CREATE DATABASE database2",
"dev".to_string(),
"user".to_string(),
user_id,
)
.await;
assert!(res.is_err());
frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
let user_id = {
let user_reader = session.env().user_info_reader();
user_reader
.read_guard()
.get_user_by_name("user2")
.unwrap()
.id
};
frontend
.run_user_sql(
"CREATE DATABASE database2",
"dev".to_string(),
"user2".to_string(),
user_id,
)
.await
.unwrap();
{
let reader = catalog_reader.read_guard();
assert!(reader.get_database_by_name("database2").is_ok());
}
}
}