risingwave_frontend/user/
user_catalog.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::acl::{AclMode, AclModeSet};
18use risingwave_common::catalog::{DatabaseId, ObjectId, SchemaId};
19use risingwave_pb::user::grant_privilege::Object as GrantObject;
20use risingwave_pb::user::{PbAction, PbAuthInfo, PbGrantPrivilege, PbUserInfo};
21
22use crate::user::UserId;
23
/// `UserCatalog` is responsible for managing user's information.
///
/// It carries the same fields as `PbUserInfo` (see the `From<PbUserInfo>` impl and `to_prost`)
/// plus three ACL maps that are derived from `grant_privileges`.
#[derive(Clone, Debug)]
pub struct UserCatalog {
    /// Identifier of the user, copied from `PbUserInfo::id`.
    pub id: UserId,
    /// Name of the user, copied from `PbUserInfo::name`.
    pub name: String,
    /// Superuser flag. When set, `has_schema_usage_privilege`,
    /// `check_privilege_with_grant_option` and `check_object_visibility` return `true`
    /// without consulting any grants.
    pub is_super: bool,
    /// Copied from `PbUserInfo::can_create_db`; presumably permits creating databases
    /// (not enforced in this file).
    pub can_create_db: bool,
    /// Copied from `PbUserInfo::can_create_user`; presumably permits creating users
    /// (not enforced in this file).
    pub can_create_user: bool,
    /// Copied from `PbUserInfo::can_login`; presumably permits logging in
    /// (not enforced in this file).
    pub can_login: bool,
    /// Copied from `PbUserInfo::is_admin`; its exact meaning is not visible in this file.
    pub is_admin: bool,
    /// Optional authentication info, copied from `PbUserInfo::auth_info`.
    pub auth_info: Option<PbAuthInfo>,
    /// Raw privilege grants of the user. This list is what `to_prost` emits and what the ACL
    /// maps below are rebuilt from.
    pub grant_privileges: Vec<PbGrantPrivilege>,

