risingwave_frontend/handler/
privilege.rs1use risingwave_common::acl::AclMode;
16use risingwave_pb::user::grant_privilege::PbObject;
17
18use crate::catalog::OwnedByUserCatalog;
19use crate::error::ErrorCode::PermissionDenied;
20use crate::error::Result;
21use crate::session::SessionImpl;
22use crate::user::UserId;
23
24#[derive(Debug)]
25pub struct ObjectCheckItem {
26 owner: UserId,
27 mode: AclMode,
28 object: PbObject,
30}
31
32impl ObjectCheckItem {
33 pub fn new(owner: UserId, mode: AclMode, object: PbObject) -> Self {
34 Self {
35 owner,
36 mode,
37 object,
38 }
39 }
40}
41
42impl SessionImpl {
43 pub fn check_privileges(&self, items: &[ObjectCheckItem]) -> Result<()> {
45 let user_reader = self.env().user_info_reader();
46 let reader = user_reader.read_guard();
47
48 if let Some(user) = reader.get_user_by_name(&self.user_name()) {
49 if user.is_super {
50 return Ok(());
51 }
52 for item in items {
53 if item.owner == user.id {
54 continue;
55 }
56 let has_privilege = user.has_privilege(&item.object, item.mode);
57 if !has_privilege {
58 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
59 }
60 }
61 } else {
62 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
63 }
64
65 Ok(())
66 }
67
68 pub fn is_super_user(&self) -> bool {
70 let reader = self.env().user_info_reader().read_guard();
71
72 if let Some(info) = reader.get_user_by_name(&self.user_name()) {
73 info.is_super
74 } else {
75 false
76 }
77 }
78
79 pub fn check_privilege_for_drop_alter(
90 &self,
91 schema_name: &str,
92 relation: &impl OwnedByUserCatalog,
93 ) -> Result<()> {
94 let schema_owner = self
95 .env()
96 .catalog_reader()
97 .read_guard()
98 .get_schema_by_name(&self.database(), schema_name)
99 .unwrap()
100 .owner();
101
102 if self.user_id() != relation.owner()
104 && self.user_id() != schema_owner
105 && !self.is_super_user()
106 {
107 return Err(PermissionDenied(
108 "Only the relation owner, the schema owner, and superuser can drop or alter a relation.".to_owned(),
109 )
110 .into());
111 }
112
113 Ok(())
114 }
115
116 pub fn check_privilege_for_drop_alter_db_schema(
126 &self,
127 db_schema: &impl OwnedByUserCatalog,
128 ) -> Result<()> {
129 if self.user_id() != db_schema.owner() && !self.is_super_user() {
130 return Err(PermissionDenied(
131 "Only the owner, and superuser can drop or alter a schema or database.".to_owned(),
132 )
133 .into());
134 }
135
136 Ok(())
137 }
138}
139
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_ID};

    use super::*;
    use crate::test_utils::LocalFrontend;

    /// Checks a `Create` privilege on a freshly created schema in three stages: with the
    /// frontend's own session, with a newly created non-superuser (expected to fail), and with
    /// that same user after the privilege has been granted.
    #[tokio::test]
    async fn test_check_privileges() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let default_session = frontend.session_ref();
        frontend.run_sql("CREATE SCHEMA schema").await.unwrap();

        // Only the id is needed, so read it while the guard is held.
        let schema_id = default_session
            .env()
            .catalog_reader()
            .read_guard()
            .get_schema_by_name(DEFAULT_DATABASE_NAME, "schema")
            .unwrap()
            .id();
        let check_items = vec![ObjectCheckItem::new(
            DEFAULT_SUPER_USER_ID,
            AclMode::Create,
            PbObject::SchemaId(schema_id),
        )];
        assert!(default_session.check_privileges(&check_items).is_ok());

        frontend
            .run_sql(
                "CREATE USER user WITH NOSUPERUSER PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
            )
            .await
            .unwrap();
        let new_user_id = default_session
            .env()
            .user_info_reader()
            .read_guard()
            .get_user_by_name("user")
            .unwrap()
            .id;
        let user_session = frontend.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            "user".to_owned(),
            new_user_id,
        );
        // The new user neither owns the schema nor holds a grant on it yet.
        assert!(user_session.check_privileges(&check_items).is_err());

        frontend
            .run_sql("GRANT CREATE ON SCHEMA schema TO user")
            .await
            .unwrap();
        assert!(user_session.check_privileges(&check_items).is_ok());
    }
}