risingwave_frontend/user/
user_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17
18use risingwave_common::acl::{AclMode, AclModeSet};
19use risingwave_pb::user::grant_privilege::{Action, Object as GrantObject, Object};
20use risingwave_pb::user::{PbAuthInfo, PbGrantPrivilege, PbUserInfo};
21
22use crate::catalog::{DatabaseId, SchemaId};
23use crate::user::UserId;
24
25/// `UserCatalog` is responsible for managing user's information.
26#[derive(Clone, Debug)]
27pub struct UserCatalog {
28    pub id: UserId,
29    pub name: String,
30    pub is_super: bool,
31    pub can_create_db: bool,
32    pub can_create_user: bool,
33    pub can_login: bool,
34    pub auth_info: Option<PbAuthInfo>,
35    pub grant_privileges: Vec<PbGrantPrivilege>,
36
37    // User owned acl mode set, group by object id.
38    // TODO: merge it after we fully migrate to sql-backend.
39    pub database_acls: HashMap<DatabaseId, AclModeSet>,
40    pub schema_acls: HashMap<SchemaId, AclModeSet>,
41    pub object_acls: HashMap<u32, AclModeSet>,
42}
43
44impl From<PbUserInfo> for UserCatalog {
45    fn from(user: PbUserInfo) -> Self {
46        let mut user_catalog = Self {
47            id: user.id,
48            name: user.name,
49            is_super: user.is_super,
50            can_create_db: user.can_create_db,
51            can_create_user: user.can_create_user,
52            can_login: user.can_login,
53            auth_info: user.auth_info,
54            grant_privileges: user.grant_privileges,
55            database_acls: Default::default(),
56            schema_acls: Default::default(),
57            object_acls: Default::default(),
58        };
59        user_catalog.refresh_acl_modes();
60
61        user_catalog
62    }
63}
64
65impl UserCatalog {
66    pub fn to_prost(&self) -> PbUserInfo {
67        PbUserInfo {
68            id: self.id,
69            name: self.name.clone(),
70            is_super: self.is_super,
71            can_create_db: self.can_create_db,
72            can_create_user: self.can_create_user,
73            can_login: self.can_login,
74            auth_info: self.auth_info.clone(),
75            grant_privileges: self.grant_privileges.clone(),
76        }
77    }
78
79    fn get_acl_entry(&mut self, object: GrantObject) -> Entry<'_, u32, AclModeSet> {
80        match object {
81            Object::DatabaseId(id) => self.database_acls.entry(id),
82            Object::SchemaId(id) => self.schema_acls.entry(id),
83            Object::TableId(id)
84            | Object::SourceId(id)
85            | Object::SinkId(id)
86            | Object::ViewId(id)
87            | Object::FunctionId(id)
88            | Object::SubscriptionId(id)
89            | Object::ConnectionId(id)
90            | Object::SecretId(id) => self.object_acls.entry(id),
91        }
92    }
93
94    fn get_acl(&self, object: &GrantObject) -> Option<&AclModeSet> {
95        match object {
96            Object::DatabaseId(id) => self.database_acls.get(id),
97            Object::SchemaId(id) => self.schema_acls.get(id),
98            Object::TableId(id)
99            | Object::SourceId(id)
100            | Object::SinkId(id)
101            | Object::ViewId(id)
102            | Object::FunctionId(id)
103            | Object::SubscriptionId(id)
104            | Object::ConnectionId(id)
105            | Object::SecretId(id) => self.object_acls.get(id),
106        }
107    }
108
109    fn refresh_acl_modes(&mut self) {
110        self.database_acls.clear();
111        self.schema_acls.clear();
112        self.object_acls.clear();
113        let privileges = self.grant_privileges.clone();
114        for privilege in privileges {
115            let entry = self
116                .get_acl_entry(privilege.object.unwrap())
117                .or_insert(AclModeSet::empty());
118            for awo in privilege.action_with_opts {
119                entry
120                    .modes
121                    .insert::<AclMode>(awo.get_action().unwrap().into());
122            }
123        }
124    }
125
126    // Only for test, used in `MockUserInfoWriter`.
127    pub fn extend_privileges(&mut self, privileges: Vec<PbGrantPrivilege>) {
128        self.grant_privileges.extend(privileges);
129        self.refresh_acl_modes();
130    }
131
132    // Only for test, used in `MockUserInfoWriter`.
133    pub fn revoke_privileges(
134        &mut self,
135        privileges: Vec<PbGrantPrivilege>,
136        revoke_grant_option: bool,
137    ) {
138        self.grant_privileges.iter_mut().for_each(|p| {
139            for rp in &privileges {
140                if rp.object != p.object {
141                    continue;
142                }
143                if revoke_grant_option {
144                    for ao in &mut p.action_with_opts {
145                        if rp
146                            .action_with_opts
147                            .iter()
148                            .any(|rao| rao.action == ao.action)
149                        {
150                            ao.with_grant_option = false;
151                        }
152                    }
153                } else {
154                    p.action_with_opts.retain(|po| {
155                        rp.action_with_opts
156                            .iter()
157                            .all(|rao| rao.action != po.action)
158                    });
159                }
160            }
161        });
162        self.grant_privileges
163            .retain(|p| !p.action_with_opts.is_empty());
164        self.refresh_acl_modes();
165    }
166
167    pub fn has_privilege(&self, object: &GrantObject, mode: AclMode) -> bool {
168        self.get_acl(object)
169            .is_some_and(|acl_set| acl_set.has_mode(mode))
170    }
171
172    pub fn check_privilege_with_grant_option(
173        &self,
174        object: &GrantObject,
175        actions: &Vec<(Action, bool)>,
176    ) -> bool {
177        if self.is_super {
178            return true;
179        }
180        let mut action_map: HashMap<_, _> = actions.iter().map(|action| (action, false)).collect();
181
182        for privilege in &self.grant_privileges {
183            if privilege.get_object().unwrap() != object {
184                continue;
185            }
186            for awo in &privilege.action_with_opts {
187                let action = awo.get_action().unwrap();
188                let with_grant_option = awo.with_grant_option;
189
190                for (&key, found) in &mut action_map {
191                    let (required_action, required_grant_option) = *key;
192                    if action == required_action && (!required_grant_option | with_grant_option) {
193                        *found = true;
194                    }
195                }
196            }
197        }
198        action_map.values().all(|&found| found)
199    }
200
201    pub fn check_object_visibility(&self, obj_id: u32) -> bool {
202        if self.is_super {
203            return true;
204        }
205
206        // `Select` and `Execute` are the minimum required privileges for object visibility.
207        // `Execute` is required for functions.
208        self.object_acls.get(&obj_id).is_some_and(|acl_set| {
209            acl_set.has_mode(AclMode::Select) || acl_set.has_mode(AclMode::Execute)
210        })
211    }
212}