risingwave_frontend/handler/
create_database.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::system_param::{NOTICE_BARRIER_INTERVAL_MS, NOTICE_CHECKPOINT_FREQUENCY};
17use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
18use risingwave_sqlparser::ast::{ObjectName, SetVariableValue};
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::error::ErrorCode::PermissionDenied;
24use crate::error::Result;
25use crate::handler::HandlerArgs;
26use crate::handler::alter_resource_group::resolve_resource_group;
27
28pub async fn handle_create_database(
29    handler_args: HandlerArgs,
30    database_name: ObjectName,
31    if_not_exist: bool,
32    owner: Option<ObjectName>,
33    resource_group: Option<SetVariableValue>,
34    barrier_interval_ms: Option<u32>,
35    checkpoint_frequency: Option<u64>,
36) -> Result<RwPgResponse> {
37    let mut builder = RwPgResponse::builder(StatementType::CREATE_DATABASE);
38
39    let session = handler_args.session;
40    let database_name = Binder::resolve_database_name(database_name)?;
41
42    {
43        let user_reader = session.env().user_info_reader();
44        let reader = user_reader.read_guard();
45        if let Some(info) = reader.get_user_by_name(&session.user_name()) {
46            if !info.can_create_db && !info.is_super {
47                return Err(
48                    PermissionDenied("permission denied to create database".to_owned()).into(),
49                );
50            }
51        } else {
52            return Err(PermissionDenied("Session user is invalid".to_owned()).into());
53        }
54    }
55
56    {
57        let catalog_reader = session.env().catalog_reader();
58        let reader = catalog_reader.read_guard();
59        if reader.get_database_by_name(&database_name).is_ok() {
60            // If `if_not_exist` is true, not return error.
61            return if if_not_exist {
62                Ok(builder
63                    .notice(format!("database \"{}\" exists, skipping", database_name))
64                    .into())
65            } else {
66                Err(CatalogError::duplicated("database", database_name).into())
67            };
68        }
69    }
70
71    let database_owner = if let Some(owner) = owner {
72        let owner = Binder::resolve_user_name(owner)?;
73        session
74            .env()
75            .user_info_reader()
76            .read_guard()
77            .get_user_by_name(&owner)
78            .map(|u| u.id)
79            .ok_or_else(|| CatalogError::NotFound("user", owner.clone()))?
80    } else {
81        session.user_id()
82    };
83
84    let resource_group = resource_group
85        .map(resolve_resource_group)
86        .transpose()?
87        .flatten();
88
89    if resource_group.is_some() {
90        risingwave_common::license::Feature::ResourceGroup.check_available()?;
91    }
92
93    let resource_group = resource_group.as_deref().unwrap_or(DEFAULT_RESOURCE_GROUP);
94
95    if barrier_interval_ms.is_some() || checkpoint_frequency.is_some() {
96        risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()?;
97    }
98    if let Some(interval) = barrier_interval_ms
99        && interval >= NOTICE_BARRIER_INTERVAL_MS
100    {
101        builder = builder.notice(
102                    format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
103                             interval, NOTICE_BARRIER_INTERVAL_MS));
104    }
105    if let Some(frequency) = checkpoint_frequency
106        && frequency >= NOTICE_CHECKPOINT_FREQUENCY
107    {
108        builder = builder.notice(
109                    format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
110                             frequency, NOTICE_CHECKPOINT_FREQUENCY));
111    }
112
113    let catalog_writer = session.catalog_writer()?;
114    catalog_writer
115        .create_database(
116            &database_name,
117            database_owner,
118            resource_group,
119            barrier_interval_ms,
120            checkpoint_frequency,
121        )
122        .await?;
123
124    Ok(builder.into())
125}
126
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Exercises `CREATE DATABASE` as the default test user, as a user created
    /// with `NOCREATEDB` (expected to fail), and as a user created with
    /// `CREATEDB` (expected to succeed).
    #[tokio::test]
    async fn test_create_database() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader();

        // Each helper takes its read guard as a temporary, so no guard outlives
        // the lookup or is held across an `.await`.
        let database_exists = |name: &str| {
            catalog_reader
                .read_guard()
                .get_database_by_name(name)
                .is_ok()
        };
        let user_id_of = |name: &str| {
            session
                .env()
                .user_info_reader()
                .read_guard()
                .get_user_by_name(name)
                .unwrap()
                .id
        };

        // Creation through the default test session.
        frontend.run_sql("CREATE DATABASE database").await.unwrap();
        assert!(database_exists("database"));

        // A user created with NOSUPERUSER NOCREATEDB must be rejected.
        frontend.run_sql("CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let denied_user_id = user_id_of("user");
        let res = frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user".to_owned(),
                denied_user_id,
            )
            .await;
        assert!(res.is_err());

        // A user created with NOSUPERUSER CREATEDB is allowed.
        frontend.run_sql("CREATE USER user2 WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
        let allowed_user_id = user_id_of("user2");
        frontend
            .run_user_sql(
                "CREATE DATABASE database2",
                "dev".to_owned(),
                "user2".to_owned(),
                allowed_user_id,
            )
            .await
            .unwrap();
        assert!(database_exists("database2"));
    }
}