risingwave_frontend/handler/
create_schema.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::RESERVED_PG_SCHEMA_PREFIX;
18use risingwave_pb::user::grant_privilege::Object;
19use risingwave_sqlparser::ast::ObjectName;
20
21use super::RwPgResponse;
22use crate::binder::Binder;
23use crate::catalog::{CatalogError, OwnedByUserCatalog};
24use crate::error::{ErrorCode, Result};
25use crate::handler::HandlerArgs;
26use crate::handler::privilege::ObjectCheckItem;
27
28pub async fn handle_create_schema(
29 handler_args: HandlerArgs,
30 schema_name: ObjectName,
31 if_not_exist: bool,
32 owner: Option<ObjectName>,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let database_name = &session.database();
36 let schema_name = Binder::resolve_schema_name(schema_name)?;
37
38 if schema_name.starts_with(RESERVED_PG_SCHEMA_PREFIX) {
39 return Err(ErrorCode::ProtocolError(format!(
40 "unacceptable schema name \"{}\", The prefix \"{}\" is reserved for system schemas",
41 schema_name, RESERVED_PG_SCHEMA_PREFIX
42 ))
43 .into());
44 }
45
46 let (db_id, db_owner) = {
47 let catalog_reader = session.env().catalog_reader();
48 let reader = catalog_reader.read_guard();
49 if reader
50 .get_schema_by_name(database_name, &schema_name)
51 .is_ok()
52 {
53 return if if_not_exist {
55 Ok(PgResponse::builder(StatementType::CREATE_SCHEMA)
56 .notice(format!("schema \"{}\" exists, skipping", schema_name))
57 .into())
58 } else {
59 Err(CatalogError::duplicated("schema", schema_name).into())
60 };
61 }
62 let db = reader.get_database_by_name(database_name)?;
63 (db.id(), db.owner())
64 };
65
66 let schema_owner = if let Some(owner) = owner {
67 let owner = Binder::resolve_user_name(owner)?;
68 session
69 .env()
70 .user_info_reader()
71 .read_guard()
72 .get_user_by_name(&owner)
73 .map(|u| u.id)
74 .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
75 } else {
76 session.user_id()
77 };
78
79 session.check_privileges(&[ObjectCheckItem::new(
80 db_owner,
81 AclMode::Create,
82 Object::DatabaseId(db_id),
83 )])?;
84
85 let catalog_writer = session.catalog_writer()?;
86 catalog_writer
87 .create_schema(db_id, &schema_name, schema_owner)
88 .await?;
89 Ok(PgResponse::empty_result(StatementType::CREATE_SCHEMA))
90}
91
92#[cfg(test)]
93mod tests {
94 use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
95
96 use crate::test_utils::LocalFrontend;
97
98 #[tokio::test]
99 async fn test_create_schema() {
100 let frontend = LocalFrontend::new(Default::default()).await;
101 let session = frontend.session_ref();
102 let catalog_reader = session.env().catalog_reader();
103
104 frontend.run_sql("CREATE SCHEMA schema").await.unwrap();
105
106 let schema = catalog_reader
107 .read_guard()
108 .get_schema_by_name(DEFAULT_DATABASE_NAME, "schema")
109 .ok()
110 .cloned();
111 assert!(schema.is_some());
112 }
113}