risingwave_frontend/handler/
privilege.rs1use risingwave_common::acl::AclMode;
16use risingwave_pb::user::PbAction;
17use risingwave_pb::user::grant_privilege::PbObject;
18
19use crate::catalog::OwnedByUserCatalog;
20use crate::error::ErrorCode::PermissionDenied;
21use crate::error::Result;
22use crate::session::SessionImpl;
23use crate::user::UserId;
24
25#[derive(Debug)]
26pub struct ObjectCheckItem {
27 pub(crate) owner: UserId,
28 pub(crate) mode: AclMode,
29 pub(crate) name: String,
30 pub(crate) object: PbObject,
32}
33
34impl ObjectCheckItem {
35 pub fn new(owner: UserId, mode: AclMode, name: String, object: PbObject) -> Self {
36 Self {
37 owner,
38 mode,
39 name,
40 object,
41 }
42 }
43
44 pub fn error_message(&self) -> String {
45 let object_type = match self.object {
46 PbObject::DatabaseId(_) => "database",
47 PbObject::SchemaId(_) => "schema",
48 PbObject::TableId(_) => "table or materialized view",
49 PbObject::ViewId(_) => "view",
50 PbObject::SourceId(_) => "source",
51 PbObject::SinkId(_) => "sink",
52 PbObject::FunctionId(_) => "function",
53 PbObject::SubscriptionId(_) => "subscription",
54 PbObject::ConnectionId(_) => "connection",
55 PbObject::SecretId(_) => "secret",
56 };
57 format!(
58 "permission denied for {} \"{}\": {:?}",
59 object_type,
60 self.name,
61 PbAction::from(self.mode).as_str_name()
62 )
63 }
64}
65
66impl SessionImpl {
67 pub fn check_privileges(&self, items: &[ObjectCheckItem]) -> Result<()> {
69 let user_reader = self.env().user_info_reader();
70 let reader = user_reader.read_guard();
71
72 if let Some(user) = reader.get_user_by_name(&self.user_name()) {
73 if user.is_super {
74 return Ok(());
75 }
76 for item in items {
77 if item.owner == user.id {
78 continue;
79 }
80 let has_privilege = user.has_privilege(&item.object, item.mode);
81 if !has_privilege {
82 return Err(PermissionDenied(item.error_message()).into());
83 }
84 }
85 } else {
86 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
87 }
88
89 Ok(())
90 }
91
92 pub fn is_super_user(&self) -> bool {
94 let reader = self.env().user_info_reader().read_guard();
95
96 if let Some(info) = reader.get_user_by_name(&self.user_name()) {
97 info.is_super
98 } else {
99 false
100 }
101 }
102
103 pub fn check_privilege_for_drop_alter(
114 &self,
115 schema_name: &str,
116 relation: &impl OwnedByUserCatalog,
117 ) -> Result<()> {
118 let schema_owner = self
119 .env()
120 .catalog_reader()
121 .read_guard()
122 .get_schema_by_name(&self.database(), schema_name)
123 .unwrap()
124 .owner();
125
126 if self.user_id() != relation.owner()
128 && self.user_id() != schema_owner
129 && !self.is_super_user()
130 {
131 return Err(PermissionDenied(
132 "Only the relation owner, the schema owner, and superuser can drop or alter a relation.".to_owned(),
133 )
134 .into());
135 }
136
137 Ok(())
138 }
139
140 pub fn check_privilege_for_drop_alter_db_schema(
150 &self,
151 db_schema: &impl OwnedByUserCatalog,
152 ) -> Result<()> {
153 if self.user_id() != db_schema.owner() && !self.is_super_user() {
154 return Err(PermissionDenied(
155 "Only the owner, and superuser can drop or alter a schema or database.".to_owned(),
156 )
157 .into());
158 }
159
160 Ok(())
161 }
162}
163
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER_ID};

    use super::*;
    use crate::test_utils::LocalFrontend;

    /// Exercises `check_privileges` for the default session user, for a
    /// freshly created non-superuser without any grant, and for that same
    /// user after `CREATE` has been granted on the schema.
    #[tokio::test]
    async fn test_check_privileges() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let default_session = frontend.session_ref();
        frontend.run_sql("CREATE SCHEMA schema").await.unwrap();

        // Only the id is needed, so read it while the guard is alive instead
        // of keeping a copy of the whole schema catalog around.
        let schema_id = default_session
            .env()
            .catalog_reader()
            .read_guard()
            .get_schema_by_name(DEFAULT_DATABASE_NAME, "schema")
            .unwrap()
            .id();
        let check_items = vec![ObjectCheckItem::new(
            DEFAULT_SUPER_USER_ID,
            AclMode::Create,
            "schema".to_owned(),
            PbObject::SchemaId(schema_id),
        )];

        // The default session user passes the check.
        assert!(default_session.check_privileges(&check_items).is_ok());

        frontend
            .run_sql(
                "CREATE USER user WITH NOSUPERUSER PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'",
            )
            .await
            .unwrap();
        let new_user_id = default_session
            .env()
            .user_info_reader()
            .read_guard()
            .get_user_by_name("user")
            .unwrap()
            .id;
        let user_session = frontend.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            "user".to_owned(),
            new_user_id,
        );

        // Without a grant, the non-superuser is rejected.
        assert!(user_session.check_privileges(&check_items).is_err());

        frontend
            .run_sql("GRANT CREATE ON SCHEMA schema TO user")
            .await
            .unwrap();

        // After the grant, the same check succeeds.
        assert!(user_session.check_privileges(&check_items).is_ok());
    }
}