    // User owned acl mode set, group by object id.
    // These maps are derived data: `refresh_acl_modes` clears them and rebuilds them from
    // `grant_privileges`.
    // TODO: merge it after we fully migrate to sql-backend.
    /// ACL modes granted on databases, keyed by database id.
    pub database_acls: HashMap<DatabaseId, AclModeSet>,
    /// ACL modes granted on schemas, keyed by schema id.
    pub schema_acls: HashMap<SchemaId, AclModeSet>,
    /// ACL modes granted on every other grantable object (tables, sources, sinks, views,
    /// functions, subscriptions, connections and secrets), keyed by object id.
    pub object_acls: HashMap<ObjectId, AclModeSet>,
}
43
44impl From<PbUserInfo> for UserCatalog {
45    fn from(user: PbUserInfo) -> Self {
46        let mut user_catalog = Self {
47            id: user.id,
48            name: user.name,
49            is_super: user.is_super,
50            can_create_db: user.can_create_db,
51            can_create_user: user.can_create_user,
52            can_login: user.can_login,
53            is_admin: user.is_admin,
54            auth_info: user.auth_info,
55            grant_privileges: user.grant_privileges,
56            database_acls: Default::default(),
57            schema_acls: Default::default(),
58            object_acls: Default::default(),
59        };
60        user_catalog.refresh_acl_modes();
61
62        user_catalog
63    }
64}
65
66impl UserCatalog {
67    pub fn to_prost(&self) -> PbUserInfo {
68        PbUserInfo {
69            id: self.id,
70            name: self.name.clone(),
71            is_super: self.is_super,
72            can_create_db: self.can_create_db,
73            can_create_user: self.can_create_user,
74            can_login: self.can_login,
75            is_admin: self.is_admin,
76            auth_info: self.auth_info.clone(),
77            grant_privileges: self.grant_privileges.clone(),
78        }
79    }
80
81    fn get_or_insert_acl(&mut self, object: GrantObject) -> &mut AclModeSet {
82        match object {
83            GrantObject::DatabaseId(id) => {
84                self.database_acls.entry(id).or_insert(AclModeSet::empty())
85            }
86            GrantObject::SchemaId(id) => self.schema_acls.entry(id).or_insert(AclModeSet::empty()),
87            GrantObject::TableId(id) => self
88                .object_acls
89                .entry(id.as_object_id())
90                .or_insert(AclModeSet::empty()),
91            GrantObject::SourceId(id) => self
92                .object_acls
93                .entry(id.as_object_id())
94                .or_insert(AclModeSet::empty()),
95            GrantObject::SinkId(id) => self
96                .object_acls
97                .entry(id.as_object_id())
98                .or_insert(AclModeSet::empty()),
99            GrantObject::ViewId(id) => self
100                .object_acls
101                .entry(id.as_object_id())
102                .or_insert(AclModeSet::empty()),
103            GrantObject::FunctionId(id) => self
104                .object_acls
105                .entry(id.as_object_id())
106                .or_insert(AclModeSet::empty()),
107            GrantObject::SubscriptionId(id) => self
108                .object_acls
109                .entry(id.as_object_id())
110                .or_insert(AclModeSet::empty()),
111            GrantObject::ConnectionId(id) => self
112                .object_acls
113                .entry(id.as_object_id())
114                .or_insert(AclModeSet::empty()),
115            GrantObject::SecretId(id) => self
116                .object_acls
117                .entry(id.as_object_id())
118                .or_insert(AclModeSet::empty()),
119        }
120    }
121
122    fn get_acl(&self, object: GrantObject) -> Option<&AclModeSet> {
123        match object {
124            GrantObject::DatabaseId(id) => self.database_acls.get(&id),
125            GrantObject::SchemaId(id) => self.schema_acls.get(&id),
126            GrantObject::TableId(id) => self.object_acls.get(&id.as_object_id()),
127            GrantObject::SourceId(id) => self.object_acls.get(&id.as_object_id()),
128            GrantObject::SinkId(id) => self.object_acls.get(&id.as_object_id()),
129            GrantObject::ViewId(id) => self.object_acls.get(&id.as_object_id()),
130            GrantObject::FunctionId(id) => self.object_acls.get(&id.as_object_id()),
131            GrantObject::SubscriptionId(id) => self.object_acls.get(&id.as_object_id()),
132            GrantObject::ConnectionId(id) => self.object_acls.get(&id.as_object_id()),
133            GrantObject::SecretId(id) => self.object_acls.get(&id.as_object_id()),
134        }
135    }
136
137    fn refresh_acl_modes(&mut self) {
138        self.database_acls.clear();
139        self.schema_acls.clear();
140        self.object_acls.clear();
141        let privileges = self.grant_privileges.clone();
142        for privilege in privileges {
143            let entry = self.get_or_insert_acl(privilege.object.unwrap());
144            for awo in privilege.action_with_opts {
145                entry
146                    .modes
147                    .insert::<AclMode>(awo.get_action().unwrap().into());
148            }
149        }
150    }
151
    // Only for test, used in `MockUserInfoWriter`.
    //
    // Appends `privileges` to the user's grant list as-is (existing entries are not merged or
    // deduplicated) and then rebuilds the ACL maps so that they reflect the new grants.
    pub fn extend_privileges(&mut self, privileges: Vec<PbGrantPrivilege>) {
        self.grant_privileges.extend(privileges);
        // Keep the derived ACL maps in sync with the enlarged grant list.
        self.refresh_acl_modes();
    }
157
158    // Only for test, used in `MockUserInfoWriter`.
159    pub fn revoke_privileges(
160        &mut self,
161        privileges: Vec<PbGrantPrivilege>,
162        revoke_grant_option: bool,
163    ) {
164        self.grant_privileges.iter_mut().for_each(|p| {
165            for rp in &privileges {
166                if rp.object != p.object {
167                    continue;
168                }
169                if revoke_grant_option {
170                    for ao in &mut p.action_with_opts {
171                        if rp
172                            .action_with_opts
173                            .iter()
174                            .any(|rao| rao.action == ao.action)
175                        {
176                            ao.with_grant_option = false;
177                        }
178                    }
179                } else {
180                    p.action_with_opts.retain(|po| {
181                        rp.action_with_opts
182                            .iter()
183                            .all(|rao| rao.action != po.action)
184                    });
185                }
186            }
187        });
188        self.grant_privileges
189            .retain(|p| !p.action_with_opts.is_empty());
190        self.refresh_acl_modes();
191    }
192
193    pub fn has_privilege(&self, object: impl Into<GrantObject>, mode: AclMode) -> bool {
194        self.get_acl(object.into())
195            .is_some_and(|acl_set| acl_set.has_mode(mode))
196    }
197
198    pub fn has_schema_usage_privilege(&self, schema_id: SchemaId) -> bool {
199        self.is_super || self.has_privilege(schema_id, AclMode::Usage)
200    }
201
202    pub fn check_privilege_with_grant_option(
203        &self,
204        object: &GrantObject,
205        actions: &Vec<(PbAction, bool)>,
206    ) -> bool {
207        if self.is_super {
208            return true;
209        }
210        let mut action_map: HashMap<_, _> = actions.iter().map(|action| (action, false)).collect();
211
212        for privilege in &self.grant_privileges {
213            if privilege.get_object().unwrap() != object {
214                continue;
215            }
216            for awo in &privilege.action_with_opts {
217                let action = awo.get_action().unwrap();
218                let with_grant_option = awo.with_grant_option;
219
220                for (&key, found) in &mut action_map {
221                    let (required_action, required_grant_option) = *key;
222                    if action == required_action && (!required_grant_option | with_grant_option) {
223                        *found = true;
224                    }
225                }
226            }
227        }
228        action_map.values().all(|&found| found)
229    }
230
231    pub fn check_object_visibility(&self, obj_id: ObjectId) -> bool {
232        if self.is_super {
233            return true;
234        }
235
236        // `Select` and `Execute` are the minimum required privileges for object visibility.
237        // `Execute` is required for functions.
238        // `Usage` is required for connections and secrets.
239        self.object_acls.get(&obj_id).is_some_and(|acl_set| {
240            acl_set.has_mode(AclMode::Select)
241                || acl_set.has_mode(AclMode::Execute)
242                || acl_set.has_mode(AclMode::Usage)
243        })
244    }
245}