risingwave_frontend/handler/
drop_database.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_sqlparser::ast::ObjectName;
17
18use super::RwPgResponse;
19use crate::binder::Binder;
20use crate::catalog::OwnedByUserCatalog;
21use crate::error::{ErrorCode, Result};
22use crate::handler::HandlerArgs;
23
24pub async fn handle_drop_database(
25    handler_args: HandlerArgs,
26    database_name: ObjectName,
27    if_exists: bool,
28) -> Result<RwPgResponse> {
29    let session = handler_args.session;
30    let catalog_reader = session.env().catalog_reader();
31    let database_name = Binder::resolve_database_name(database_name)?;
32    if session.database() == database_name {
33        return Err(ErrorCode::PermissionDenied(
34            "cannot drop the currently open database".to_owned(),
35        )
36        .into());
37    }
38    let database = {
39        let reader = catalog_reader.read_guard();
40        match reader.get_database_by_name(&database_name) {
41            Ok(db) => db.clone(),
42            Err(err) => {
43                // Unable to find this database. If `if_exists` is true,
44                // we can just return success.
45                return if if_exists {
46                    Ok(PgResponse::builder(StatementType::DROP_DATABASE)
47                        .notice(format!(
48                            "database \"{}\" does not exist, skipping",
49                            database_name
50                        ))
51                        .into())
52                } else {
53                    Err(err.into())
54                };
55            }
56        }
57    };
58
59    // Check if database was created by an admin user, if so, require admin privilege to drop
60    {
61        let user_reader = session.env().user_info_reader().read_guard();
62        let current_user = user_reader
63            .get_user_by_name(&session.user_name())
64            .ok_or_else(|| ErrorCode::PermissionDenied("Session user is invalid".to_owned()))?;
65
66        // If the database owner was an admin, only admin users can delete it
67        if let Some(database_owner) = user_reader.get_user_by_id(&database.owner())
68            && database_owner.is_admin
69            && !current_user.is_admin
70        {
71            return Err(ErrorCode::PermissionDenied(
72                "only admin users can drop databases created by admin users".to_owned(),
73            )
74            .into());
75        }
76    }
77
78    session.check_privilege_for_drop_alter_db_schema(&database)?;
79
80    let catalog_writer = session.catalog_writer()?;
81    catalog_writer.drop_database(database.id()).await?;
82    Ok(PgResponse::empty_result(StatementType::DROP_DATABASE))
83}
84
#[cfg(test)]
mod tests {
    use crate::test_utils::LocalFrontend;

    /// Creates a database, checks that a freshly created `NOSUPERUSER NOCREATEDB`
    /// user fails to drop it, then drops it through the frontend's default
    /// `run_sql` path and verifies the catalog lookup no longer succeeds.
    #[tokio::test]
    async fn test_drop_database() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        // Set-up statements; every one of them has to succeed.
        for setup_sql in [
            "CREATE DATABASE database",
            "CREATE SCHEMA schema",
            "DROP SCHEMA public",
            "CREATE USER user WITH NOSUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
        ] {
            frontend.run_sql(setup_sql).await.unwrap();
        }

        // The read guard is a temporary here and is released at the end of
        // this statement, before the next `.await`.
        let user_id = session
            .env()
            .user_info_reader()
            .read_guard()
            .get_user_by_name("user")
            .unwrap()
            .id;

        // Run the drop as the unprivileged user; it is expected to fail.
        // NOTE(review): "dev" is presumably the database that session is
        // connected to; confirm against `run_user_sql`.
        let unprivileged_drop = frontend
            .run_user_sql(
                "DROP DATABASE database",
                "dev".to_owned(),
                "user".to_owned(),
                user_id,
            )
            .await;
        assert!(unprivileged_drop.is_err());

        frontend.run_sql("DROP DATABASE database").await.unwrap();

        // After the successful drop the catalog lookup must fail.
        let still_present = session
            .env()
            .catalog_reader()
            .read_guard()
            .get_database_by_name("database")
            .is_ok();
        assert!(!still_present);
    }
}