risingwave_frontend/handler/
privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::acl::AclMode;
16use risingwave_pb::user::PbAction;
17use risingwave_pb::user::grant_privilege::PbObject;
18
19use crate::catalog::OwnedByUserCatalog;
20use crate::error::ErrorCode::PermissionDenied;
21use crate::error::Result;
22use crate::session::SessionImpl;
23use crate::user::UserId;
24
25#[derive(Debug)]
26pub struct ObjectCheckItem {
27    pub(crate) owner: UserId,
28    pub(crate) mode: AclMode,
29    pub(crate) name: String,
30    // todo: change it to object id.
31    pub(crate) object: PbObject,
32}
33
34impl ObjectCheckItem {
35    pub fn new(owner: UserId, mode: AclMode, name: String, object: PbObject) -> Self {
36        Self {
37            owner,
38            mode,
39            name,
40            object,
41        }
42    }
43
44    pub fn error_message(&self) -> String {
45        let object_type = match self.object {
46            PbObject::DatabaseId(_) => "database",
47            PbObject::SchemaId(_) => "schema",
48            PbObject::TableId(_) => "table or materialized view",
49            PbObject::ViewId(_) => "view",
50            PbObject::SourceId(_) => "source",
51            PbObject::SinkId(_) => "sink",
52            PbObject::FunctionId(_) => "function",
53            PbObject::SubscriptionId(_) => "subscription",
54            PbObject::ConnectionId(_) => "connection",
55            PbObject::SecretId(_) => "secret",
56        };
57        format!(
58            "permission denied for {} \"{}\": {:?}",
59            object_type,
60            self.name,
61            PbAction::from(self.mode).as_str_name()
62        )
63    }
64}
65
66impl SessionImpl {
67    /// Check whether the user of the current session has privileges in `items`.
68    pub fn check_privileges(&self, items: &[ObjectCheckItem]) -> Result<()> {
69        let user_reader = self.env().user_info_reader();
70        let reader = user_reader.read_guard();
71
72        if let Some(user) = reader.get_user_by_name(&self.user_name()) {
73            if user.is_super {
74                return Ok(());
75            }
76            for item in items {
77                if item.owner == user.id {
78                    continue;
79                }
80                let has_privilege = user.has_privilege(&item.object, item.mode);
81                if !has_privilege {
82                    return Err(PermissionDenied(item.error_message()).into());
83                }
84            }
85        } else {
86            return Err(PermissionDenied("Session user is invalid".to_owned()).into());
87        }
88
89        Ok(())
90    }
91
92    /// Returns `true` if the user of the current session is a super user.
93    pub fn is_super_user(&self) -> bool {
94        let reader = self.env().user_info_reader().read_guard();
95
96        if let Some(info) = reader.get_user_by_name(&self.user_name()) {
97            info.is_super
98        } else {
99            false
100        }
101    }
102
103    /// Check whether the user of the current session has the privilege to drop or alter the
104    /// relation `relation` in the schema with name `schema_name`.
105    ///
106    /// Note that the right to drop or alter in PostgreSQL is special and not covered by the general
107    /// `GRANT`s.
108    ///
109    /// > The right to drop an object, or to alter its definition in any way, is not treated as a
110    /// > grantable privilege; it is inherent in the owner, and cannot be granted or revoked.
111    /// >
112    /// > Reference: <https://www.postgresql.org/docs/current/sql-grant.html>
113    pub fn check_privilege_for_drop_alter(
114        &self,
115        schema_name: &str,
116        relation: &impl OwnedByUserCatalog,
117    ) -> Result<()> {
118        let schema_owner = self
119            .env()
120            .catalog_reader()
121            .read_guard()
122            .get_schema_by_name(&self.database(), schema_name)
123            .unwrap()
124            .owner();
125
126        // https://www.postgresql.org/docs/current/sql-droptable.html
127        if self.user_id() != relation.owner()
128            && self.user_id() != schema_owner
129            && !self.is_super_user()
130        {
131            return Err(PermissionDenied(
132                "Only the relation owner, the schema owner, and superuser can drop or alter a relation.".to_owned(),
133            )
134            .into());
135        }
136
137        Ok(())
138    }
139
140    /// Check whether the user of the current session has the privilege to drop or alter the
141    /// `db_schema`, which is either a database or schema.
142    /// > Only the owner of the database, or a superuser, can drop a database.
143    /// >
144    /// > Reference: <https://www.postgresql.org/docs/current/manage-ag-dropdb.html>
145    /// >
146    /// > A schema can only be dropped by its owner or a superuser.
147    /// >
148    /// > Reference: <https://www.postgresql.org/docs/current/sql-dropschema.html>
149    pub fn check_privilege_for_drop_alter_db_schema(
150        &self,
151        db_schema: &impl OwnedByUserCatalog,
152    ) -> Result<()> {
153        if self.user_id() != db_schema.owner() && !self.is_super_user() {
154            return Err(PermissionDenied(
155                "Only the owner, and superuser can drop or alter a schema or database.".to_owned(),
156            )
157            .into());
158        }
159
160        Ok(())
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_ID};
167
168    use super::*;
169    use crate::test_utils::LocalFrontend;
170
171    #[tokio::test]
172    async fn test_check_privileges() {
173        let frontend = LocalFrontend::new(Default::default()).await;
174        let session = frontend.session_ref();
175        let catalog_reader = session.env().catalog_reader();
176        frontend.run_sql("CREATE SCHEMA schema").await.unwrap();
177
178        let schema = catalog_reader
179            .read_guard()
180            .get_schema_by_name(DEFAULT_DATABASE_NAME, "schema")
181            .unwrap()
182            .clone();
183        let check_items = vec![ObjectCheckItem::new(
184            DEFAULT_SUPER_USER_ID,
185            AclMode::Create,
186            "schema".to_owned(),
187            PbObject::SchemaId(schema.id()),
188        )];
189        assert!(&session.check_privileges(&check_items).is_ok());
190
191        frontend
192            .run_sql(
193                "CREATE USER user WITH NOSUPERUSER PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
194            )
195            .await
196            .unwrap();
197        let database = DEFAULT_DATABASE_NAME.to_owned();
198        let user_name = "user".to_owned();
199        let user_id = {
200            let user_reader = session.env().user_info_reader();
201            user_reader
202                .read_guard()
203                .get_user_by_name("user")
204                .unwrap()
205                .id
206        };
207        let session = frontend.session_user_ref(database, user_name, user_id);
208        assert!(&session.check_privileges(&check_items).is_err());
209
210        frontend
211            .run_sql("GRANT CREATE ON SCHEMA schema TO user")
212            .await
213            .unwrap();
214        assert!(&session.check_privileges(&check_items).is_ok());
215    }
216}