risingwave_frontend/expr/function_impl/
has_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::id::{FunctionId, ObjectId};
18use risingwave_common::session_config::SearchPath;
19use risingwave_expr::{ExprError, Result, capture_context, function};
20use risingwave_pb::user::Action;
21use risingwave_sqlparser::parser::Parser;
22use thiserror_ext::AsReport;
23
24use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::system_catalog::is_system_catalog;
27use crate::catalog::{CatalogReader, DatabaseId, OwnedGrantObject, SchemaId};
28use crate::session::AuthContext;
29use crate::user::user_service::UserInfoReader;
30use crate::{Binder, bind_data_type};
31
32#[inline(always)]
33pub fn user_not_found_err(inner_err: &str) -> ExprError {
34    ExprError::InvalidParam {
35        name: "user",
36        reason: inner_err.into(),
37    }
38}
39
40#[function("has_table_privilege(int4, int4, varchar) -> boolean")]
41fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
42    // does user have privilege for table
43    let user_name = get_user_name_by_id_captured(user_id)?;
44    has_table_privilege_1(user_name.as_str(), table_oid, privileges)
45}
46
47#[function("has_table_privilege(varchar, int4, varchar) -> boolean")]
48fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
49    let allowed_actions = HashSet::new();
50    let actions = parse_privilege(privileges, &allowed_actions)?;
51    let table_oid = ObjectId::new(table_oid as _);
52    if is_system_catalog(table_oid) {
53        return Ok(true);
54    }
55    has_privilege_impl_captured(
56        user_name,
57        &get_grant_object_by_oid_captured(table_oid)?,
58        &actions,
59    )
60}
61
62#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")]
63fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
64    // does user have privilege for any column of table
65    let user_name = get_user_name_by_id_captured(user_id)?;
66    has_any_column_privilege_1(user_name.as_str(), table_oid, privileges)
67}
68
69#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")]
70fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
71    let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]);
72    let actions = parse_privilege(privileges, &allowed_actions)?;
73    has_privilege_impl_captured(
74        user_name,
75        &get_grant_object_by_oid_captured(ObjectId::new(table_oid as _))?,
76        &actions,
77    )
78}
79
80#[function("has_database_privilege(varchar, int4, varchar) -> boolean")]
81fn has_database_privilege(user_name: &str, database_oid: i32, privileges: &str) -> Result<bool> {
82    has_database_privilege_impl(user_name, DatabaseId::new(database_oid as _), privileges)
83}
84
85fn has_database_privilege_impl(
86    user_name: &str,
87    database_oid: DatabaseId,
88    privileges: &str,
89) -> Result<bool> {
90    // does user have privilege for database
91    let database_owner = get_database_owner_by_id_captured(database_oid)?;
92    let allowed_actions = HashSet::from_iter([Action::Create, Action::Connect]);
93    let actions = parse_privilege(privileges, &allowed_actions)?;
94    has_privilege_impl_captured(
95        user_name,
96        &OwnedGrantObject {
97            object: database_oid.into(),
98            owner: database_owner as u32,
99        },
100        &actions,
101    )
102}
103
104#[function("has_database_privilege(int4, varchar, varchar) -> boolean")]
105fn has_database_privilege_1(user_id: i32, database_name: &str, privileges: &str) -> Result<bool> {
106    let user_name = get_user_name_by_id_captured(user_id)?;
107    let database_oid = get_database_id_by_name_captured(database_name)?;
108    has_database_privilege_impl(user_name.as_str(), database_oid, privileges)
109}
110
111#[function("has_database_privilege(int4, int4, varchar) -> boolean")]
112fn has_database_privilege_2(user_id: i32, database_oid: i32, privileges: &str) -> Result<bool> {
113    let user_name = get_user_name_by_id_captured(user_id)?;
114    has_database_privilege(user_name.as_str(), database_oid, privileges)
115}
116
117#[function("has_database_privilege(varchar, varchar, varchar) -> boolean")]
118fn has_database_privilege_3(
119    user_name: &str,
120    database_name: &str,
121    privileges: &str,
122) -> Result<bool> {
123    let database_oid = get_database_id_by_name_captured(database_name)?;
124    has_database_privilege_impl(user_name, database_oid, privileges)
125}
126
127#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")]
128fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result<bool> {
129    has_schema_privilege_impl(user_name, SchemaId::new(schema_oid as _), privileges)
130}
131
132fn has_schema_privilege_impl(
133    user_name: &str,
134    schema_oid: SchemaId,
135    privileges: &str,
136) -> Result<bool> {
137    // does user have privilege for schema
138    let schema_owner = get_schema_owner_by_id_captured(schema_oid)?;
139    let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]);
140    let actions = parse_privilege(privileges, &allowed_actions)?;
141    has_privilege_impl_captured(
142        user_name,
143        &OwnedGrantObject {
144            object: schema_oid.into(),
145            owner: schema_owner as u32,
146        },
147        &actions,
148    )
149}
150
151#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")]
152fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result<bool> {
153    let user_name = get_user_name_by_id_captured(user_id)?;
154    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
155    has_schema_privilege_impl(user_name.as_str(), schema_oid, privileges)
156}
157
158#[function("has_schema_privilege(int4, int4, varchar) -> boolean")]
159fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result<bool> {
160    let user_name = get_user_name_by_id_captured(user_id)?;
161    has_schema_privilege(user_name.as_str(), schema_oid, privileges)
162}
163
164#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")]
165fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result<bool> {
166    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
167    has_schema_privilege_impl(user_name, schema_oid, privileges)
168}
169
170#[function("has_function_privilege(varchar, int4, varchar) -> boolean")]
171fn has_function_privilege(user_name: &str, function_oid: i32, privileges: &str) -> Result<bool> {
172    has_function_privilege_impl(user_name, FunctionId::new(function_oid as _), privileges)
173}
174
175fn has_function_privilege_impl(
176    user_name: &str,
177    function_oid: FunctionId,
178    privileges: &str,
179) -> Result<bool> {
180    // does user have privilege for function
181    let func_obj = get_grant_object_by_oid_captured(function_oid.as_object_id())?;
182    let allowed_actions = HashSet::from_iter([Action::Execute]);
183    let actions = parse_privilege(privileges, &allowed_actions)?;
184    has_privilege_impl_captured(user_name, &func_obj, &actions)
185}
186
187#[function("has_function_privilege(int4, int4, varchar) -> boolean")]
188fn has_function_privilege_1(user_id: i32, function_oid: i32, privileges: &str) -> Result<bool> {
189    let user_name = get_user_name_by_id_captured(user_id)?;
190    has_function_privilege_impl(
191        user_name.as_str(),
192        FunctionId::new(function_oid as _),
193        privileges,
194    )
195}
196
197#[function("has_function_privilege(varchar, varchar, varchar) -> boolean")]
198fn has_function_privilege_2(
199    user_name: &str,
200    function_name: &str,
201    privileges: &str,
202) -> Result<bool> {
203    let function_oid = get_function_id_by_name_captured(function_name)?;
204    has_function_privilege_impl(user_name, function_oid, privileges)
205}
206
207#[function("has_function_privilege(int4, varchar, varchar) -> boolean")]
208fn has_function_privilege_3(user_id: i32, function_name: &str, privileges: &str) -> Result<bool> {
209    let user_name = get_user_name_by_id_captured(user_id)?;
210    let function_oid = get_function_id_by_name_captured(function_name)?;
211    has_function_privilege_impl(user_name.as_str(), function_oid, privileges)
212}
213
214#[capture_context(USER_INFO_READER)]
215fn has_privilege_impl(
216    user_info_reader: &UserInfoReader,
217    user_name: &str,
218    object: &OwnedGrantObject,
219    actions: &Vec<(Action, bool)>,
220) -> Result<bool> {
221    let user_info = &user_info_reader.read_guard();
222    let user_catalog = user_info
223        .get_user_by_name(user_name)
224        .ok_or(user_not_found_err(
225            format!("User {} not found", user_name).as_str(),
226        ))?;
227    if user_catalog.id == object.owner {
228        // if the user is the owner of the object, they have all privileges
229        return Ok(true);
230    }
231    Ok(user_catalog.check_privilege_with_grant_option(&object.object, actions))
232}
233
234#[capture_context(USER_INFO_READER)]
235fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result<String> {
236    let user_info = &user_info_reader.read_guard();
237    user_info
238        .get_user_name_by_id(user_id as u32)
239        .ok_or(user_not_found_err(
240            format!("User {} not found", user_id).as_str(),
241        ))
242}
243
244#[capture_context(CATALOG_READER, DB_NAME)]
245fn get_grant_object_by_oid(
246    catalog_reader: &CatalogReader,
247    db_name: &str,
248    oid: ObjectId,
249) -> Result<OwnedGrantObject> {
250    catalog_reader
251        .read_guard()
252        .get_database_by_name(db_name)
253        .map_err(|e| ExprError::InvalidParam {
254            name: "oid",
255            reason: e.to_report_string().into(),
256        })?
257        .get_grant_object_by_oid(oid)
258        .ok_or(ExprError::InvalidParam {
259            name: "oid",
260            reason: format!("Table {} not found", oid).as_str().into(),
261        })
262}
263
264#[capture_context(CATALOG_READER)]
265fn get_database_id_by_name(catalog_reader: &CatalogReader, db_name: &str) -> Result<DatabaseId> {
266    let reader = &catalog_reader.read_guard();
267    Ok(reader
268        .get_database_by_name(db_name)
269        .map_err(|e| ExprError::InvalidParam {
270            name: "database",
271            reason: e.to_report_string().into(),
272        })?
273        .id())
274}
275
276#[capture_context(CATALOG_READER)]
277fn get_database_owner_by_id(
278    catalog_reader: &CatalogReader,
279    database_id: DatabaseId,
280) -> Result<i32> {
281    let reader = &catalog_reader.read_guard();
282    let database = reader
283        .get_database_by_id(database_id)
284        .map_err(|e| ExprError::InvalidParam {
285            name: "database",
286            reason: e.to_report_string().into(),
287        })?;
288    Ok(database.owner as i32)
289}
290
291#[capture_context(CATALOG_READER, DB_NAME)]
292fn get_schema_owner_by_id(
293    catalog_reader: &CatalogReader,
294    db_name: &str,
295    schema_id: SchemaId,
296) -> Result<i32> {
297    let reader = &catalog_reader.read_guard();
298    let db_id = reader
299        .get_database_by_name(db_name)
300        .map_err(|e| ExprError::InvalidParam {
301            name: "database",
302            reason: e.to_report_string().into(),
303        })?
304        .id();
305    Ok(reader
306        .get_schema_by_id(db_id, schema_id)
307        .map_err(|e| ExprError::InvalidParam {
308            name: "schema",
309            reason: e.to_report_string().into(),
310        })?
311        .owner as i32)
312}
313
314#[capture_context(CATALOG_READER, DB_NAME)]
315fn get_schema_id_by_name(
316    catalog_reader: &CatalogReader,
317    db_name: &str,
318    schema_name: &str,
319) -> Result<SchemaId> {
320    let reader = &catalog_reader.read_guard();
321    Ok(reader
322        .get_schema_by_name(db_name, schema_name)
323        .map_err(|e| ExprError::InvalidParam {
324            name: "schema",
325            reason: e.to_report_string().into(),
326        })?
327        .id())
328}
329
330#[capture_context(CATALOG_READER, DB_NAME, AUTH_CONTEXT, SEARCH_PATH)]
331fn get_function_id_by_name(
332    catalog_reader: &CatalogReader,
333    db_name: &str,
334    auth_context: &AuthContext,
335    search_path: &SearchPath,
336    function_name: &str,
337) -> Result<FunctionId> {
338    let desc =
339        Parser::parse_function_desc_str(function_name).map_err(|e| ExprError::InvalidParam {
340            name: "function",
341            reason: e.to_report_string().into(),
342        })?;
343    let mut arg_types = vec![];
344    if let Some(args) = desc.args {
345        for arg in &args {
346            arg_types.push(bind_data_type(&arg.data_type).map_err(|e| {
347                ExprError::InvalidParam {
348                    name: "function",
349                    reason: e.to_report_string().into(),
350                }
351            })?);
352        }
353    }
354
355    let (schema, name) =
356        Binder::resolve_schema_qualified_name(db_name, &desc.name).map_err(|e| {
357            ExprError::InvalidParam {
358                name: "function",
359                reason: e.to_report_string().into(),
360            }
361        })?;
362    let schema_path = SchemaPath::new(schema.as_deref(), search_path, &auth_context.user_name);
363
364    let reader = &catalog_reader.read_guard();
365    Ok(reader
366        .get_function_by_name_args(db_name, schema_path, &name, &arg_types)
367        .map_err(|e| ExprError::InvalidParam {
368            name: "function",
369            reason: e.to_report_string().into(),
370        })?
371        .0
372        .id)
373}
374
375fn parse_privilege(
376    privilege_string: &str,
377    allowed_actions: &HashSet<Action>,
378) -> Result<Vec<(Action, bool)>> {
379    let mut privileges = Vec::new();
380    for part in privilege_string.split(',').map(str::trim) {
381        let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") {
382            Some((p, _)) => (p, true),
383            None => (part, false),
384        };
385        match Action::from_str_name(privilege_type.to_uppercase().as_str()) {
386            Some(Action::Unspecified) | None => {
387                return Err(ExprError::InvalidParam {
388                    name: "privilege",
389                    reason: format!("unrecognized privilege type: \"{}\"", part).into(),
390                });
391            }
392            Some(action) => {
393                if allowed_actions.is_empty() || allowed_actions.contains(&action) {
394                    privileges.push((action, grant_option))
395                } else {
396                    return Err(ExprError::InvalidParam {
397                        name: "privilege",
398                        reason: format!("unrecognized privilege type: \"{}\"", part).into(),
399                    });
400                }
401            }
402        }
403    }
404    Ok(privileges)
405}