1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_pb::user::PbGrantPrivilege;
17use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, PbObject};
18use risingwave_sqlparser::ast::{GrantObjects, Privileges, Statement};
19
20use super::RwPgResponse;
21use crate::bind_data_type;
22use crate::binder::Binder;
23use crate::catalog::CatalogError;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::{ErrorCode, Result};
27use crate::handler::HandlerArgs;
28use crate::session::SessionImpl;
29use crate::user::user_privilege::{
30 available_privilege_actions, check_privilege_type, get_prost_action,
31};
32
33fn make_prost_privilege(
34 session: &SessionImpl,
35 privileges: Privileges,
36 objects: GrantObjects,
37) -> Result<Vec<PbGrantPrivilege>> {
38 check_privilege_type(&privileges, &objects)?;
39
40 let catalog_reader = session.env().catalog_reader();
41 let reader = catalog_reader.read_guard();
42 let actions = match privileges {
43 Privileges::All { .. } => available_privilege_actions(&objects)?,
44 Privileges::Actions(actions) => actions
45 .into_iter()
46 .map(|action| get_prost_action(&action))
47 .collect(),
48 };
49 let mut grant_objs = vec![];
50 match objects {
51 GrantObjects::Databases(databases) => {
52 for db in databases {
53 let database_name = Binder::resolve_database_name(db)?;
54 let database = reader.get_database_by_name(&database_name)?;
55 grant_objs.push(PbObject::DatabaseId(database.id()));
56 }
57 }
58 GrantObjects::Schemas(schemas) => {
59 for schema in schemas {
60 let schema_name = Binder::resolve_schema_name(schema)?;
61 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
62 grant_objs.push(PbObject::SchemaId(schema.id()));
63 }
64 }
65 GrantObjects::Mviews(tables) => {
66 let db_name = &session.database();
67 let search_path = session.config().search_path();
68 let user_name = &session.user_name();
69
70 for name in tables {
71 let (schema_name, table_name) =
72 Binder::resolve_schema_qualified_name(db_name, name)?;
73 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
74
75 let (table, _) =
76 reader.get_created_table_by_name(db_name, schema_path, &table_name)?;
77 match table.table_type() {
78 TableType::MaterializedView => {}
79 _ => {
80 return Err(ErrorCode::InvalidInputSyntax(format!(
81 "{table_name} is not a materialized view",
82 ))
83 .into());
84 }
85 }
86 grant_objs.push(PbObject::TableId(table.id().table_id));
87 }
88 }
89 GrantObjects::Tables(tables) => {
90 let db_name = &session.database();
91 let search_path = session.config().search_path();
92 let user_name = &session.user_name();
93
94 for name in tables {
95 let (schema_name, table_name) =
96 Binder::resolve_schema_qualified_name(db_name, name)?;
97 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
98
99 match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
100 Ok((table, _)) => {
101 match table.table_type() {
102 TableType::Table => {
103 grant_objs.push(PbObject::TableId(table.id().table_id));
104 continue;
105 }
106 _ => {
107 return Err(ErrorCode::InvalidInputSyntax(format!(
108 "{table_name} is not a table",
109 ))
110 .into());
111 }
112 };
113 }
114 Err(CatalogError::NotFound("table", _)) => {
115 let (view, _) = reader
116 .get_view_by_name(db_name, schema_path, &table_name)
117 .map_err(|_| CatalogError::NotFound("table", table_name))?;
118 grant_objs.push(PbObject::ViewId(view.id));
119 }
120 Err(e) => {
121 return Err(e.into());
122 }
123 }
124 }
125 }
126 GrantObjects::Sources(sources) => {
127 let db_name = &session.database();
128 let search_path = session.config().search_path();
129 let user_name = &session.user_name();
130
131 for name in sources {
132 let (schema_name, source_name) =
133 Binder::resolve_schema_qualified_name(db_name, name)?;
134 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
135
136 let (source, _) = reader.get_source_by_name(db_name, schema_path, &source_name)?;
137 grant_objs.push(PbObject::SourceId(source.id));
138 }
139 }
140 GrantObjects::Sinks(sinks) => {
141 let db_name = &session.database();
142 let search_path = session.config().search_path();
143 let user_name = &session.user_name();
144
145 for name in sinks {
146 let (schema_name, sink_name) =
147 Binder::resolve_schema_qualified_name(db_name, name)?;
148 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
149
150 let (sink, _) = reader.get_sink_by_name(db_name, schema_path, &sink_name)?;
151 grant_objs.push(PbObject::SinkId(sink.id.sink_id));
152 }
153 }
154 GrantObjects::Views(views) => {
155 let db_name = &session.database();
156 let search_path = session.config().search_path();
157 let user_name = &session.user_name();
158
159 for name in views {
160 let (schema_name, view_name) =
161 Binder::resolve_schema_qualified_name(db_name, name)?;
162 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
163
164 let (view, _) = reader.get_view_by_name(db_name, schema_path, &view_name)?;
165 grant_objs.push(PbObject::ViewId(view.id));
166 }
167 }
168 GrantObjects::Connections(conns) => {
169 let db_name = &session.database();
170 let search_path = session.config().search_path();
171 let user_name = &session.user_name();
172
173 for name in conns {
174 let (schema_name, conn_name) =
175 Binder::resolve_schema_qualified_name(db_name, name)?;
176 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
177
178 let (conn, _) = reader.get_connection_by_name(db_name, schema_path, &conn_name)?;
179 grant_objs.push(PbObject::ConnectionId(conn.id));
180 }
181 }
182 GrantObjects::Subscriptions(subscriptions) => {
183 let db_name = &session.database();
184 let search_path = session.config().search_path();
185 let user_name = &session.user_name();
186
187 for name in subscriptions {
188 let (schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
189 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
190
191 let (sub, _) = reader.get_subscription_by_name(db_name, schema_path, &sub_name)?;
192 grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
193 }
194 }
195 GrantObjects::Functions(func_descs) => {
196 let db_name = &session.database();
197 let search_path = session.config().search_path();
198 let user_name = &session.user_name();
199
200 for func_desc in func_descs {
201 let (schema_name, func_name) =
202 Binder::resolve_schema_qualified_name(db_name, func_desc.name)?;
203 let arg_types = match func_desc.args {
204 Some(args) => {
205 let mut arg_types = vec![];
206 for arg in args {
207 arg_types.push(bind_data_type(&arg.data_type)?);
208 }
209 Some(arg_types)
210 }
211 None => None,
212 };
213 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
214
215 let (func, _) = match arg_types {
216 Some(arg_types) => reader.get_function_by_name_args(
217 db_name,
218 schema_path,
219 &func_name,
220 &arg_types,
221 )?,
222 None => {
223 let (functions, schema_name) =
224 reader.get_functions_by_name(db_name, schema_path, &func_name)?;
225 if functions.len() > 1 {
226 return Err(ErrorCode::CatalogError(format!(
227 "function name {func_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously."
228 ).into()).into());
229 }
230 (
231 functions.into_iter().next().expect("no functions"),
232 schema_name,
233 )
234 }
235 };
236 grant_objs.push(PbObject::FunctionId(func.id.function_id()));
237 }
238 }
239 GrantObjects::Secrets(secrets) => {
240 let db_name = &session.database();
241 let search_path = session.config().search_path();
242 let user_name = &session.user_name();
243
244 for name in secrets {
245 let (schema_name, secret_name) =
246 Binder::resolve_schema_qualified_name(db_name, name)?;
247 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
248
249 let (secret, _) = reader.get_secret_by_name(db_name, schema_path, &secret_name)?;
250 grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
251 }
252 }
253 GrantObjects::AllSourcesInSchema { schemas } => {
254 for schema in schemas {
255 let schema_name = Binder::resolve_schema_name(schema)?;
256 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
257 schema.iter_source().for_each(|source| {
258 grant_objs.push(PbObject::SourceId(source.id));
259 });
260 }
261 }
262 GrantObjects::AllMviewsInSchema { schemas } => {
263 for schema in schemas {
264 let schema_name = Binder::resolve_schema_name(schema)?;
265 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
266 schema.iter_all_mvs().for_each(|mview| {
267 grant_objs.push(PbObject::TableId(mview.id().table_id));
268 });
269 }
270 }
271 GrantObjects::AllTablesInSchema { schemas } => {
272 for schema in schemas {
273 let schema_name = Binder::resolve_schema_name(schema)?;
274 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
275 schema.iter_user_table().for_each(|table| {
276 grant_objs.push(PbObject::TableId(table.id().table_id));
277 });
278 }
279 }
280 GrantObjects::AllSinksInSchema { schemas } => {
281 for schema in schemas {
282 let schema_name = Binder::resolve_schema_name(schema)?;
283 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
284 schema.iter_sink().for_each(|sink| {
285 grant_objs.push(PbObject::SinkId(sink.id.sink_id));
286 });
287 }
288 }
289 GrantObjects::AllViewsInSchema { schemas } => {
290 for schema in schemas {
291 let schema_name = Binder::resolve_schema_name(schema)?;
292 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
293 schema.iter_view().for_each(|view| {
294 grant_objs.push(PbObject::ViewId(view.id));
295 });
296 }
297 }
298 GrantObjects::AllFunctionsInSchema { schemas } => {
299 for schema in schemas {
300 let schema_name = Binder::resolve_schema_name(schema)?;
301 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
302 schema.iter_function().for_each(|func| {
303 grant_objs.push(PbObject::FunctionId(func.id.function_id()));
304 });
305 }
306 }
307 GrantObjects::AllSecretsInSchema { schemas } => {
308 for schema in schemas {
309 let schema_name = Binder::resolve_schema_name(schema)?;
310 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
311 schema.iter_secret().for_each(|secret| {
312 grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
313 });
314 }
315 }
316 GrantObjects::AllSubscriptionsInSchema { schemas } => {
317 for schema in schemas {
318 let schema_name = Binder::resolve_schema_name(schema)?;
319 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
320 schema.iter_subscription().for_each(|sub| {
321 grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
322 });
323 }
324 }
325 GrantObjects::AllConnectionsInSchema { schemas } => {
326 for schema in schemas {
327 let schema_name = Binder::resolve_schema_name(schema)?;
328 let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
329 schema.iter_connections().for_each(|conn| {
330 grant_objs.push(PbObject::ConnectionId(conn.id));
331 });
332 }
333 }
334 o => {
335 return Err(ErrorCode::BindError(format!(
336 "GRANT statement does not support object type: {:?}",
337 o
338 ))
339 .into());
340 }
341 };
342 let action_with_opts = actions
343 .into_iter()
344 .map(|action| ActionWithGrantOption {
345 action: action as i32,
346 granted_by: session.user_id(),
347 ..Default::default()
348 })
349 .collect::<Vec<_>>();
350
351 let mut prost_privileges = vec![];
352 for objs in grant_objs {
353 prost_privileges.push(PbGrantPrivilege {
354 action_with_opts: action_with_opts.clone(),
355 object: Some(objs),
356 });
357 }
358 Ok(prost_privileges)
359}
360
361pub async fn handle_grant_privilege(
362 handler_args: HandlerArgs,
363 stmt: Statement,
364) -> Result<RwPgResponse> {
365 let session = handler_args.session;
366 let Statement::Grant {
367 privileges,
368 objects,
369 grantees,
370 with_grant_option,
371 granted_by,
372 } = stmt
373 else {
374 return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into());
375 };
376 let mut users = vec![];
377 {
378 let user_reader = session.env().user_info_reader();
379 let reader = user_reader.read_guard();
380 for grantee in grantees {
381 if let Some(user) = reader.get_user_by_name(&grantee.real_value()) {
382 users.push(user.id);
383 } else {
384 return Err(ErrorCode::BindError("Grantee does not exist".to_owned()).into());
385 }
386 }
387 if let Some(granted_by) = &granted_by {
388 if reader.get_user_by_name(&granted_by.real_value()).is_none() {
390 return Err(ErrorCode::BindError("Grantor does not exist".to_owned()).into());
391 }
392 }
393 };
394
395 let privileges = make_prost_privilege(&session, privileges, objects)?;
396 let user_info_writer = session.user_info_writer()?;
397 user_info_writer
398 .grant_privilege(users, privileges, with_grant_option, session.user_id())
399 .await?;
400 Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE))
401}
402
403pub async fn handle_revoke_privilege(
404 handler_args: HandlerArgs,
405 stmt: Statement,
406) -> Result<RwPgResponse> {
407 let session = handler_args.session;
408 let Statement::Revoke {
409 privileges,
410 objects,
411 grantees,
412 granted_by,
413 revoke_grant_option,
414 cascade,
415 } = stmt
416 else {
417 return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into());
418 };
419 let mut users = vec![];
420 let mut granted_by_id = None;
421 {
422 let user_reader = session.env().user_info_reader();
423 let reader = user_reader.read_guard();
424 for grantee in grantees {
425 if let Some(user) = reader.get_user_by_name(&grantee.real_value()) {
426 users.push(user.id);
427 } else {
428 return Err(ErrorCode::BindError("Grantee does not exist".to_owned()).into());
429 }
430 }
431 if let Some(granted_by) = &granted_by {
432 if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) {
433 granted_by_id = Some(user.id);
434 } else {
435 return Err(ErrorCode::BindError("Grantor does not exist".to_owned()).into());
436 }
437 }
438 };
439 let privileges = make_prost_privilege(&session, privileges, objects)?;
440 let user_info_writer = session.user_info_writer()?;
441 user_info_writer
442 .revoke_privilege(
443 users,
444 privileges,
445 granted_by_id.unwrap_or(session.user_id()),
446 session.user_id(),
447 revoke_grant_option,
448 cascade,
449 )
450 .await?;
451
452 Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE))
453}
454
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
    use risingwave_pb::user::grant_privilege::Action;

    use super::*;
    use crate::test_utils::LocalFrontend;

    /// Grants all database privileges on `db1` to `user1` with grant option, then revokes first
    /// the grant option and then the privileges themselves, checking `user1`'s recorded
    /// privileges after every step.
    #[tokio::test]
    async fn test_grant_privilege() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        for setup_sql in [
            "CREATE USER user WITH SUPERUSER PASSWORD 'password'",
            "CREATE USER user1 WITH PASSWORD 'password1'",
            "CREATE DATABASE db1",
            "GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user",
        ] {
            frontend.run_sql(setup_sql).await.unwrap();
        }

        let (session_database_id, database_id) = {
            let catalog_reader = session.env().catalog_reader();
            let reader = catalog_reader.read_guard();
            let session_db = reader.get_database_by_name(&session.database()).unwrap();
            let db1 = reader.get_database_by_name("db1").unwrap();
            (session_db.id(), db1.id())
        };

        // Snapshot of the privileges currently recorded for `user1`.
        let user1_privileges = || {
            let user_reader = session.env().user_info_reader();
            let reader = user_reader.read_guard();
            reader
                .get_user_by_name("user1")
                .unwrap()
                .grant_privileges
                .clone()
        };

        // The entry on the session's database that is expected both right after the grant and
        // after everything on `db1` has been revoked again.
        let connect_on_session_db = PbGrantPrivilege {
            action_with_opts: vec![ActionWithGrantOption {
                action: Action::Connect as i32,
                with_grant_option: true,
                granted_by: session.user_id(),
            }],
            object: Some(PbObject::DatabaseId(session_database_id)),
        };

        assert_eq!(
            user1_privileges(),
            vec![
                connect_on_session_db.clone(),
                PbGrantPrivilege {
                    action_with_opts: vec![
                        ActionWithGrantOption {
                            action: Action::Create as i32,
                            with_grant_option: true,
                            granted_by: DEFAULT_SUPER_USER_ID,
                        },
                        ActionWithGrantOption {
                            action: Action::Connect as i32,
                            with_grant_option: true,
                            granted_by: DEFAULT_SUPER_USER_ID,
                        }
                    ],
                    object: Some(PbObject::DatabaseId(database_id)),
                }
            ]
        );

        frontend
            .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        // No action recorded on `db1` may still carry the grant option.
        assert!(
            user1_privileges()
                .iter()
                .filter(|gp| gp.object == Some(PbObject::DatabaseId(database_id)))
                .flat_map(|gp| gp.action_with_opts.iter())
                .all(|ao| !ao.with_grant_option)
        );

        frontend
            .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        assert_eq!(user1_privileges(), vec![connect_on_session_db]);

        frontend.run_sql("DROP USER user1").await.unwrap();
    }
}