risingwave_frontend/handler/
privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::acl::AclMode;
16use risingwave_pb::user::grant_privilege::PbObject;
17
18use crate::catalog::OwnedByUserCatalog;
19use crate::error::ErrorCode::PermissionDenied;
20use crate::error::Result;
21use crate::session::SessionImpl;
22use crate::user::UserId;
23
24#[derive(Debug)]
25pub struct ObjectCheckItem {
26    owner: UserId,
27    mode: AclMode,
28    // todo: change it to object id.
29    object: PbObject,
30}
31
32impl ObjectCheckItem {
33    pub fn new(owner: UserId, mode: AclMode, object: PbObject) -> Self {
34        Self {
35            owner,
36            mode,
37            object,
38        }
39    }
40}
41
42impl SessionImpl {
43    /// Check whether the user of the current session has privileges in `items`.
44    pub fn check_privileges(&self, items: &[ObjectCheckItem]) -> Result<()> {
45        let user_reader = self.env().user_info_reader();
46        let reader = user_reader.read_guard();
47
48        if let Some(user) = reader.get_user_by_name(&self.user_name()) {
49            if user.is_super {
50                return Ok(());
51            }
52            for item in items {
53                if item.owner == user.id {
54                    continue;
55                }
56                let has_privilege = user.has_privilege(&item.object, item.mode);
57                if !has_privilege {
58                    return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
59                }
60            }
61        } else {
62            return Err(PermissionDenied("Session user is invalid".to_owned()).into());
63        }
64
65        Ok(())
66    }
67
68    /// Returns `true` if the user of the current session is a super user.
69    pub fn is_super_user(&self) -> bool {
70        let reader = self.env().user_info_reader().read_guard();
71
72        if let Some(info) = reader.get_user_by_name(&self.user_name()) {
73            info.is_super
74        } else {
75            false
76        }
77    }
78
79    /// Check whether the user of the current session has the privilege to drop or alter the
80    /// relation `relation` in the schema with name `schema_name`.
81    ///
82    /// Note that the right to drop or alter in PostgreSQL is special and not covered by the general
83    /// `GRANT`s.
84    ///
85    /// > The right to drop an object, or to alter its definition in any way, is not treated as a
86    /// > grantable privilege; it is inherent in the owner, and cannot be granted or revoked.
87    /// >
88    /// > Reference: <https://www.postgresql.org/docs/current/sql-grant.html>
89    pub fn check_privilege_for_drop_alter(
90        &self,
91        schema_name: &str,
92        relation: &impl OwnedByUserCatalog,
93    ) -> Result<()> {
94        let schema_owner = self
95            .env()
96            .catalog_reader()
97            .read_guard()
98            .get_schema_by_name(&self.database(), schema_name)
99            .unwrap()
100            .owner();
101
102        // https://www.postgresql.org/docs/current/sql-droptable.html
103        if self.user_id() != relation.owner()
104            && self.user_id() != schema_owner
105            && !self.is_super_user()
106        {
107            return Err(PermissionDenied(
108                "Only the relation owner, the schema owner, and superuser can drop or alter a relation.".to_owned(),
109            )
110            .into());
111        }
112
113        Ok(())
114    }
115
116    /// Check whether the user of the current session has the privilege to drop or alter the
117    /// `db_schema`, which is either a database or schema.
118    /// > Only the owner of the database, or a superuser, can drop a database.
119    /// >
120    /// > Reference: <https://www.postgresql.org/docs/current/manage-ag-dropdb.html>
121    /// >
122    /// > A schema can only be dropped by its owner or a superuser.
123    /// >
124    /// > Reference: <https://www.postgresql.org/docs/current/sql-dropschema.html>
125    pub fn check_privilege_for_drop_alter_db_schema(
126        &self,
127        db_schema: &impl OwnedByUserCatalog,
128    ) -> Result<()> {
129        if self.user_id() != db_schema.owner() && !self.is_super_user() {
130            return Err(PermissionDenied(
131                "Only the owner, and superuser can drop or alter a schema or database.".to_owned(),
132            )
133            .into());
134        }
135
136        Ok(())
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_ID};
143
144    use super::*;
145    use crate::test_utils::LocalFrontend;
146
147    #[tokio::test]
148    async fn test_check_privileges() {
149        let frontend = LocalFrontend::new(Default::default()).await;
150        let session = frontend.session_ref();
151        let catalog_reader = session.env().catalog_reader();
152        frontend.run_sql("CREATE SCHEMA schema").await.unwrap();
153
154        let schema = catalog_reader
155            .read_guard()
156            .get_schema_by_name(DEFAULT_DATABASE_NAME, "schema")
157            .unwrap()
158            .clone();
159        let check_items = vec![ObjectCheckItem::new(
160            DEFAULT_SUPER_USER_ID,
161            AclMode::Create,
162            PbObject::SchemaId(schema.id()),
163        )];
164        assert!(&session.check_privileges(&check_items).is_ok());
165
166        frontend
167            .run_sql(
168                "CREATE USER user WITH NOSUPERUSER PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
169            )
170            .await
171            .unwrap();
172        let database = DEFAULT_DATABASE_NAME.to_owned();
173        let user_name = "user".to_owned();
174        let user_id = {
175            let user_reader = session.env().user_info_reader();
176            user_reader
177                .read_guard()
178                .get_user_by_name("user")
179                .unwrap()
180                .id
181        };
182        let session = frontend.session_user_ref(database, user_name, user_id);
183        assert!(&session.check_privileges(&check_items).is_err());
184
185        frontend
186            .run_sql("GRANT CREATE ON SCHEMA schema TO user")
187            .await
188            .unwrap();
189        assert!(&session.check_privileges(&check_items).is_ok());
190    }
191}