risingwave_frontend/user/
user_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17
18use risingwave_common::acl::{AclMode, AclModeSet};
19use risingwave_common::id::ObjectId;
20use risingwave_pb::user::grant_privilege::Object as GrantObject;
21use risingwave_pb::user::{PbAction, PbAuthInfo, PbGrantPrivilege, PbUserInfo};
22
23use crate::catalog::SchemaId;
24use crate::user::UserId;
25
26/// `UserCatalog` is responsible for managing user's information.
27#[derive(Clone, Debug)]
28pub struct UserCatalog {
29    pub id: UserId,
30    pub name: String,
31    pub is_super: bool,
32    pub can_create_db: bool,
33    pub can_create_user: bool,
34    pub can_login: bool,
35    pub is_admin: bool,
36    pub auth_info: Option<PbAuthInfo>,
37    pub grant_privileges: Vec<PbGrantPrivilege>,
38
39    // User owned acl mode set, group by object id.
40    // TODO: merge it after we fully migrate to sql-backend.
41    pub database_acls: HashMap<u32, AclModeSet>,
42    pub schema_acls: HashMap<u32, AclModeSet>,
43    pub object_acls: HashMap<u32, AclModeSet>,
44}
45
46impl From<PbUserInfo> for UserCatalog {
47    fn from(user: PbUserInfo) -> Self {
48        let mut user_catalog = Self {
49            id: user.id,
50            name: user.name,
51            is_super: user.is_super,
52            can_create_db: user.can_create_db,
53            can_create_user: user.can_create_user,
54            can_login: user.can_login,
55            is_admin: user.is_admin,
56            auth_info: user.auth_info,
57            grant_privileges: user.grant_privileges,
58            database_acls: Default::default(),
59            schema_acls: Default::default(),
60            object_acls: Default::default(),
61        };
62        user_catalog.refresh_acl_modes();
63
64        user_catalog
65    }
66}
67
68impl UserCatalog {
69    pub fn to_prost(&self) -> PbUserInfo {
70        PbUserInfo {
71            id: self.id,
72            name: self.name.clone(),
73            is_super: self.is_super,
74            can_create_db: self.can_create_db,
75            can_create_user: self.can_create_user,
76            can_login: self.can_login,
77            is_admin: self.is_admin,
78            auth_info: self.auth_info.clone(),
79            grant_privileges: self.grant_privileges.clone(),
80        }
81    }
82
83    fn get_acl_entry(&mut self, object: GrantObject) -> Entry<'_, u32, AclModeSet> {
84        match object {
85            GrantObject::DatabaseId(id) => self.database_acls.entry(id),
86            GrantObject::SchemaId(id) => self.schema_acls.entry(id),
87            GrantObject::TableId(id)
88            | GrantObject::SourceId(id)
89            | GrantObject::SinkId(id)
90            | GrantObject::ViewId(id)
91            | GrantObject::FunctionId(id)
92            | GrantObject::SubscriptionId(id)
93            | GrantObject::ConnectionId(id)
94            | GrantObject::SecretId(id) => self.object_acls.entry(id),
95        }
96    }
97
98    fn get_acl(&self, object: GrantObject) -> Option<&AclModeSet> {
99        match object {
100            GrantObject::DatabaseId(id) => self.database_acls.get(&id),
101            GrantObject::SchemaId(id) => self.schema_acls.get(&id),
102            GrantObject::TableId(id)
103            | GrantObject::SourceId(id)
104            | GrantObject::SinkId(id)
105            | GrantObject::ViewId(id)
106            | GrantObject::FunctionId(id)
107            | GrantObject::SubscriptionId(id)
108            | GrantObject::ConnectionId(id)
109            | GrantObject::SecretId(id) => self.object_acls.get(&id),
110        }
111    }
112
113    fn refresh_acl_modes(&mut self) {
114        self.database_acls.clear();
115        self.schema_acls.clear();
116        self.object_acls.clear();
117        let privileges = self.grant_privileges.clone();
118        for privilege in privileges {
119            let entry = self
120                .get_acl_entry(privilege.object.unwrap())
121                .or_insert(AclModeSet::empty());
122            for awo in privilege.action_with_opts {
123                entry
124                    .modes
125                    .insert::<AclMode>(awo.get_action().unwrap().into());
126            }
127        }
128    }
129
130    // Only for test, used in `MockUserInfoWriter`.
131    pub fn extend_privileges(&mut self, privileges: Vec<PbGrantPrivilege>) {
132        self.grant_privileges.extend(privileges);
133        self.refresh_acl_modes();
134    }
135
136    // Only for test, used in `MockUserInfoWriter`.
137    pub fn revoke_privileges(
138        &mut self,
139        privileges: Vec<PbGrantPrivilege>,
140        revoke_grant_option: bool,
141    ) {
142        self.grant_privileges.iter_mut().for_each(|p| {
143            for rp in &privileges {
144                if rp.object != p.object {
145                    continue;
146                }
147                if revoke_grant_option {
148                    for ao in &mut p.action_with_opts {
149                        if rp
150                            .action_with_opts
151                            .iter()
152                            .any(|rao| rao.action == ao.action)
153                        {
154                            ao.with_grant_option = false;
155                        }
156                    }
157                } else {
158                    p.action_with_opts.retain(|po| {
159                        rp.action_with_opts
160                            .iter()
161                            .all(|rao| rao.action != po.action)
162                    });
163                }
164            }
165        });
166        self.grant_privileges
167            .retain(|p| !p.action_with_opts.is_empty());
168        self.refresh_acl_modes();
169    }
170
171    pub fn has_privilege(&self, object: impl Into<GrantObject>, mode: AclMode) -> bool {
172        self.get_acl(object.into())
173            .is_some_and(|acl_set| acl_set.has_mode(mode))
174    }
175
176    pub fn has_schema_usage_privilege(&self, schema_id: SchemaId) -> bool {
177        self.has_privilege(schema_id, AclMode::Usage)
178    }
179
180    pub fn check_privilege_with_grant_option(
181        &self,
182        object: &GrantObject,
183        actions: &Vec<(PbAction, bool)>,
184    ) -> bool {
185        if self.is_super {
186            return true;
187        }
188        let mut action_map: HashMap<_, _> = actions.iter().map(|action| (action, false)).collect();
189
190        for privilege in &self.grant_privileges {
191            if privilege.get_object().unwrap() != object {
192                continue;
193            }
194            for awo in &privilege.action_with_opts {
195                let action = awo.get_action().unwrap();
196                let with_grant_option = awo.with_grant_option;
197
198                for (&key, found) in &mut action_map {
199                    let (required_action, required_grant_option) = *key;
200                    if action == required_action && (!required_grant_option | with_grant_option) {
201                        *found = true;
202                    }
203                }
204            }
205        }
206        action_map.values().all(|&found| found)
207    }
208
209    pub fn check_object_visibility(&self, obj_id: ObjectId) -> bool {
210        if self.is_super {
211            return true;
212        }
213
214        // `Select` and `Execute` are the minimum required privileges for object visibility.
215        // `Execute` is required for functions.
216        // `Usage` is required for connections and secrets.
217        self.object_acls
218            .get(&obj_id.as_raw_id())
219            .is_some_and(|acl_set| {
220                acl_set.has_mode(AclMode::Select)
221                    || acl_set.has_mode(AclMode::Execute)
222                    || acl_set.has_mode(AclMode::Usage)
223            })
224    }
225}