risingwave_frontend/expr/function_impl/
has_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::session_config::SearchPath;
18use risingwave_expr::{ExprError, Result, capture_context, function};
19use risingwave_pb::user::Action;
20use risingwave_sqlparser::parser::Parser;
21use thiserror_ext::AsReport;
22
23use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::system_catalog::is_system_catalog;
26use crate::catalog::{CatalogReader, DatabaseId, OwnedGrantObject, SchemaId};
27use crate::session::AuthContext;
28use crate::user::user_service::UserInfoReader;
29use crate::{Binder, bind_data_type};
30
31#[inline(always)]
32pub fn user_not_found_err(inner_err: &str) -> ExprError {
33    ExprError::InvalidParam {
34        name: "user",
35        reason: inner_err.into(),
36    }
37}
38
39#[function("has_table_privilege(int4, int4, varchar) -> boolean")]
40fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
41    // does user have privilege for table
42    let user_name = get_user_name_by_id_captured(user_id)?;
43    has_table_privilege_1(user_name.as_str(), table_oid, privileges)
44}
45
46#[function("has_table_privilege(varchar, int4, varchar) -> boolean")]
47fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
48    let allowed_actions = HashSet::new();
49    let actions = parse_privilege(privileges, &allowed_actions)?;
50    if is_system_catalog(table_oid as _) {
51        return Ok(true);
52    }
53    has_privilege_impl_captured(
54        user_name,
55        &get_grant_object_by_oid_captured(table_oid)?,
56        &actions,
57    )
58}
59
60#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")]
61fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
62    // does user have privilege for any column of table
63    let user_name = get_user_name_by_id_captured(user_id)?;
64    has_any_column_privilege_1(user_name.as_str(), table_oid, privileges)
65}
66
67#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")]
68fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
69    let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]);
70    let actions = parse_privilege(privileges, &allowed_actions)?;
71    has_privilege_impl_captured(
72        user_name,
73        &get_grant_object_by_oid_captured(table_oid)?,
74        &actions,
75    )
76}
77
78#[function("has_database_privilege(varchar, int4, varchar) -> boolean")]
79fn has_database_privilege(user_name: &str, database_oid: i32, privileges: &str) -> Result<bool> {
80    has_database_privilege_impl(user_name, DatabaseId::new(database_oid as _), privileges)
81}
82
83fn has_database_privilege_impl(
84    user_name: &str,
85    database_oid: DatabaseId,
86    privileges: &str,
87) -> Result<bool> {
88    // does user have privilege for database
89    let database_owner = get_database_owner_by_id_captured(database_oid)?;
90    let allowed_actions = HashSet::from_iter([Action::Create, Action::Connect]);
91    let actions = parse_privilege(privileges, &allowed_actions)?;
92    has_privilege_impl_captured(
93        user_name,
94        &OwnedGrantObject {
95            object: database_oid.into(),
96            owner: database_owner as u32,
97        },
98        &actions,
99    )
100}
101
102#[function("has_database_privilege(int4, varchar, varchar) -> boolean")]
103fn has_database_privilege_1(user_id: i32, database_name: &str, privileges: &str) -> Result<bool> {
104    let user_name = get_user_name_by_id_captured(user_id)?;
105    let database_oid = get_database_id_by_name_captured(database_name)?;
106    has_database_privilege_impl(user_name.as_str(), database_oid, privileges)
107}
108
109#[function("has_database_privilege(int4, int4, varchar) -> boolean")]
110fn has_database_privilege_2(user_id: i32, database_oid: i32, privileges: &str) -> Result<bool> {
111    let user_name = get_user_name_by_id_captured(user_id)?;
112    has_database_privilege(user_name.as_str(), database_oid, privileges)
113}
114
115#[function("has_database_privilege(varchar, varchar, varchar) -> boolean")]
116fn has_database_privilege_3(
117    user_name: &str,
118    database_name: &str,
119    privileges: &str,
120) -> Result<bool> {
121    let database_oid = get_database_id_by_name_captured(database_name)?;
122    has_database_privilege_impl(user_name, database_oid, privileges)
123}
124
125#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")]
126fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result<bool> {
127    has_schema_privilege_impl(user_name, SchemaId::new(schema_oid as _), privileges)
128}
129
130fn has_schema_privilege_impl(
131    user_name: &str,
132    schema_oid: SchemaId,
133    privileges: &str,
134) -> Result<bool> {
135    // does user have privilege for schema
136    let schema_owner = get_schema_owner_by_id_captured(schema_oid)?;
137    let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]);
138    let actions = parse_privilege(privileges, &allowed_actions)?;
139    has_privilege_impl_captured(
140        user_name,
141        &OwnedGrantObject {
142            object: schema_oid.into(),
143            owner: schema_owner as u32,
144        },
145        &actions,
146    )
147}
148
149#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")]
150fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result<bool> {
151    let user_name = get_user_name_by_id_captured(user_id)?;
152    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
153    has_schema_privilege_impl(user_name.as_str(), schema_oid, privileges)
154}
155
156#[function("has_schema_privilege(int4, int4, varchar) -> boolean")]
157fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result<bool> {
158    let user_name = get_user_name_by_id_captured(user_id)?;
159    has_schema_privilege(user_name.as_str(), schema_oid, privileges)
160}
161
162#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")]
163fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result<bool> {
164    let schema_oid = get_schema_id_by_name_captured(schema_name)?;
165    has_schema_privilege_impl(user_name, schema_oid, privileges)
166}
167
168#[function("has_function_privilege(varchar, int4, varchar) -> boolean")]
169fn has_function_privilege(user_name: &str, function_oid: i32, privileges: &str) -> Result<bool> {
170    // does user have privilege for function
171    let func_obj = get_grant_object_by_oid_captured(function_oid)?;
172    let allowed_actions = HashSet::from_iter([Action::Execute]);
173    let actions = parse_privilege(privileges, &allowed_actions)?;
174    has_privilege_impl_captured(user_name, &func_obj, &actions)
175}
176
177#[function("has_function_privilege(int4, int4, varchar) -> boolean")]
178fn has_function_privilege_1(user_id: i32, function_oid: i32, privileges: &str) -> Result<bool> {
179    let user_name = get_user_name_by_id_captured(user_id)?;
180    has_function_privilege(user_name.as_str(), function_oid, privileges)
181}
182
183#[function("has_function_privilege(varchar, varchar, varchar) -> boolean")]
184fn has_function_privilege_2(
185    user_name: &str,
186    function_name: &str,
187    privileges: &str,
188) -> Result<bool> {
189    let function_oid = get_function_id_by_name_captured(function_name)?;
190    has_function_privilege(user_name, function_oid, privileges)
191}
192
193#[function("has_function_privilege(int4, varchar, varchar) -> boolean")]
194fn has_function_privilege_3(user_id: i32, function_name: &str, privileges: &str) -> Result<bool> {
195    let user_name = get_user_name_by_id_captured(user_id)?;
196    let function_oid = get_function_id_by_name_captured(function_name)?;
197    has_function_privilege(user_name.as_str(), function_oid, privileges)
198}
199
200#[capture_context(USER_INFO_READER)]
201fn has_privilege_impl(
202    user_info_reader: &UserInfoReader,
203    user_name: &str,
204    object: &OwnedGrantObject,
205    actions: &Vec<(Action, bool)>,
206) -> Result<bool> {
207    let user_info = &user_info_reader.read_guard();
208    let user_catalog = user_info
209        .get_user_by_name(user_name)
210        .ok_or(user_not_found_err(
211            format!("User {} not found", user_name).as_str(),
212        ))?;
213    if user_catalog.id == object.owner {
214        // if the user is the owner of the object, they have all privileges
215        return Ok(true);
216    }
217    Ok(user_catalog.check_privilege_with_grant_option(&object.object, actions))
218}
219
220#[capture_context(USER_INFO_READER)]
221fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result<String> {
222    let user_info = &user_info_reader.read_guard();
223    user_info
224        .get_user_name_by_id(user_id as u32)
225        .ok_or(user_not_found_err(
226            format!("User {} not found", user_id).as_str(),
227        ))
228}
229
230#[capture_context(CATALOG_READER, DB_NAME)]
231fn get_grant_object_by_oid(
232    catalog_reader: &CatalogReader,
233    db_name: &str,
234    oid: i32,
235) -> Result<OwnedGrantObject> {
236    catalog_reader
237        .read_guard()
238        .get_database_by_name(db_name)
239        .map_err(|e| ExprError::InvalidParam {
240            name: "oid",
241            reason: e.to_report_string().into(),
242        })?
243        .get_grant_object_by_oid(oid as u32)
244        .ok_or(ExprError::InvalidParam {
245            name: "oid",
246            reason: format!("Table {} not found", oid).as_str().into(),
247        })
248}
249
250#[capture_context(CATALOG_READER)]
251fn get_database_id_by_name(catalog_reader: &CatalogReader, db_name: &str) -> Result<DatabaseId> {
252    let reader = &catalog_reader.read_guard();
253    Ok(reader
254        .get_database_by_name(db_name)
255        .map_err(|e| ExprError::InvalidParam {
256            name: "database",
257            reason: e.to_report_string().into(),
258        })?
259        .id())
260}
261
262#[capture_context(CATALOG_READER)]
263fn get_database_owner_by_id(
264    catalog_reader: &CatalogReader,
265    database_id: DatabaseId,
266) -> Result<i32> {
267    let reader = &catalog_reader.read_guard();
268    let database = reader
269        .get_database_by_id(database_id)
270        .map_err(|e| ExprError::InvalidParam {
271            name: "database",
272            reason: e.to_report_string().into(),
273        })?;
274    Ok(database.owner as i32)
275}
276
277#[capture_context(CATALOG_READER, DB_NAME)]
278fn get_schema_owner_by_id(
279    catalog_reader: &CatalogReader,
280    db_name: &str,
281    schema_id: SchemaId,
282) -> Result<i32> {
283    let reader = &catalog_reader.read_guard();
284    let db_id = reader
285        .get_database_by_name(db_name)
286        .map_err(|e| ExprError::InvalidParam {
287            name: "database",
288            reason: e.to_report_string().into(),
289        })?
290        .id();
291    Ok(reader
292        .get_schema_by_id(db_id, schema_id)
293        .map_err(|e| ExprError::InvalidParam {
294            name: "schema",
295            reason: e.to_report_string().into(),
296        })?
297        .owner as i32)
298}
299
300#[capture_context(CATALOG_READER, DB_NAME)]
301fn get_schema_id_by_name(
302    catalog_reader: &CatalogReader,
303    db_name: &str,
304    schema_name: &str,
305) -> Result<SchemaId> {
306    let reader = &catalog_reader.read_guard();
307    Ok(reader
308        .get_schema_by_name(db_name, schema_name)
309        .map_err(|e| ExprError::InvalidParam {
310            name: "schema",
311            reason: e.to_report_string().into(),
312        })?
313        .id())
314}
315
316#[capture_context(CATALOG_READER, DB_NAME, AUTH_CONTEXT, SEARCH_PATH)]
317fn get_function_id_by_name(
318    catalog_reader: &CatalogReader,
319    db_name: &str,
320    auth_context: &AuthContext,
321    search_path: &SearchPath,
322    function_name: &str,
323) -> Result<i32> {
324    let desc =
325        Parser::parse_function_desc_str(function_name).map_err(|e| ExprError::InvalidParam {
326            name: "function",
327            reason: e.to_report_string().into(),
328        })?;
329    let mut arg_types = vec![];
330    if let Some(args) = desc.args {
331        for arg in &args {
332            arg_types.push(bind_data_type(&arg.data_type).map_err(|e| {
333                ExprError::InvalidParam {
334                    name: "function",
335                    reason: e.to_report_string().into(),
336                }
337            })?);
338        }
339    }
340
341    let (schema, name) =
342        Binder::resolve_schema_qualified_name(db_name, &desc.name).map_err(|e| {
343            ExprError::InvalidParam {
344                name: "function",
345                reason: e.to_report_string().into(),
346            }
347        })?;
348    let schema_path = SchemaPath::new(schema.as_deref(), search_path, &auth_context.user_name);
349
350    let reader = &catalog_reader.read_guard();
351    Ok(reader
352        .get_function_by_name_args(db_name, schema_path, &name, &arg_types)
353        .map_err(|e| ExprError::InvalidParam {
354            name: "function",
355            reason: e.to_report_string().into(),
356        })?
357        .0
358        .id
359        .function_id() as i32)
360}
361
362fn parse_privilege(
363    privilege_string: &str,
364    allowed_actions: &HashSet<Action>,
365) -> Result<Vec<(Action, bool)>> {
366    let mut privileges = Vec::new();
367    for part in privilege_string.split(',').map(str::trim) {
368        let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") {
369            Some((p, _)) => (p, true),
370            None => (part, false),
371        };
372        match Action::from_str_name(privilege_type.to_uppercase().as_str()) {
373            Some(Action::Unspecified) | None => {
374                return Err(ExprError::InvalidParam {
375                    name: "privilege",
376                    reason: format!("unrecognized privilege type: \"{}\"", part).into(),
377                });
378            }
379            Some(action) => {
380                if allowed_actions.is_empty() || allowed_actions.contains(&action) {
381                    privileges.push((action, grant_option))
382                } else {
383                    return Err(ExprError::InvalidParam {
384                        name: "privilege",
385                        reason: format!("unrecognized privilege type: \"{}\"", part).into(),
386                    });
387                }
388            }
389        }
390    }
391    Ok(privileges)
392}