risingwave_frontend/expr/function_impl/
has_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::session_config::SearchPath;
18use risingwave_expr::{ExprError, Result, capture_context, function};
19use risingwave_pb::user::Action;
20use risingwave_pb::user::grant_privilege::Object;
21use risingwave_sqlparser::parser::Parser;
22use thiserror_ext::AsReport;
23
24use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::system_catalog::is_system_catalog;
27use crate::catalog::{CatalogReader, DatabaseId, OwnedGrantObject, SchemaId};
28use crate::session::AuthContext;
29use crate::user::user_service::UserInfoReader;
30use crate::{Binder, bind_data_type};
31
32#[inline(always)]
33pub fn user_not_found_err(inner_err: &str) -> ExprError {
34    ExprError::InvalidParam {
35        name: "user",
36        reason: inner_err.into(),
37    }
38}
39
40#[function("has_table_privilege(int4, int4, varchar) -> boolean")]
41fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
42    // does user have privilege for table
43    let user_name = get_user_name_by_id_captured(user_id)?;
44    has_table_privilege_1(user_name.as_str(), table_oid, privileges)
45}
46
47#[function("has_table_privilege(varchar, int4, varchar) -> boolean")]
48fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
49    let allowed_actions = HashSet::new();
50    let actions = parse_privilege(privileges, &allowed_actions)?;
51    if is_system_catalog(table_oid as _) {
52        return Ok(true);
53    }
54    has_privilege_impl_captured(
55        user_name,
56        &get_grant_object_by_oid_captured(table_oid)?,
57        &actions,
58    )
59}
60
61#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")]
62fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
63    // does user have privilege for any column of table
64    let user_name = get_user_name_by_id_captured(user_id)?;
65    has_any_column_privilege_1(user_name.as_str(), table_oid, privileges)
66}
67
68#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")]
69fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
70    let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]);
71    let actions = parse_privilege(privileges, &allowed_actions)?;
72    has_privilege_impl_captured(
73        user_name,
74        &get_grant_object_by_oid_captured(table_oid)?,
75        &actions,
76    )
77}
78
79#[function("has_database_privilege(varchar, int4, varchar) -> boolean")]
80fn has_database_privilege(user_name: &str, database_oid: i32, privileges: &str) -> Result<bool> {
81    // does user have privilege for database
82    let database_owner = get_database_owner_by_id_captured(database_oid as DatabaseId)?;
83    let allowed_actions = HashSet::from_iter([Action::Create, Action::Connect]);
84    let actions = parse_privilege(privileges, &allowed_actions)?;
85    has_privilege_impl_captured(
86        user_name,
87        &OwnedGrantObject {
88            object: Object::DatabaseId(database_oid as u32),
89            owner: database_owner as u32,
90        },
91        &actions,
92    )
93}
94
95#[function("has_database_privilege(int4, varchar, varchar) -> boolean")]
96fn has_database_privilege_1(user_id: i32, database_name: &str, privileges: &str) -> Result<bool> {
97    let user_name = get_user_name_by_id_captured(user_id)?;
98    let database_oid = get_database_id_by_name_captured(database_name)?;
99    has_database_privilege(user_name.as_str(), database_oid, privileges)
100}
101
102#[function("has_database_privilege(int4, int4, varchar) -> boolean")]
103fn has_database_privilege_2(user_id: i32, database_oid: i32, privileges: &str) -> Result<bool> {
104    let user_name = get_user_name_by_id_captured(user_id)?;
105    has_database_privilege(user_name.as_str(), database_oid, privileges)
106}
107
108#[function("has_database_privilege(varchar, varchar, varchar) -> boolean")]
109fn has_database_privilege_3(
110    user_name: &str,
111    database_name: &str,
112    privileges: &str,
113) -> Result<bool> {
114    let database_oid = get_database_id_by_name_captured(database_name)?;
115    has_database_privilege(user_name, database_oid, privileges)
116}
117
118#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")]
119fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result<bool> {
120    // does user have privilege for schema
121    let schema_owner = get_schema_owner_by_id_captured(schema_oid as SchemaId)?;
122    let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]);
123    let actions = parse_privilege(privileges, &allowed_actions)?;
124    has_privilege_impl_captured(
125        user_name,
126        &OwnedGrantObject {
127            object: Object::SchemaId(schema_oid as u32),
128            owner: schema_owner as u32,
129        },
130        &actions,
131    )
132}
133
134#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")]
135fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result<bool> {
136    let user_name = get_user_name_by_id_captured(user_id)?;
137    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
138    has_schema_privilege(user_name.as_str(), schema_oid, privileges)
139}
140
141#[function("has_schema_privilege(int4, int4, varchar) -> boolean")]
142fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result<bool> {
143    let user_name = get_user_name_by_id_captured(user_id)?;
144    has_schema_privilege(user_name.as_str(), schema_oid, privileges)
145}
146
147#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")]
148fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result<bool> {
149    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
150    has_schema_privilege(user_name, schema_oid, privileges)
151}
152
153#[function("has_function_privilege(varchar, int4, varchar) -> boolean")]
154fn has_function_privilege(user_name: &str, function_oid: i32, privileges: &str) -> Result<bool> {
155    // does user have privilege for function
156    let func_obj = get_grant_object_by_oid_captured(function_oid)?;
157    let allowed_actions = HashSet::from_iter([Action::Execute]);
158    let actions = parse_privilege(privileges, &allowed_actions)?;
159    has_privilege_impl_captured(user_name, &func_obj, &actions)
160}
161
162#[function("has_function_privilege(int4, int4, varchar) -> boolean")]
163fn has_function_privilege_1(user_id: i32, function_oid: i32, privileges: &str) -> Result<bool> {
164    let user_name = get_user_name_by_id_captured(user_id)?;
165    has_function_privilege(user_name.as_str(), function_oid, privileges)
166}
167
168#[function("has_function_privilege(varchar, varchar, varchar) -> boolean")]
169fn has_function_privilege_2(
170    user_name: &str,
171    function_name: &str,
172    privileges: &str,
173) -> Result<bool> {
174    let function_oid = get_function_id_by_name_captured(function_name)?;
175    has_function_privilege(user_name, function_oid, privileges)
176}
177
178#[function("has_function_privilege(int4, varchar, varchar) -> boolean")]
179fn has_function_privilege_3(user_id: i32, function_name: &str, privileges: &str) -> Result<bool> {
180    let user_name = get_user_name_by_id_captured(user_id)?;
181    let function_oid = get_function_id_by_name_captured(function_name)?;
182    has_function_privilege(user_name.as_str(), function_oid, privileges)
183}
184
185#[capture_context(USER_INFO_READER)]
186fn has_privilege_impl(
187    user_info_reader: &UserInfoReader,
188    user_name: &str,
189    object: &OwnedGrantObject,
190    actions: &Vec<(Action, bool)>,
191) -> Result<bool> {
192    let user_info = &user_info_reader.read_guard();
193    let user_catalog = user_info
194        .get_user_by_name(user_name)
195        .ok_or(user_not_found_err(
196            format!("User {} not found", user_name).as_str(),
197        ))?;
198    if user_catalog.id == object.owner {
199        // if the user is the owner of the object, they have all privileges
200        return Ok(true);
201    }
202    Ok(user_catalog.check_privilege_with_grant_option(&object.object, actions))
203}
204
205#[capture_context(USER_INFO_READER)]
206fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result<String> {
207    let user_info = &user_info_reader.read_guard();
208    user_info
209        .get_user_name_by_id(user_id as u32)
210        .ok_or(user_not_found_err(
211            format!("User {} not found", user_id).as_str(),
212        ))
213}
214
215#[capture_context(CATALOG_READER, DB_NAME)]
216fn get_grant_object_by_oid(
217    catalog_reader: &CatalogReader,
218    db_name: &str,
219    oid: i32,
220) -> Result<OwnedGrantObject> {
221    catalog_reader
222        .read_guard()
223        .get_database_by_name(db_name)
224        .map_err(|e| ExprError::InvalidParam {
225            name: "oid",
226            reason: e.to_report_string().into(),
227        })?
228        .get_grant_object_by_oid(oid as u32)
229        .ok_or(ExprError::InvalidParam {
230            name: "oid",
231            reason: format!("Table {} not found", oid).as_str().into(),
232        })
233}
234
235#[capture_context(CATALOG_READER)]
236fn get_database_id_by_name(catalog_reader: &CatalogReader, db_name: &str) -> Result<i32> {
237    let reader = &catalog_reader.read_guard();
238    Ok(reader
239        .get_database_by_name(db_name)
240        .map_err(|e| ExprError::InvalidParam {
241            name: "database",
242            reason: e.to_report_string().into(),
243        })?
244        .id() as i32)
245}
246
247#[capture_context(CATALOG_READER)]
248fn get_database_owner_by_id(
249    catalog_reader: &CatalogReader,
250    database_id: DatabaseId,
251) -> Result<i32> {
252    let reader = &catalog_reader.read_guard();
253    let database =
254        reader
255            .get_database_by_id(&database_id)
256            .map_err(|e| ExprError::InvalidParam {
257                name: "database",
258                reason: e.to_report_string().into(),
259            })?;
260    Ok(database.owner as i32)
261}
262
263#[capture_context(CATALOG_READER, DB_NAME)]
264fn get_schema_owner_by_id(
265    catalog_reader: &CatalogReader,
266    db_name: &str,
267    schema_id: SchemaId,
268) -> Result<i32> {
269    let reader = &catalog_reader.read_guard();
270    let db_id = reader
271        .get_database_by_name(db_name)
272        .map_err(|e| ExprError::InvalidParam {
273            name: "database",
274            reason: e.to_report_string().into(),
275        })?
276        .id();
277    Ok(reader
278        .get_schema_by_id(&db_id, &schema_id)
279        .map_err(|e| ExprError::InvalidParam {
280            name: "schema",
281            reason: e.to_report_string().into(),
282        })?
283        .owner as i32)
284}
285
286#[capture_context(CATALOG_READER, DB_NAME)]
287fn get_schema_id_by_name(
288    catalog_reader: &CatalogReader,
289    db_name: &str,
290    schema_name: &str,
291) -> Result<i32> {
292    let reader = &catalog_reader.read_guard();
293    Ok(reader
294        .get_schema_by_name(db_name, schema_name)
295        .map_err(|e| ExprError::InvalidParam {
296            name: "schema",
297            reason: e.to_report_string().into(),
298        })?
299        .id() as i32)
300}
301
302#[capture_context(CATALOG_READER, DB_NAME, AUTH_CONTEXT, SEARCH_PATH)]
303fn get_function_id_by_name(
304    catalog_reader: &CatalogReader,
305    db_name: &str,
306    auth_context: &AuthContext,
307    search_path: &SearchPath,
308    function_name: &str,
309) -> Result<i32> {
310    let desc =
311        Parser::parse_function_desc_str(function_name).map_err(|e| ExprError::InvalidParam {
312            name: "function",
313            reason: e.to_report_string().into(),
314        })?;
315    let mut arg_types = vec![];
316    if let Some(args) = desc.args {
317        for arg in &args {
318            arg_types.push(bind_data_type(&arg.data_type).map_err(|e| {
319                ExprError::InvalidParam {
320                    name: "function",
321                    reason: e.to_report_string().into(),
322                }
323            })?);
324        }
325    }
326
327    let (schema, name) =
328        Binder::resolve_schema_qualified_name(db_name, desc.name).map_err(|e| {
329            ExprError::InvalidParam {
330                name: "function",
331                reason: e.to_report_string().into(),
332            }
333        })?;
334    let schema_path = SchemaPath::new(schema.as_deref(), search_path, &auth_context.user_name);
335
336    let reader = &catalog_reader.read_guard();
337    Ok(reader
338        .get_function_by_name_args(db_name, schema_path, &name, &arg_types)
339        .map_err(|e| ExprError::InvalidParam {
340            name: "function",
341            reason: e.to_report_string().into(),
342        })?
343        .0
344        .id
345        .function_id() as i32)
346}
347
348fn parse_privilege(
349    privilege_string: &str,
350    allowed_actions: &HashSet<Action>,
351) -> Result<Vec<(Action, bool)>> {
352    let mut privileges = Vec::new();
353    for part in privilege_string.split(',').map(str::trim) {
354        let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") {
355            Some((p, _)) => (p, true),
356            None => (part, false),
357        };
358        match Action::from_str_name(privilege_type.to_uppercase().as_str()) {
359            Some(Action::Unspecified) | None => {
360                return Err(ExprError::InvalidParam {
361                    name: "privilege",
362                    reason: format!("unrecognized privilege type: \"{}\"", part).into(),
363                });
364            }
365            Some(action) => {
366                if allowed_actions.is_empty() || allowed_actions.contains(&action) {
367                    privileges.push((action, grant_option))
368                } else {
369                    return Err(ExprError::InvalidParam {
370                        name: "privilege",
371                        reason: format!("unrecognized privilege type: \"{}\"", part).into(),
372                    });
373                }
374            }
375        }
376    }
377    Ok(privileges)
378}