risingwave_frontend/handler/
create_database.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
17use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
18
19use super::RwPgResponse;
20use crate::binder::Binder;
21use crate::catalog::CatalogError;
22use crate::error::ErrorCode::PermissionDenied;
23use crate::error::Result;
24use crate::handler::HandlerArgs;
25use crate::handler::alter_resource_group::resolve_resource_group;
26
27pub async fn handle_create_database(
28    handler_args: HandlerArgs,
29    database_name: ObjectName,
30    if_not_exist: bool,
31    owner: Option<ObjectName>,
32    resource_group: Option<SetVariableValue>,
33    barrier_interval_ms: Option<u32>,
34    checkpoint_frequency: Option<u64>,
35) -> Result<RwPgResponse> {
36    let session = handler_args.session;
37    let database_name = Binder::resolve_database_name(database_name)?;
38
39    {
40        let user_reader = session.env().user_info_reader();
41        let reader = user_reader.read_guard();
42        if let Some(info) = reader.get_user_by_name(&session.user_name()) {
43            if !info.can_create_db && !info.is_super {
44                return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
45            }
46        } else {
47            return Err(PermissionDenied("Session user is invalid".to_owned()).into());
48        }
49    }
50
51    {
52        let catalog_reader = session.env().catalog_reader();
53        let reader = catalog_reader.read_guard();
54        if reader.get_database_by_name(&database_name).is_ok() {
55            // If `if_not_exist` is true, not return error.
56            return if if_not_exist {
57                Ok(PgResponse::builder(StatementType::CREATE_DATABASE)
58                    .notice(format!("database \"{}\" exists, skipping", database_name))
59                    .into())
60            } else {
61                Err(CatalogError::duplicated("database", database_name).into())
62            };
63        }
64    }
65
66    let database_owner = if let Some(owner) = owner {
67        let owner = Binder::resolve_user_name(owner)?;
68        session
69            .env()
70            .user_info_reader()
71            .read_guard()
72            .get_user_by_name(&owner)
73            .map(|u| u.id)
74            .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
75    } else {
76        session.user_id()
77    };
78
79    let resource_group = resource_group
80        .map(resolve_resource_group)
81        .transpose()?
82        .flatten();
83
84    if resource_group.is_some() {
85        risingwave_common::license::Feature::ResourceGroup
86            .check_available()
87            .map_err(|e| anyhow::anyhow!(e))?;
88    }
89
90    let resource_group = resource_group.as_deref().unwrap_or(DEFAULT_RESOURCE_GROUP);
91
92    let catalog_writer = session.catalog_writer()?;
93    catalog_writer
94        .create_database(
95            &database_name,
96            database_owner,
97            resource_group,
98            barrier_interval_ms,
99            checkpoint_frequency,
100        )
101        .await?;
102
103    Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
104}
105
106#[cfg(test)]
107mod tests {
108    use crate::test_utils::LocalFrontend;
109
110    #[tokio::test]
111    async fn test_create_database() {
112        let frontend = LocalFrontend::new(Default::default()).await;
113        let session = frontend.session_ref();
114        let catalog_reader = session.env().catalog_reader();
115
116        frontend.run_sql("CREATE DATABASE database").await.unwrap();
117        {
118            let reader = catalog_reader.read_guard();
119            assert!(reader.get_database_by_name("database").is_ok());
120        }
121
122        frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
123        let user_id = {
124            let user_reader = session.env().user_info_reader();
125            user_reader
126                .read_guard()
127                .get_user_by_name("user")
128                .unwrap()
129                .id
130        };
131        let res = frontend
132            .run_user_sql(
133                "CREATE DATABASE database2",
134                "dev".to_owned(),
135                "user".to_owned(),
136                user_id,
137            )
138            .await;
139        assert!(res.is_err());
140
141        frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
142        let user_id = {
143            let user_reader = session.env().user_info_reader();
144            user_reader
145                .read_guard()
146                .get_user_by_name("user2")
147                .unwrap()
148                .id
149        };
150        frontend
151            .run_user_sql(
152                "CREATE DATABASE database2",
153                "dev".to_owned(),
154                "user2".to_owned(),
155                user_id,
156            )
157            .await
158            .unwrap();
159        {
160            let reader = catalog_reader.read_guard();
161            assert!(reader.get_database_by_name("database2").is_ok());
162        }
163    }
164}