risingwave_frontend/handler/
create_schema.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::RESERVED_PG_SCHEMA_PREFIX;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::{CatalogError, OwnedByUserCatalog};
23use crate::error::{ErrorCode, Result};
24use crate::handler::HandlerArgs;
25use crate::handler::privilege::ObjectCheckItem;
26
27pub async fn handle_create_schema(
28 handler_args: HandlerArgs,
29 schema_name: ObjectName,
30 if_not_exist: bool,
31 owner: Option<ObjectName>,
32) -> Result<RwPgResponse> {
33 let session = handler_args.session;
34 let database_name = &session.database();
35 let schema_name = Binder::resolve_schema_name(schema_name)?;
36
37 if schema_name.starts_with(RESERVED_PG_SCHEMA_PREFIX) {
38 return Err(ErrorCode::ProtocolError(format!(
39 "unacceptable schema name \"{}\", The prefix \"{}\" is reserved for system schemas",
40 schema_name, RESERVED_PG_SCHEMA_PREFIX
41 ))
42 .into());
43 }
44
45 let (db_id, db_name, db_owner) = {
46 let catalog_reader = session.env().catalog_reader();
47 let reader = catalog_reader.read_guard();
48 if reader
49 .get_schema_by_name(database_name, &schema_name)
50 .is_ok()
51 {
52 return if if_not_exist {
54 Ok(PgResponse::builder(StatementType::CREATE_SCHEMA)
55 .notice(format!("schema \"{}\" exists, skipping", schema_name))
56 .into())
57 } else {
58 Err(CatalogError::duplicated("schema", schema_name).into())
59 };
60 }
61 let db = reader.get_database_by_name(database_name)?;
62 (db.id(), db.name.clone(), db.owner())
63 };
64
65 let schema_owner = if let Some(owner) = owner {
66 let owner = Binder::resolve_user_name(owner)?;
67 session
68 .env()
69 .user_info_reader()
70 .read_guard()
71 .get_user_by_name(&owner)
72 .map(|u| u.id)
73 .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
74 } else {
75 session.user_id()
76 };
77
78 session.check_privileges(&[ObjectCheckItem::new(
79 db_owner,
80 AclMode::Create,
81 db_name,
82 db_id,
83 )])?;
84
85 let catalog_writer = session.catalog_writer()?;
86 catalog_writer
87 .create_schema(db_id, &schema_name, schema_owner)
88 .await?;
89 Ok(PgResponse::empty_result(StatementType::CREATE_SCHEMA))
90}
91
92#[cfg(test)]
93mod tests {
94 use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
95
96 use crate::test_utils::LocalFrontend;
97
98 #[tokio::test]
99 async fn test_create_schema() {
100 let frontend = LocalFrontend::new(Default::default()).await;
101 let session = frontend.session_ref();
102 let catalog_reader = session.env().catalog_reader();
103
104 frontend.run_sql("CREATE SCHEMA schema").await.unwrap();
105
106 let schema = catalog_reader
107 .read_guard()
108 .get_schema_by_name(DEFAULT_DATABASE_NAME, "schema")
109 .ok()
110 .cloned();
111 assert!(schema.is_some());
112 }
113}