risingwave_frontend/expr/function_impl/
has_privilege.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::id::{FunctionId, ObjectId};
18use risingwave_common::session_config::SearchPath;
19use risingwave_expr::{ExprError, Result, capture_context, function};
20use risingwave_pb::id::UserId;
21use risingwave_pb::user::Action;
22use risingwave_sqlparser::parser::Parser;
23use thiserror_ext::AsReport;
24
25use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
26use crate::catalog::root_catalog::SchemaPath;
27use crate::catalog::system_catalog::is_system_catalog;
28use crate::catalog::{CatalogReader, DatabaseId, OwnedGrantObject, SchemaId};
29use crate::session::AuthContext;
30use crate::user::user_service::UserInfoReader;
31use crate::{Binder, bind_data_type};
32
33#[inline(always)]
34pub fn user_not_found_err(inner_err: &str) -> ExprError {
35    ExprError::InvalidParam {
36        name: "user",
37        reason: inner_err.into(),
38    }
39}
40
41#[function("has_table_privilege(int4, int4, varchar) -> boolean")]
42fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
43    // does user have privilege for table
44    let user_name = get_user_name_by_id_captured(user_id)?;
45    has_table_privilege_1(user_name.as_str(), table_oid, privileges)
46}
47
48#[function("has_table_privilege(varchar, int4, varchar) -> boolean")]
49fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
50    let allowed_actions = HashSet::new();
51    let actions = parse_privilege(privileges, &allowed_actions)?;
52    let table_oid = ObjectId::new(table_oid as _);
53    if is_system_catalog(table_oid) {
54        return Ok(true);
55    }
56    has_privilege_impl_captured(
57        user_name,
58        &get_grant_object_by_oid_captured(table_oid)?,
59        &actions,
60    )
61}
62
63#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")]
64fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
65    // does user have privilege for any column of table
66    let user_name = get_user_name_by_id_captured(user_id)?;
67    has_any_column_privilege_1(user_name.as_str(), table_oid, privileges)
68}
69
70#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")]
71fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
72    let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]);
73    let actions = parse_privilege(privileges, &allowed_actions)?;
74    has_privilege_impl_captured(
75        user_name,
76        &get_grant_object_by_oid_captured(ObjectId::new(table_oid as _))?,
77        &actions,
78    )
79}
80
81#[function("has_database_privilege(varchar, int4, varchar) -> boolean")]
82fn has_database_privilege(user_name: &str, database_oid: i32, privileges: &str) -> Result<bool> {
83    has_database_privilege_impl(user_name, DatabaseId::new(database_oid as _), privileges)
84}
85
86fn has_database_privilege_impl(
87    user_name: &str,
88    database_oid: DatabaseId,
89    privileges: &str,
90) -> Result<bool> {
91    // does user have privilege for database
92    let database_owner = get_database_owner_by_id_captured(database_oid)?;
93    let allowed_actions = HashSet::from_iter([Action::Create, Action::Connect]);
94    let actions = parse_privilege(privileges, &allowed_actions)?;
95    has_privilege_impl_captured(
96        user_name,
97        &OwnedGrantObject {
98            object: database_oid.into(),
99            owner: database_owner,
100        },
101        &actions,
102    )
103}
104
105#[function("has_database_privilege(int4, varchar, varchar) -> boolean")]
106fn has_database_privilege_1(user_id: i32, database_name: &str, privileges: &str) -> Result<bool> {
107    let user_name = get_user_name_by_id_captured(user_id)?;
108    let database_oid = get_database_id_by_name_captured(database_name)?;
109    has_database_privilege_impl(user_name.as_str(), database_oid, privileges)
110}
111
112#[function("has_database_privilege(int4, int4, varchar) -> boolean")]
113fn has_database_privilege_2(user_id: i32, database_oid: i32, privileges: &str) -> Result<bool> {
114    let user_name = get_user_name_by_id_captured(user_id)?;
115    has_database_privilege(user_name.as_str(), database_oid, privileges)
116}
117
118#[function("has_database_privilege(varchar, varchar, varchar) -> boolean")]
119fn has_database_privilege_3(
120    user_name: &str,
121    database_name: &str,
122    privileges: &str,
123) -> Result<bool> {
124    let database_oid = get_database_id_by_name_captured(database_name)?;
125    has_database_privilege_impl(user_name, database_oid, privileges)
126}
127
128#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")]
129fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result<bool> {
130    has_schema_privilege_impl(user_name, SchemaId::new(schema_oid as _), privileges)
131}
132
133fn has_schema_privilege_impl(
134    user_name: &str,
135    schema_oid: SchemaId,
136    privileges: &str,
137) -> Result<bool> {
138    // does user have privilege for schema
139    let schema_owner = get_schema_owner_by_id_captured(schema_oid)?;
140    let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]);
141    let actions = parse_privilege(privileges, &allowed_actions)?;
142    has_privilege_impl_captured(
143        user_name,
144        &OwnedGrantObject {
145            object: schema_oid.into(),
146            owner: schema_owner,
147        },
148        &actions,
149    )
150}
151
152#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")]
153fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result<bool> {
154    let user_name = get_user_name_by_id_captured(user_id)?;
155    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
156    has_schema_privilege_impl(user_name.as_str(), schema_oid, privileges)
157}
158
159#[function("has_schema_privilege(int4, int4, varchar) -> boolean")]
160fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result<bool> {
161    let user_name = get_user_name_by_id_captured(user_id)?;
162    has_schema_privilege(user_name.as_str(), schema_oid, privileges)
163}
164
165#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")]
166fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result<bool> {
167    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
168    has_schema_privilege_impl(user_name, schema_oid, privileges)
169}
170
171#[function("has_function_privilege(varchar, int4, varchar) -> boolean")]
172fn has_function_privilege(user_name: &str, function_oid: i32, privileges: &str) -> Result<bool> {
173    has_function_privilege_impl(user_name, FunctionId::new(function_oid as _), privileges)
174}
175
176fn has_function_privilege_impl(
177    user_name: &str,
178    function_oid: FunctionId,
179    privileges: &str,
180) -> Result<bool> {
181    // does user have privilege for function
182    let func_obj = get_grant_object_by_oid_captured(function_oid.as_object_id())?;
183    let allowed_actions = HashSet::from_iter([Action::Execute]);
184    let actions = parse_privilege(privileges, &allowed_actions)?;
185    has_privilege_impl_captured(user_name, &func_obj, &actions)
186}
187
188#[function("has_function_privilege(int4, int4, varchar) -> boolean")]
189fn has_function_privilege_1(user_id: i32, function_oid: i32, privileges: &str) -> Result<bool> {
190    let user_name = get_user_name_by_id_captured(user_id)?;
191    has_function_privilege_impl(
192        user_name.as_str(),
193        FunctionId::new(function_oid as _),
194        privileges,
195    )
196}
197
198#[function("has_function_privilege(varchar, varchar, varchar) -> boolean")]
199fn has_function_privilege_2(
200    user_name: &str,
201    function_name: &str,
202    privileges: &str,
203) -> Result<bool> {
204    let function_oid = get_function_id_by_name_captured(function_name)?;
205    has_function_privilege_impl(user_name, function_oid, privileges)
206}
207
208#[function("has_function_privilege(int4, varchar, varchar) -> boolean")]
209fn has_function_privilege_3(user_id: i32, function_name: &str, privileges: &str) -> Result<bool> {
210    let user_name = get_user_name_by_id_captured(user_id)?;
211    let function_oid = get_function_id_by_name_captured(function_name)?;
212    has_function_privilege_impl(user_name.as_str(), function_oid, privileges)
213}
214
215#[capture_context(USER_INFO_READER)]
216fn has_privilege_impl(
217    user_info_reader: &UserInfoReader,
218    user_name: &str,
219    object: &OwnedGrantObject,
220    actions: &Vec<(Action, bool)>,
221) -> Result<bool> {
222    let user_info = &user_info_reader.read_guard();
223    let user_catalog = user_info
224        .get_user_by_name(user_name)
225        .ok_or(user_not_found_err(
226            format!("User {} not found", user_name).as_str(),
227        ))?;
228    if user_catalog.id == object.owner {
229        // if the user is the owner of the object, they have all privileges
230        return Ok(true);
231    }
232    Ok(user_catalog.check_privilege_with_grant_option(&object.object, actions))
233}
234
235#[capture_context(USER_INFO_READER)]
236fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result<String> {
237    let user_id: u32 = user_id.try_into().map_err(|_| ExprError::InvalidParam {
238        name: "get_user_name_by_id",
239        reason: format!("invalid user_id {user_id}").into_boxed_str(),
240    })?;
241    let user_info = &user_info_reader.read_guard();
242    user_info
243        .get_user_name_by_id(user_id.into())
244        .ok_or(user_not_found_err(
245            format!("User {} not found", user_id).as_str(),
246        ))
247}
248
249#[capture_context(CATALOG_READER, DB_NAME)]
250fn get_grant_object_by_oid(
251    catalog_reader: &CatalogReader,
252    db_name: &str,
253    oid: ObjectId,
254) -> Result<OwnedGrantObject> {
255    catalog_reader
256        .read_guard()
257        .get_database_by_name(db_name)
258        .map_err(|e| ExprError::InvalidParam {
259            name: "oid",
260            reason: e.to_report_string().into(),
261        })?
262        .get_grant_object_by_oid(oid)
263        .ok_or(ExprError::InvalidParam {
264            name: "oid",
265            reason: format!("Table {} not found", oid).as_str().into(),
266        })
267}
268
269#[capture_context(CATALOG_READER)]
270fn get_database_id_by_name(catalog_reader: &CatalogReader, db_name: &str) -> Result<DatabaseId> {
271    let reader = &catalog_reader.read_guard();
272    Ok(reader
273        .get_database_by_name(db_name)
274        .map_err(|e| ExprError::InvalidParam {
275            name: "database",
276            reason: e.to_report_string().into(),
277        })?
278        .id())
279}
280
281#[capture_context(CATALOG_READER)]
282fn get_database_owner_by_id(
283    catalog_reader: &CatalogReader,
284    database_id: DatabaseId,
285) -> Result<UserId> {
286    let reader = &catalog_reader.read_guard();
287    let database = reader
288        .get_database_by_id(database_id)
289        .map_err(|e| ExprError::InvalidParam {
290            name: "database",
291            reason: e.to_report_string().into(),
292        })?;
293    Ok(database.owner)
294}
295
296#[capture_context(CATALOG_READER, DB_NAME)]
297fn get_schema_owner_by_id(
298    catalog_reader: &CatalogReader,
299    db_name: &str,
300    schema_id: SchemaId,
301) -> Result<UserId> {
302    let reader = &catalog_reader.read_guard();
303    let db_id = reader
304        .get_database_by_name(db_name)
305        .map_err(|e| ExprError::InvalidParam {
306            name: "database",
307            reason: e.to_report_string().into(),
308        })?
309        .id();
310    Ok(reader
311        .get_schema_by_id(db_id, schema_id)
312        .map_err(|e| ExprError::InvalidParam {
313            name: "schema",
314            reason: e.to_report_string().into(),
315        })?
316        .owner)
317}
318
319#[capture_context(CATALOG_READER, DB_NAME)]
320fn get_schema_id_by_name(
321    catalog_reader: &CatalogReader,
322    db_name: &str,
323    schema_name: &str,
324) -> Result<SchemaId> {
325    let reader = &catalog_reader.read_guard();
326    Ok(reader
327        .get_schema_by_name(db_name, schema_name)
328        .map_err(|e| ExprError::InvalidParam {
329            name: "schema",
330            reason: e.to_report_string().into(),
331        })?
332        .id())
333}
334
335#[capture_context(CATALOG_READER, DB_NAME, AUTH_CONTEXT, SEARCH_PATH)]
336fn get_function_id_by_name(
337    catalog_reader: &CatalogReader,
338    db_name: &str,
339    auth_context: &AuthContext,
340    search_path: &SearchPath,
341    function_name: &str,
342) -> Result<FunctionId> {
343    let desc =
344        Parser::parse_function_desc_str(function_name).map_err(|e| ExprError::InvalidParam {
345            name: "function",
346            reason: e.to_report_string().into(),
347        })?;
348    let mut arg_types = vec![];
349    if let Some(args) = desc.args {
350        for arg in &args {
351            arg_types.push(bind_data_type(&arg.data_type).map_err(|e| {
352                ExprError::InvalidParam {
353                    name: "function",
354                    reason: e.to_report_string().into(),
355                }
356            })?);
357        }
358    }
359
360    let (schema, name) =
361        Binder::resolve_schema_qualified_name(db_name, &desc.name).map_err(|e| {
362            ExprError::InvalidParam {
363                name: "function",
364                reason: e.to_report_string().into(),
365            }
366        })?;
367    let schema_path = SchemaPath::new(schema.as_deref(), search_path, &auth_context.user_name);
368
369    let reader = &catalog_reader.read_guard();
370    Ok(reader
371        .get_function_by_name_args(db_name, schema_path, &name, &arg_types)
372        .map_err(|e| ExprError::InvalidParam {
373            name: "function",
374            reason: e.to_report_string().into(),
375        })?
376        .0
377        .id)
378}
379
380fn parse_privilege(
381    privilege_string: &str,
382    allowed_actions: &HashSet<Action>,
383) -> Result<Vec<(Action, bool)>> {
384    let mut privileges = Vec::new();
385    for part in privilege_string.split(',').map(str::trim) {
386        let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") {
387            Some((p, _)) => (p, true),
388            None => (part, false),
389        };
390        match Action::from_str_name(privilege_type.to_uppercase().as_str()) {
391            Some(Action::Unspecified) | None => {
392                return Err(ExprError::InvalidParam {
393                    name: "privilege",
394                    reason: format!("unrecognized privilege type: \"{}\"", part).into(),
395                });
396            }
397            Some(action) => {
398                if allowed_actions.is_empty() || allowed_actions.contains(&action) {
399                    privileges.push((action, grant_option))
400                } else {
401                    return Err(ExprError::InvalidParam {
402                        name: "privilege",
403                        reason: format!("unrecognized privilege type: \"{}\"", part).into(),
404                    });
405                }
406            }
407        }
408    }
409    Ok(privileges)
410}