1use std::collections::HashSet;
16
17use risingwave_common::session_config::SearchPath;
18use risingwave_expr::{ExprError, Result, capture_context, function};
19use risingwave_pb::user::Action;
20use risingwave_sqlparser::parser::Parser;
21use thiserror_ext::AsReport;
22
23use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::system_catalog::is_system_catalog;
26use crate::catalog::{CatalogReader, DatabaseId, OwnedGrantObject, SchemaId};
27use crate::session::AuthContext;
28use crate::user::user_service::UserInfoReader;
29use crate::{Binder, bind_data_type};
30
31#[inline(always)]
32pub fn user_not_found_err(inner_err: &str) -> ExprError {
33 ExprError::InvalidParam {
34 name: "user",
35 reason: inner_err.into(),
36 }
37}
38
39#[function("has_table_privilege(int4, int4, varchar) -> boolean")]
40fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
41 let user_name = get_user_name_by_id_captured(user_id)?;
43 has_table_privilege_1(user_name.as_str(), table_oid, privileges)
44}
45
46#[function("has_table_privilege(varchar, int4, varchar) -> boolean")]
47fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
48 let allowed_actions = HashSet::new();
49 let actions = parse_privilege(privileges, &allowed_actions)?;
50 if is_system_catalog(table_oid as _) {
51 return Ok(true);
52 }
53 has_privilege_impl_captured(
54 user_name,
55 &get_grant_object_by_oid_captured(table_oid)?,
56 &actions,
57 )
58}
59
60#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")]
61fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result<bool> {
62 let user_name = get_user_name_by_id_captured(user_id)?;
64 has_any_column_privilege_1(user_name.as_str(), table_oid, privileges)
65}
66
67#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")]
68fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result<bool> {
69 let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]);
70 let actions = parse_privilege(privileges, &allowed_actions)?;
71 has_privilege_impl_captured(
72 user_name,
73 &get_grant_object_by_oid_captured(table_oid)?,
74 &actions,
75 )
76}
77
78#[function("has_database_privilege(varchar, int4, varchar) -> boolean")]
79fn has_database_privilege(user_name: &str, database_oid: i32, privileges: &str) -> Result<bool> {
80 has_database_privilege_impl(user_name, DatabaseId::new(database_oid as _), privileges)
81}
82
83fn has_database_privilege_impl(
84 user_name: &str,
85 database_oid: DatabaseId,
86 privileges: &str,
87) -> Result<bool> {
88 let database_owner = get_database_owner_by_id_captured(database_oid)?;
90 let allowed_actions = HashSet::from_iter([Action::Create, Action::Connect]);
91 let actions = parse_privilege(privileges, &allowed_actions)?;
92 has_privilege_impl_captured(
93 user_name,
94 &OwnedGrantObject {
95 object: database_oid.into(),
96 owner: database_owner as u32,
97 },
98 &actions,
99 )
100}
101
102#[function("has_database_privilege(int4, varchar, varchar) -> boolean")]
103fn has_database_privilege_1(user_id: i32, database_name: &str, privileges: &str) -> Result<bool> {
104 let user_name = get_user_name_by_id_captured(user_id)?;
105 let database_oid = get_database_id_by_name_captured(database_name)?;
106 has_database_privilege_impl(user_name.as_str(), database_oid, privileges)
107}
108
109#[function("has_database_privilege(int4, int4, varchar) -> boolean")]
110fn has_database_privilege_2(user_id: i32, database_oid: i32, privileges: &str) -> Result<bool> {
111 let user_name = get_user_name_by_id_captured(user_id)?;
112 has_database_privilege(user_name.as_str(), database_oid, privileges)
113}
114
115#[function("has_database_privilege(varchar, varchar, varchar) -> boolean")]
116fn has_database_privilege_3(
117 user_name: &str,
118 database_name: &str,
119 privileges: &str,
120) -> Result<bool> {
121 let database_oid = get_database_id_by_name_captured(database_name)?;
122 has_database_privilege_impl(user_name, database_oid, privileges)
123}
124
125#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")]
126fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result<bool> {
127 has_schema_privilege_impl(user_name, SchemaId::new(schema_oid as _), privileges)
128}
129
130fn has_schema_privilege_impl(
131 user_name: &str,
132 schema_oid: SchemaId,
133 privileges: &str,
134) -> Result<bool> {
135 let schema_owner = get_schema_owner_by_id_captured(schema_oid)?;
137 let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]);
138 let actions = parse_privilege(privileges, &allowed_actions)?;
139 has_privilege_impl_captured(
140 user_name,
141 &OwnedGrantObject {
142 object: schema_oid.into(),
143 owner: schema_owner as u32,
144 },
145 &actions,
146 )
147}
148
149#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")]
150fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result<bool> {
151 let user_name = get_user_name_by_id_captured(user_id)?;
152 let schema_oid = get_schema_id_by_name_captured(schema_name)?;
153 has_schema_privilege_impl(user_name.as_str(), schema_oid, privileges)
154}
155
156#[function("has_schema_privilege(int4, int4, varchar) -> boolean")]
157fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result<bool> {
158 let user_name = get_user_name_by_id_captured(user_id)?;
159 has_schema_privilege(user_name.as_str(), schema_oid, privileges)
160}
161
162#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")]
163fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result<bool> {
164 let schema_oid = get_schema_id_by_name_captured(schema_name)?;
165 has_schema_privilege_impl(user_name, schema_oid, privileges)
166}
167
168#[function("has_function_privilege(varchar, int4, varchar) -> boolean")]
169fn has_function_privilege(user_name: &str, function_oid: i32, privileges: &str) -> Result<bool> {
170 let func_obj = get_grant_object_by_oid_captured(function_oid)?;
172 let allowed_actions = HashSet::from_iter([Action::Execute]);
173 let actions = parse_privilege(privileges, &allowed_actions)?;
174 has_privilege_impl_captured(user_name, &func_obj, &actions)
175}
176
177#[function("has_function_privilege(int4, int4, varchar) -> boolean")]
178fn has_function_privilege_1(user_id: i32, function_oid: i32, privileges: &str) -> Result<bool> {
179 let user_name = get_user_name_by_id_captured(user_id)?;
180 has_function_privilege(user_name.as_str(), function_oid, privileges)
181}
182
183#[function("has_function_privilege(varchar, varchar, varchar) -> boolean")]
184fn has_function_privilege_2(
185 user_name: &str,
186 function_name: &str,
187 privileges: &str,
188) -> Result<bool> {
189 let function_oid = get_function_id_by_name_captured(function_name)?;
190 has_function_privilege(user_name, function_oid, privileges)
191}
192
193#[function("has_function_privilege(int4, varchar, varchar) -> boolean")]
194fn has_function_privilege_3(user_id: i32, function_name: &str, privileges: &str) -> Result<bool> {
195 let user_name = get_user_name_by_id_captured(user_id)?;
196 let function_oid = get_function_id_by_name_captured(function_name)?;
197 has_function_privilege(user_name.as_str(), function_oid, privileges)
198}
199
200#[capture_context(USER_INFO_READER)]
201fn has_privilege_impl(
202 user_info_reader: &UserInfoReader,
203 user_name: &str,
204 object: &OwnedGrantObject,
205 actions: &Vec<(Action, bool)>,
206) -> Result<bool> {
207 let user_info = &user_info_reader.read_guard();
208 let user_catalog = user_info
209 .get_user_by_name(user_name)
210 .ok_or(user_not_found_err(
211 format!("User {} not found", user_name).as_str(),
212 ))?;
213 if user_catalog.id == object.owner {
214 return Ok(true);
216 }
217 Ok(user_catalog.check_privilege_with_grant_option(&object.object, actions))
218}
219
220#[capture_context(USER_INFO_READER)]
221fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result<String> {
222 let user_info = &user_info_reader.read_guard();
223 user_info
224 .get_user_name_by_id(user_id as u32)
225 .ok_or(user_not_found_err(
226 format!("User {} not found", user_id).as_str(),
227 ))
228}
229
230#[capture_context(CATALOG_READER, DB_NAME)]
231fn get_grant_object_by_oid(
232 catalog_reader: &CatalogReader,
233 db_name: &str,
234 oid: i32,
235) -> Result<OwnedGrantObject> {
236 catalog_reader
237 .read_guard()
238 .get_database_by_name(db_name)
239 .map_err(|e| ExprError::InvalidParam {
240 name: "oid",
241 reason: e.to_report_string().into(),
242 })?
243 .get_grant_object_by_oid(oid as u32)
244 .ok_or(ExprError::InvalidParam {
245 name: "oid",
246 reason: format!("Table {} not found", oid).as_str().into(),
247 })
248}
249
250#[capture_context(CATALOG_READER)]
251fn get_database_id_by_name(catalog_reader: &CatalogReader, db_name: &str) -> Result<DatabaseId> {
252 let reader = &catalog_reader.read_guard();
253 Ok(reader
254 .get_database_by_name(db_name)
255 .map_err(|e| ExprError::InvalidParam {
256 name: "database",
257 reason: e.to_report_string().into(),
258 })?
259 .id())
260}
261
262#[capture_context(CATALOG_READER)]
263fn get_database_owner_by_id(
264 catalog_reader: &CatalogReader,
265 database_id: DatabaseId,
266) -> Result<i32> {
267 let reader = &catalog_reader.read_guard();
268 let database = reader
269 .get_database_by_id(database_id)
270 .map_err(|e| ExprError::InvalidParam {
271 name: "database",
272 reason: e.to_report_string().into(),
273 })?;
274 Ok(database.owner as i32)
275}
276
277#[capture_context(CATALOG_READER, DB_NAME)]
278fn get_schema_owner_by_id(
279 catalog_reader: &CatalogReader,
280 db_name: &str,
281 schema_id: SchemaId,
282) -> Result<i32> {
283 let reader = &catalog_reader.read_guard();
284 let db_id = reader
285 .get_database_by_name(db_name)
286 .map_err(|e| ExprError::InvalidParam {
287 name: "database",
288 reason: e.to_report_string().into(),
289 })?
290 .id();
291 Ok(reader
292 .get_schema_by_id(db_id, schema_id)
293 .map_err(|e| ExprError::InvalidParam {
294 name: "schema",
295 reason: e.to_report_string().into(),
296 })?
297 .owner as i32)
298}
299
300#[capture_context(CATALOG_READER, DB_NAME)]
301fn get_schema_id_by_name(
302 catalog_reader: &CatalogReader,
303 db_name: &str,
304 schema_name: &str,
305) -> Result<SchemaId> {
306 let reader = &catalog_reader.read_guard();
307 Ok(reader
308 .get_schema_by_name(db_name, schema_name)
309 .map_err(|e| ExprError::InvalidParam {
310 name: "schema",
311 reason: e.to_report_string().into(),
312 })?
313 .id())
314}
315
316#[capture_context(CATALOG_READER, DB_NAME, AUTH_CONTEXT, SEARCH_PATH)]
317fn get_function_id_by_name(
318 catalog_reader: &CatalogReader,
319 db_name: &str,
320 auth_context: &AuthContext,
321 search_path: &SearchPath,
322 function_name: &str,
323) -> Result<i32> {
324 let desc =
325 Parser::parse_function_desc_str(function_name).map_err(|e| ExprError::InvalidParam {
326 name: "function",
327 reason: e.to_report_string().into(),
328 })?;
329 let mut arg_types = vec![];
330 if let Some(args) = desc.args {
331 for arg in &args {
332 arg_types.push(bind_data_type(&arg.data_type).map_err(|e| {
333 ExprError::InvalidParam {
334 name: "function",
335 reason: e.to_report_string().into(),
336 }
337 })?);
338 }
339 }
340
341 let (schema, name) =
342 Binder::resolve_schema_qualified_name(db_name, &desc.name).map_err(|e| {
343 ExprError::InvalidParam {
344 name: "function",
345 reason: e.to_report_string().into(),
346 }
347 })?;
348 let schema_path = SchemaPath::new(schema.as_deref(), search_path, &auth_context.user_name);
349
350 let reader = &catalog_reader.read_guard();
351 Ok(reader
352 .get_function_by_name_args(db_name, schema_path, &name, &arg_types)
353 .map_err(|e| ExprError::InvalidParam {
354 name: "function",
355 reason: e.to_report_string().into(),
356 })?
357 .0
358 .id
359 .function_id() as i32)
360}
361
362fn parse_privilege(
363 privilege_string: &str,
364 allowed_actions: &HashSet<Action>,
365) -> Result<Vec<(Action, bool)>> {
366 let mut privileges = Vec::new();
367 for part in privilege_string.split(',').map(str::trim) {
368 let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") {
369 Some((p, _)) => (p, true),
370 None => (part, false),
371 };
372 match Action::from_str_name(privilege_type.to_uppercase().as_str()) {
373 Some(Action::Unspecified) | None => {
374 return Err(ExprError::InvalidParam {
375 name: "privilege",
376 reason: format!("unrecognized privilege type: \"{}\"", part).into(),
377 });
378 }
379 Some(action) => {
380 if allowed_actions.is_empty() || allowed_actions.contains(&action) {
381 privileges.push((action, grant_option))
382 } else {
383 return Err(ExprError::InvalidParam {
384 name: "privilege",
385 reason: format!("unrecognized privilege type: \"{}\"", part).into(),
386 });
387 }
388 }
389 }
390 }
391 Ok(privileges)
392}