1use std::collections::HashSet;
16
17use risingwave_common::session_config::SearchPath;
18use risingwave_expr::{ExprError, Result, capture_context, function};
19use risingwave_pb::user::Action;
20use risingwave_pb::user::grant_privilege::Object;
21use risingwave_sqlparser::parser::Parser;
22use thiserror_ext::AsReport;
23
24use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::system_catalog::is_system_catalog;
27use crate::catalog::{CatalogReader, DatabaseId, OwnedGrantObject, SchemaId};
28use crate::session::AuthContext;
29use crate::user::user_service::UserInfoReader;
30use crate::{Binder, bind_data_type};
31
32#[inline(always)]
33pub fn user_not_found_err(inner_err: &str) -> ExprError {
34 ExprError::InvalidParam {
35 name: "user",
36 reason: inner_err.into(),
37 }
38}
39
40#[function("has_table_privilege(int4, int4, varchar) -> boolean")]
41fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
42 let user_name = get_user_name_by_id_captured(user_id)?;
44 has_table_privilege_1(user_name.as_str(), table_oid, privileges)
45}
46
47#[function("has_table_privilege(varchar, int4, varchar) -> boolean")]
48fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
49 let allowed_actions = HashSet::new();
50 let actions = parse_privilege(privileges, &allowed_actions)?;
51 if is_system_catalog(table_oid as _) {
52 return Ok(true);
53 }
54 has_privilege_impl_captured(
55 user_name,
56 &get_grant_object_by_oid_captured(table_oid)?,
57 &actions,
58 )
59}
60
61#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")]
62fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
63 let user_name = get_user_name_by_id_captured(user_id)?;
65 has_any_column_privilege_1(user_name.as_str(), table_oid, privileges)
66}
67
68#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")]
69fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
70 let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]);
71 let actions = parse_privilege(privileges, &allowed_actions)?;
72 has_privilege_impl_captured(
73 user_name,
74 &get_grant_object_by_oid_captured(table_oid)?,
75 &actions,
76 )
77}
78
79#[function("has_database_privilege(varchar, int4, varchar) -> boolean")]
80fn has_database_privilege(user_name: &str, database_oid: i32, privileges: &str) -> Result<bool> {
81 let database_owner = get_database_owner_by_id_captured(database_oid as DatabaseId)?;
83 let allowed_actions = HashSet::from_iter([Action::Create, Action::Connect]);
84 let actions = parse_privilege(privileges, &allowed_actions)?;
85 has_privilege_impl_captured(
86 user_name,
87 &OwnedGrantObject {
88 object: Object::DatabaseId(database_oid as u32),
89 owner: database_owner as u32,
90 },
91 &actions,
92 )
93}
94
95#[function("has_database_privilege(int4, varchar, varchar) -> boolean")]
96fn has_database_privilege_1(user_id: i32, database_name: &str, privileges: &str) -> Result<bool> {
97 let user_name = get_user_name_by_id_captured(user_id)?;
98 let database_oid = get_database_id_by_name_captured(database_name)?;
99 has_database_privilege(user_name.as_str(), database_oid, privileges)
100}
101
102#[function("has_database_privilege(int4, int4, varchar) -> boolean")]
103fn has_database_privilege_2(user_id: i32, database_oid: i32, privileges: &str) -> Result<bool> {
104 let user_name = get_user_name_by_id_captured(user_id)?;
105 has_database_privilege(user_name.as_str(), database_oid, privileges)
106}
107
108#[function("has_database_privilege(varchar, varchar, varchar) -> boolean")]
109fn has_database_privilege_3(
110 user_name: &str,
111 database_name: &str,
112 privileges: &str,
113) -> Result<bool> {
114 let database_oid = get_database_id_by_name_captured(database_name)?;
115 has_database_privilege(user_name, database_oid, privileges)
116}
117
118#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")]
119fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result<bool> {
120 let schema_owner = get_schema_owner_by_id_captured(schema_oid as SchemaId)?;
122 let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]);
123 let actions = parse_privilege(privileges, &allowed_actions)?;
124 has_privilege_impl_captured(
125 user_name,
126 &OwnedGrantObject {
127 object: Object::SchemaId(schema_oid as u32),
128 owner: schema_owner as u32,
129 },
130 &actions,
131 )
132}
133
134#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")]
135fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result<bool> {
136 let user_name = get_user_name_by_id_captured(user_id)?;
137 let schema_oid = get_schema_id_by_name_captured(schema_name)?;
138 has_schema_privilege(user_name.as_str(), schema_oid, privileges)
139}
140
141#[function("has_schema_privilege(int4, int4, varchar) -> boolean")]
142fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result<bool> {
143 let user_name = get_user_name_by_id_captured(user_id)?;
144 has_schema_privilege(user_name.as_str(), schema_oid, privileges)
145}
146
147#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")]
148fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result<bool> {
149 let schema_oid = get_schema_id_by_name_captured(schema_name)?;
150 has_schema_privilege(user_name, schema_oid, privileges)
151}
152
153#[function("has_function_privilege(varchar, int4, varchar) -> boolean")]
154fn has_function_privilege(user_name: &str, function_oid: i32, privileges: &str) -> Result<bool> {
155 let func_obj = get_grant_object_by_oid_captured(function_oid)?;
157 let allowed_actions = HashSet::from_iter([Action::Execute]);
158 let actions = parse_privilege(privileges, &allowed_actions)?;
159 has_privilege_impl_captured(user_name, &func_obj, &actions)
160}
161
162#[function("has_function_privilege(int4, int4, varchar) -> boolean")]
163fn has_function_privilege_1(user_id: i32, function_oid: i32, privileges: &str) -> Result<bool> {
164 let user_name = get_user_name_by_id_captured(user_id)?;
165 has_function_privilege(user_name.as_str(), function_oid, privileges)
166}
167
168#[function("has_function_privilege(varchar, varchar, varchar) -> boolean")]
169fn has_function_privilege_2(
170 user_name: &str,
171 function_name: &str,
172 privileges: &str,
173) -> Result<bool> {
174 let function_oid = get_function_id_by_name_captured(function_name)?;
175 has_function_privilege(user_name, function_oid, privileges)
176}
177
178#[function("has_function_privilege(int4, varchar, varchar) -> boolean")]
179fn has_function_privilege_3(user_id: i32, function_name: &str, privileges: &str) -> Result<bool> {
180 let user_name = get_user_name_by_id_captured(user_id)?;
181 let function_oid = get_function_id_by_name_captured(function_name)?;
182 has_function_privilege(user_name.as_str(), function_oid, privileges)
183}
184
185#[capture_context(USER_INFO_READER)]
186fn has_privilege_impl(
187 user_info_reader: &UserInfoReader,
188 user_name: &str,
189 object: &OwnedGrantObject,
190 actions: &Vec<(Action, bool)>,
191) -> Result<bool> {
192 let user_info = &user_info_reader.read_guard();
193 let user_catalog = user_info
194 .get_user_by_name(user_name)
195 .ok_or(user_not_found_err(
196 format!("User {} not found", user_name).as_str(),
197 ))?;
198 if user_catalog.id == object.owner {
199 return Ok(true);
201 }
202 Ok(user_catalog.check_privilege_with_grant_option(&object.object, actions))
203}
204
205#[capture_context(USER_INFO_READER)]
206fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result<String> {
207 let user_info = &user_info_reader.read_guard();
208 user_info
209 .get_user_name_by_id(user_id as u32)
210 .ok_or(user_not_found_err(
211 format!("User {} not found", user_id).as_str(),
212 ))
213}
214
215#[capture_context(CATALOG_READER, DB_NAME)]
216fn get_grant_object_by_oid(
217 catalog_reader: &CatalogReader,
218 db_name: &str,
219 oid: i32,
220) -> Result<OwnedGrantObject> {
221 catalog_reader
222 .read_guard()
223 .get_database_by_name(db_name)
224 .map_err(|e| ExprError::InvalidParam {
225 name: "oid",
226 reason: e.to_report_string().into(),
227 })?
228 .get_grant_object_by_oid(oid as u32)
229 .ok_or(ExprError::InvalidParam {
230 name: "oid",
231 reason: format!("Table {} not found", oid).as_str().into(),
232 })
233}
234
235#[capture_context(CATALOG_READER)]
236fn get_database_id_by_name(catalog_reader: &CatalogReader, db_name: &str) -> Result<i32> {
237 let reader = &catalog_reader.read_guard();
238 Ok(reader
239 .get_database_by_name(db_name)
240 .map_err(|e| ExprError::InvalidParam {
241 name: "database",
242 reason: e.to_report_string().into(),
243 })?
244 .id() as i32)
245}
246
247#[capture_context(CATALOG_READER)]
248fn get_database_owner_by_id(
249 catalog_reader: &CatalogReader,
250 database_id: DatabaseId,
251) -> Result<i32> {
252 let reader = &catalog_reader.read_guard();
253 let database =
254 reader
255 .get_database_by_id(&database_id)
256 .map_err(|e| ExprError::InvalidParam {
257 name: "database",
258 reason: e.to_report_string().into(),
259 })?;
260 Ok(database.owner as i32)
261}
262
263#[capture_context(CATALOG_READER, DB_NAME)]
264fn get_schema_owner_by_id(
265 catalog_reader: &CatalogReader,
266 db_name: &str,
267 schema_id: SchemaId,
268) -> Result<i32> {
269 let reader = &catalog_reader.read_guard();
270 let db_id = reader
271 .get_database_by_name(db_name)
272 .map_err(|e| ExprError::InvalidParam {
273 name: "database",
274 reason: e.to_report_string().into(),
275 })?
276 .id();
277 Ok(reader
278 .get_schema_by_id(&db_id, &schema_id)
279 .map_err(|e| ExprError::InvalidParam {
280 name: "schema",
281 reason: e.to_report_string().into(),
282 })?
283 .owner as i32)
284}
285
286#[capture_context(CATALOG_READER, DB_NAME)]
287fn get_schema_id_by_name(
288 catalog_reader: &CatalogReader,
289 db_name: &str,
290 schema_name: &str,
291) -> Result<i32> {
292 let reader = &catalog_reader.read_guard();
293 Ok(reader
294 .get_schema_by_name(db_name, schema_name)
295 .map_err(|e| ExprError::InvalidParam {
296 name: "schema",
297 reason: e.to_report_string().into(),
298 })?
299 .id() as i32)
300}
301
302#[capture_context(CATALOG_READER, DB_NAME, AUTH_CONTEXT, SEARCH_PATH)]
303fn get_function_id_by_name(
304 catalog_reader: &CatalogReader,
305 db_name: &str,
306 auth_context: &AuthContext,
307 search_path: &SearchPath,
308 function_name: &str,
309) -> Result<i32> {
310 let desc =
311 Parser::parse_function_desc_str(function_name).map_err(|e| ExprError::InvalidParam {
312 name: "function",
313 reason: e.to_report_string().into(),
314 })?;
315 let mut arg_types = vec![];
316 if let Some(args) = desc.args {
317 for arg in &args {
318 arg_types.push(bind_data_type(&arg.data_type).map_err(|e| {
319 ExprError::InvalidParam {
320 name: "function",
321 reason: e.to_report_string().into(),
322 }
323 })?);
324 }
325 }
326
327 let (schema, name) =
328 Binder::resolve_schema_qualified_name(db_name, desc.name).map_err(|e| {
329 ExprError::InvalidParam {
330 name: "function",
331 reason: e.to_report_string().into(),
332 }
333 })?;
334 let schema_path = SchemaPath::new(schema.as_deref(), search_path, &auth_context.user_name);
335
336 let reader = &catalog_reader.read_guard();
337 Ok(reader
338 .get_function_by_name_args(db_name, schema_path, &name, &arg_types)
339 .map_err(|e| ExprError::InvalidParam {
340 name: "function",
341 reason: e.to_report_string().into(),
342 })?
343 .0
344 .id
345 .function_id() as i32)
346}
347
348fn parse_privilege(
349 privilege_string: &str,
350 allowed_actions: &HashSet<Action>,
351) -> Result<Vec<(Action, bool)>> {
352 let mut privileges = Vec::new();
353 for part in privilege_string.split(',').map(str::trim) {
354 let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") {
355 Some((p, _)) => (p, true),
356 None => (part, false),
357 };
358 match Action::from_str_name(privilege_type.to_uppercase().as_str()) {
359 Some(Action::Unspecified) | None => {
360 return Err(ExprError::InvalidParam {
361 name: "privilege",
362 reason: format!("unrecognized privilege type: \"{}\"", part).into(),
363 });
364 }
365 Some(action) => {
366 if allowed_actions.is_empty() || allowed_actions.contains(&action) {
367 privileges.push((action, grant_option))
368 } else {
369 return Err(ExprError::InvalidParam {
370 name: "privilege",
371 reason: format!("unrecognized privilege type: \"{}\"", part).into(),
372 });
373 }
374 }
375 }
376 }
377 Ok(privileges)
378}