risingwave_frontend/user/
user_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17
18use risingwave_common::acl::{AclMode, AclModeSet};
19use risingwave_pb::user::grant_privilege::Object as GrantObject;
20use risingwave_pb::user::{PbAction, PbAuthInfo, PbGrantPrivilege, PbUserInfo};
21
22use crate::catalog::{DatabaseId, SchemaId};
23use crate::user::UserId;
24
25/// `UserCatalog` is responsible for managing user's information.
26#[derive(Clone, Debug)]
27pub struct UserCatalog {
28    pub id: UserId,
29    pub name: String,
30    pub is_super: bool,
31    pub can_create_db: bool,
32    pub can_create_user: bool,
33    pub can_login: bool,
34    pub is_admin: bool,
35    pub auth_info: Option<PbAuthInfo>,
36    pub grant_privileges: Vec<PbGrantPrivilege>,
37
38    // User owned acl mode set, group by object id.
39    // TODO: merge it after we fully migrate to sql-backend.
40    pub database_acls: HashMap<DatabaseId, AclModeSet>,
41    pub schema_acls: HashMap<SchemaId, AclModeSet>,
42    pub object_acls: HashMap<u32, AclModeSet>,
43}
44
45impl From<PbUserInfo> for UserCatalog {
46    fn from(user: PbUserInfo) -> Self {
47        let mut user_catalog = Self {
48            id: user.id,
49            name: user.name,
50            is_super: user.is_super,
51            can_create_db: user.can_create_db,
52            can_create_user: user.can_create_user,
53            can_login: user.can_login,
54            is_admin: user.is_admin,
55            auth_info: user.auth_info,
56            grant_privileges: user.grant_privileges,
57            database_acls: Default::default(),
58            schema_acls: Default::default(),
59            object_acls: Default::default(),
60        };
61        user_catalog.refresh_acl_modes();
62
63        user_catalog
64    }
65}
66
67impl UserCatalog {
68    pub fn to_prost(&self) -> PbUserInfo {
69        PbUserInfo {
70            id: self.id,
71            name: self.name.clone(),
72            is_super: self.is_super,
73            can_create_db: self.can_create_db,
74            can_create_user: self.can_create_user,
75            can_login: self.can_login,
76            is_admin: self.is_admin,
77            auth_info: self.auth_info.clone(),
78            grant_privileges: self.grant_privileges.clone(),
79        }
80    }
81
82    fn get_acl_entry(&mut self, object: GrantObject) -> Entry<'_, u32, AclModeSet> {
83        match object {
84            GrantObject::DatabaseId(id) => self.database_acls.entry(id),
85            GrantObject::SchemaId(id) => self.schema_acls.entry(id),
86            GrantObject::TableId(id)
87            | GrantObject::SourceId(id)
88            | GrantObject::SinkId(id)
89            | GrantObject::ViewId(id)
90            | GrantObject::FunctionId(id)
91            | GrantObject::SubscriptionId(id)
92            | GrantObject::ConnectionId(id)
93            | GrantObject::SecretId(id) => self.object_acls.entry(id),
94        }
95    }
96
97    fn get_acl(&self, object: &GrantObject) -> Option<&AclModeSet> {
98        match object {
99            GrantObject::DatabaseId(id) => self.database_acls.get(id),
100            GrantObject::SchemaId(id) => self.schema_acls.get(id),
101            GrantObject::TableId(id)
102            | GrantObject::SourceId(id)
103            | GrantObject::SinkId(id)
104            | GrantObject::ViewId(id)
105            | GrantObject::FunctionId(id)
106            | GrantObject::SubscriptionId(id)
107            | GrantObject::ConnectionId(id)
108            | GrantObject::SecretId(id) => self.object_acls.get(id),
109        }
110    }
111
112    fn refresh_acl_modes(&mut self) {
113        self.database_acls.clear();
114        self.schema_acls.clear();
115        self.object_acls.clear();
116        let privileges = self.grant_privileges.clone();
117        for privilege in privileges {
118            let entry = self
119                .get_acl_entry(privilege.object.unwrap())
120                .or_insert(AclModeSet::empty());
121            for awo in privilege.action_with_opts {
122                entry
123                    .modes
124                    .insert::<AclMode>(awo.get_action().unwrap().into());
125            }
126        }
127    }
128
129    // Only for test, used in `MockUserInfoWriter`.
130    pub fn extend_privileges(&mut self, privileges: Vec<PbGrantPrivilege>) {
131        self.grant_privileges.extend(privileges);
132        self.refresh_acl_modes();
133    }
134
135    // Only for test, used in `MockUserInfoWriter`.
136    pub fn revoke_privileges(
137        &mut self,
138        privileges: Vec<PbGrantPrivilege>,
139        revoke_grant_option: bool,
140    ) {
141        self.grant_privileges.iter_mut().for_each(|p| {
142            for rp in &privileges {
143                if rp.object != p.object {
144                    continue;
145                }
146                if revoke_grant_option {
147                    for ao in &mut p.action_with_opts {
148                        if rp
149                            .action_with_opts
150                            .iter()
151                            .any(|rao| rao.action == ao.action)
152                        {
153                            ao.with_grant_option = false;
154                        }
155                    }
156                } else {
157                    p.action_with_opts.retain(|po| {
158                        rp.action_with_opts
159                            .iter()
160                            .all(|rao| rao.action != po.action)
161                    });
162                }
163            }
164        });
165        self.grant_privileges
166            .retain(|p| !p.action_with_opts.is_empty());
167        self.refresh_acl_modes();
168    }
169
170    pub fn has_privilege(&self, object: &GrantObject, mode: AclMode) -> bool {
171        self.get_acl(object)
172            .is_some_and(|acl_set| acl_set.has_mode(mode))
173    }
174
175    pub fn check_privilege_with_grant_option(
176        &self,
177        object: &GrantObject,
178        actions: &Vec<(PbAction, bool)>,
179    ) -> bool {
180        if self.is_super {
181            return true;
182        }
183        let mut action_map: HashMap<_, _> = actions.iter().map(|action| (action, false)).collect();
184
185        for privilege in &self.grant_privileges {
186            if privilege.get_object().unwrap() != object {
187                continue;
188            }
189            for awo in &privilege.action_with_opts {
190                let action = awo.get_action().unwrap();
191                let with_grant_option = awo.with_grant_option;
192
193                for (&key, found) in &mut action_map {
194                    let (required_action, required_grant_option) = *key;
195                    if action == required_action && (!required_grant_option | with_grant_option) {
196                        *found = true;
197                    }
198                }
199            }
200        }
201        action_map.values().all(|&found| found)
202    }
203
204    pub fn check_object_visibility(&self, obj_id: u32) -> bool {
205        if self.is_super {
206            return true;
207        }
208
209        // `Select` and `Execute` are the minimum required privileges for object visibility.
210        // `Execute` is required for functions.
211        self.object_acls.get(&obj_id).is_some_and(|acl_set| {
212            acl_set.has_mode(AclMode::Select) || acl_set.has_mode(AclMode::Execute)
213        })
214    }
215}