1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23 AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24 user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28 Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36 QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41 PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42 ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43 get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
44 get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50 pub(crate) async fn notify_users_update(
51 &self,
52 user_infos: Vec<PbUserInfo>,
53 ) -> NotificationVersion {
54 let mut version = 0;
55 for info in user_infos {
56 version = self
57 .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58 .await;
59 }
60 version
61 }
62
63 pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64 let inner = self.inner.write().await;
65 let txn = inner.db.begin().await?;
66 check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68 let grant_privileges = pb_user.grant_privileges.clone();
69 let user: user::ActiveModel = pb_user.into();
70 let user = user.insert(&txn).await?;
71
72 if !grant_privileges.is_empty() {
73 let mut privileges = vec![];
74 for gp in &grant_privileges {
75 let id = extract_grant_obj_id(gp.get_object()?);
76 for action_with_opt in &gp.action_with_opts {
77 privileges.push(user_privilege::ActiveModel {
78 user_id: Set(user.user_id),
79 oid: Set(id),
80 granted_by: Set(action_with_opt.granted_by as _),
81 action: Set(action_with_opt.get_action()?.into()),
82 with_grant_option: Set(action_with_opt.with_grant_option),
83 ..Default::default()
84 });
85 }
86 }
87 UserPrivilege::insert_many(privileges).exec(&txn).await?;
88 }
89 txn.commit().await?;
90
91 let mut user_info: PbUserInfo = user.into();
92 user_info.grant_privileges = grant_privileges;
93 let version = self
94 .notify_frontend(
95 NotificationOperation::Add,
96 NotificationInfo::User(user_info),
97 )
98 .await;
99
100 Ok(version)
101 }
102
103 pub async fn update_user(
104 &self,
105 update_user: PbUserInfo,
106 update_fields: &[PbUpdateField],
107 ) -> MetaResult<NotificationVersion> {
108 let inner = self.inner.write().await;
109 let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110 if rename_flag {
111 check_user_name_duplicate(&update_user.name, &inner.db).await?;
112 }
113
114 let user = User::find_by_id(update_user.id as UserId)
115 .one(&inner.db)
116 .await?
117 .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118 let mut user = user.into_active_model();
119 update_fields.iter().for_each(|&field| match field {
120 PbUpdateField::Unspecified => unreachable!(),
121 PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122 PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123 PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124 PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125 PbUpdateField::AuthInfo => {
126 user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127 }
128 PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129 PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
130 });
131
132 let user = user.update(&inner.db).await?;
133 let mut user_info: PbUserInfo = user.into();
134 user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
135 let version = self
136 .notify_frontend(
137 NotificationOperation::Update,
138 NotificationInfo::User(user_info),
139 )
140 .await;
141
142 Ok(version)
143 }
144
/// Test-only lookup of a user row by id.
///
/// Returns a "catalog id not found" error when no such user exists.
#[cfg(test)]
pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
    let guard = self.inner.read().await;
    match User::find_by_id(id).one(&guard.db).await? {
        Some(model) => Ok(model),
        None => Err(MetaError::catalog_id_not_found("user", id)),
    }
}
154
/// Test-only lookup of a user row by its name.
///
/// Returns an error carrying the message `user {name} not found` when the
/// query yields no row.
#[cfg(test)]
pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
    let guard = self.inner.read().await;
    let by_name = user::Column::Name.eq(name);
    let found = User::find().filter(by_name).one(&guard.db).await?;
    let Some(model) = found else {
        return Err(anyhow::anyhow!("user {name} not found").into());
    };
    Ok(model)
}
165
166 pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
167 let inner = self.inner.write().await;
168 let txn = inner.db.begin().await?;
169 let user = User::find_by_id(user_id)
170 .one(&txn)
171 .await?
172 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
173 if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
174 return Err(MetaError::permission_denied(format!(
175 "drop default super user {} is not allowed",
176 user.name
177 )));
178 }
179
180 let count = Object::find()
182 .filter(object::Column::OwnerId.eq(user_id))
183 .count(&txn)
184 .await?;
185 if count != 0 {
186 return Err(MetaError::permission_denied(format!(
187 "drop user {} is not allowed, because it owns {} objects",
188 user.name, count
189 )));
190 }
191
192 let count = UserPrivilege::find()
194 .filter(user_privilege::Column::GrantedBy.eq(user_id))
195 .count(&txn)
196 .await?;
197 if count != 0 {
198 return Err(MetaError::permission_denied(format!(
199 "drop user {} is not allowed, because it granted {} privileges to others",
200 user.name, count
201 )));
202 }
203
204 let res = User::delete_by_id(user_id).exec(&txn).await?;
205 if res.rows_affected != 1 {
206 return Err(MetaError::catalog_id_not_found("user", user_id));
207 }
208 txn.commit().await?;
209
210 let version = self
211 .notify_frontend(
212 NotificationOperation::Delete,
213 NotificationInfo::User(PbUserInfo {
214 id: user_id as _,
215 ..Default::default()
216 }),
217 )
218 .await;
219
220 Ok(version)
221 }
222
223 pub async fn grant_privilege(
224 &self,
225 user_ids: Vec<UserId>,
226 new_grant_privileges: &[PbGrantPrivilege],
227 grantor: UserId,
228 with_grant_option: bool,
229 ) -> MetaResult<NotificationVersion> {
230 let inner = self.inner.write().await;
231 let txn = inner.db.begin().await?;
232 for user_id in &user_ids {
233 ensure_user_id(*user_id, &txn).await?;
234 }
235
236 let mut privileges = vec![];
237 for gp in new_grant_privileges {
238 let id = extract_grant_obj_id(gp.get_object()?);
239 let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
240 let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
241 for action_with_opt in &gp.action_with_opts {
242 let action = action_with_opt.get_action()?.into();
243 privileges.push(user_privilege::ActiveModel {
244 oid: Set(id),
245 granted_by: Set(grantor),
246 action: Set(action),
247 with_grant_option: Set(with_grant_option),
248 ..Default::default()
249 });
250 if action == Action::Select {
251 privileges.extend(
252 internal_table_ids
253 .iter()
254 .chain(index_state_table_ids.iter())
255 .map(|&tid| user_privilege::ActiveModel {
256 oid: Set(tid),
257 granted_by: Set(grantor),
258 action: Set(Action::Select),
259 with_grant_option: Set(with_grant_option),
260 ..Default::default()
261 }),
262 );
263 }
264 }
265 }
266
267 let user = User::find_by_id(grantor)
269 .one(&txn)
270 .await?
271 .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
272 let mut filtered_privileges = vec![];
273 if !user.is_super {
274 for mut privilege in privileges {
275 if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
276 filtered_privileges.push(privilege);
277 continue;
278 }
279 let filter = user_privilege::Column::UserId
280 .eq(grantor)
281 .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
282 .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
283 .and(user_privilege::Column::WithGrantOption.eq(true));
284 let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
285 .select_only()
286 .column(user_privilege::Column::Id)
287 .filter(filter)
288 .into_tuple()
289 .one(&txn)
290 .await?;
291 let Some(privilege_id) = privilege_id else {
292 tracing::warn!(
293 "user {} don't have privilege {:?} or grant option",
294 grantor,
295 privilege.action
296 );
297 continue;
298 };
299 privilege.dependent_id = Set(Some(privilege_id));
300 filtered_privileges.push(privilege);
301 }
302 } else {
303 filtered_privileges = privileges;
304 }
305
306 let user_privileges = user_ids
308 .iter()
309 .flat_map(|user_id| {
310 filtered_privileges.iter().map(|p| {
311 let mut p = p.clone();
312 p.user_id = Set(*user_id);
313 p
314 })
315 })
316 .collect_vec();
317 for privilege in user_privileges {
318 let mut on_conflict = OnConflict::columns([
319 user_privilege::Column::UserId,
320 user_privilege::Column::Oid,
321 user_privilege::Column::Action,
322 user_privilege::Column::GrantedBy,
323 ]);
324 if *privilege.with_grant_option.as_ref() {
325 on_conflict.update_column(user_privilege::Column::WithGrantOption);
326 } else {
327 on_conflict.update_column(user_privilege::Column::UserId);
329 }
330
331 UserPrivilege::insert(privilege)
332 .on_conflict(on_conflict)
333 .do_nothing()
334 .exec(&txn)
335 .await?;
336 }
337
338 let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
339
340 txn.commit().await?;
341
342 let version = self.notify_users_update(user_infos).await;
343 Ok(version)
344 }
345
346 pub async fn revoke_privilege(
347 &self,
348 user_ids: Vec<UserId>,
349 revoke_grant_privileges: &[PbGrantPrivilege],
350 granted_by: UserId,
351 revoke_by: UserId,
352 revoke_grant_option: bool,
353 cascade: bool,
354 ) -> MetaResult<NotificationVersion> {
355 let inner = self.inner.write().await;
356 let txn = inner.db.begin().await?;
357 for user_id in &user_ids {
358 ensure_user_id(*user_id, &txn).await?;
359 }
360 let revoke_user = User::find_by_id(revoke_by)
362 .one(&txn)
363 .await?
364 .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
365
366 if !revoke_user.is_super && granted_by != revoke_by {
368 let granted_user_name: String = User::find_by_id(granted_by)
369 .select_only()
370 .column(user::Column::Name)
371 .into_tuple()
372 .one(&txn)
373 .await?
374 .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
375 return Err(MetaError::permission_denied(format!(
376 "user {} is not super, can't revoke privileges for {}",
377 revoke_user.name, granted_user_name
378 )));
379 }
380
381 let mut revoke_items = HashMap::new();
382 for privilege in revoke_grant_privileges {
383 let obj = extract_grant_obj_id(privilege.get_object()?);
384 let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
385 let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
386 let mut include_select = false;
387 let actions = privilege
388 .action_with_opts
389 .iter()
390 .map(|ao| {
391 let action = Action::from(ao.get_action().unwrap());
392 if action == Action::Select {
393 include_select = true;
394 }
395 action
396 })
397 .collect_vec();
398 revoke_items.insert(obj, actions);
399 if include_select {
400 revoke_items.extend(
401 internal_table_ids
402 .iter()
403 .chain(index_state_table_ids.iter())
404 .map(|&tid| (tid, vec![Action::Select])),
405 );
406 }
407 }
408
409 let filter = if !revoke_user.is_super {
410 for (obj, actions) in &revoke_items {
412 if revoke_by == get_object_owner(*obj, &txn).await? {
413 continue;
414 }
415 let owned_actions: HashSet<Action> = UserPrivilege::find()
416 .select_only()
417 .column(user_privilege::Column::Action)
418 .filter(
419 user_privilege::Column::UserId
420 .eq(granted_by)
421 .and(user_privilege::Column::Oid.eq(*obj))
422 .and(user_privilege::Column::WithGrantOption.eq(true)),
423 )
424 .into_tuple::<Action>()
425 .all(&txn)
426 .await?
427 .into_iter()
428 .collect();
429 if actions.iter().any(|ac| !owned_actions.contains(ac)) {
430 return Err(MetaError::permission_denied(format!(
431 "user {} don't have privileges {:?} or grant option",
432 revoke_user.name, actions,
433 )));
434 }
435 }
436
437 user_privilege::Column::GrantedBy
438 .eq(granted_by)
439 .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
440 } else {
441 user_privilege::Column::UserId.is_in(user_ids.clone())
442 };
443 let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
444 for (obj, actions) in revoke_items {
445 let filter = filter
446 .clone()
447 .and(user_privilege::Column::Oid.eq(obj))
448 .and(user_privilege::Column::Action.is_in(actions));
449 root_user_privileges.extend(
450 UserPrivilege::find()
451 .select_only()
452 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
453 .filter(filter)
454 .into_partial_model()
455 .all(&txn)
456 .await?,
457 );
458 }
459 if root_user_privileges.is_empty() {
460 tracing::warn!("no privilege to revoke, ignore it");
461 return Ok(IGNORED_NOTIFICATION_VERSION);
462 }
463
464 let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
466 let (all_privilege_ids, to_update_user_ids) = if !cascade {
467 ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
468 (
469 root_privilege_ids.clone(),
470 root_user_privileges
471 .iter()
472 .map(|ur| ur.user_id)
473 .collect_vec(),
474 )
475 } else {
476 let all_user_privileges =
477 get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
478 (
479 all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
480 all_user_privileges
481 .iter()
482 .map(|ur| ur.user_id)
483 .collect_vec(),
484 )
485 };
486
487 if revoke_grant_option {
488 UserPrivilege::update_many()
489 .col_expr(
490 user_privilege::Column::WithGrantOption,
491 SimpleExpr::Value(Value::Bool(Some(false))),
492 )
493 .filter(
494 user_privilege::Column::Id
495 .is_in(all_privilege_ids)
496 .and(user_privilege::Column::WithGrantOption.eq(true)),
497 )
498 .exec(&txn)
499 .await?;
500 } else {
501 UserPrivilege::delete_many()
503 .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
504 .exec(&txn)
505 .await?;
506 }
507
508 let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
509
510 txn.commit().await?;
511
512 let version = self.notify_users_update(user_infos).await;
513 Ok(version)
514 }
515
516 pub async fn grant_default_privileges(
517 &self,
518 user_ids: Vec<UserId>,
519 database_id: DatabaseId,
520 schema_ids: Vec<SchemaId>,
521 grantor: UserId,
522 actions: Vec<PbAction>,
523 object_type: PbObjectType,
524 grantees: Vec<UserId>,
525 with_grant_option: bool,
526 ) -> MetaResult<()> {
527 tracing::debug!(
528 ?user_ids,
529 database_id,
530 ?schema_ids,
531 ?actions,
532 ?object_type,
533 ?grantees,
534 with_grant_option,
535 "grant default privileges",
536 );
537 let inner = self.inner.write().await;
538 let txn = inner.db.begin().await?;
539 for user_id in &user_ids {
540 ensure_user_id(*user_id, &txn).await?;
541 }
542 ensure_object_id(ObjectType::Database, database_id, &txn).await?;
543 for schema_id in &schema_ids {
544 ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
545 }
546 for grantee in &grantees {
547 ensure_user_id(*grantee, &txn).await?;
548 }
549 if object_type == PbObjectType::Schema {
550 assert!(
551 schema_ids.is_empty(),
552 "schema_ids should be empty when object_type is Schema"
553 );
554
555 let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
558 let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
559 .select_only()
560 .columns([
561 user_default_privilege::Column::Id,
562 user_default_privilege::Column::UserId,
563 user_default_privilege::Column::Grantee,
564 user_default_privilege::Column::Action,
565 ])
566 .filter(
567 user_default_privilege::Column::DatabaseId
568 .eq(database_id)
569 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
570 .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
571 .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
572 .and(user_default_privilege::Column::Action.is_in(actions.clone())),
573 )
574 .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
575 .all(&txn)
576 .await?
577 .into_iter()
578 .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
579 .collect();
580
581 for user_id in user_ids {
582 for grantee in &grantees {
583 for action in &actions {
584 if let Some(existing_id) =
585 existing_default_privileges.get(&(user_id, *grantee, *action))
586 && with_grant_option
587 {
588 UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
590 id: Set(*existing_id),
591 with_grant_option: Set(true),
592 granted_by: Set(grantor as _),
593 ..Default::default()
594 })
595 .exec(&txn)
596 .await?;
597 } else {
598 UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
599 id: Default::default(),
600 database_id: Set(database_id),
601 schema_id: Set(None),
602 object_type: Set(ObjectType::Schema),
603 for_materialized_view: Set(false),
604 user_id: Set(user_id),
605 grantee: Set(*grantee),
606 granted_by: Set(grantor as _),
607 action: Set(*action),
608 with_grant_option: Set(with_grant_option),
609 })
610 .exec(&txn)
611 .await?;
612 }
613 }
614 }
615 }
616 } else {
617 let mut default_privileges = vec![];
618 for user_id in user_ids {
619 for grantee in &grantees {
620 for action in &actions {
621 if schema_ids.is_empty() {
622 default_privileges.push(user_default_privilege::ActiveModel {
623 id: Default::default(),
624 database_id: Set(database_id),
625 schema_id: Set(None),
626 object_type: Set(object_type.into()),
627 for_materialized_view: Set(object_type == PbObjectType::Mview),
628 user_id: Set(user_id),
629 grantee: Set(*grantee),
630 granted_by: Set(grantor as _),
631 action: Set((*action).into()),
632 with_grant_option: Set(with_grant_option),
633 });
634 continue;
635 }
636 for schema_id in &schema_ids {
637 default_privileges.push(user_default_privilege::ActiveModel {
638 id: Default::default(),
639 database_id: Set(database_id),
640 schema_id: Set(Some(*schema_id)),
641 object_type: Set(object_type.into()),
642 for_materialized_view: Set(object_type == PbObjectType::Mview),
643 user_id: Set(user_id),
644 grantee: Set(*grantee),
645 granted_by: Set(grantor as _),
646 action: Set((*action).into()),
647 with_grant_option: Set(with_grant_option),
648 });
649 }
650 }
651 }
652 }
653
654 let mut on_conflict = OnConflict::columns([
655 user_default_privilege::Column::UserId,
656 user_default_privilege::Column::DatabaseId,
657 user_default_privilege::Column::SchemaId,
658 user_default_privilege::Column::ObjectType,
659 user_default_privilege::Column::ForMaterializedView,
660 user_default_privilege::Column::Grantee,
661 user_default_privilege::Column::Action,
662 ]);
663 if with_grant_option {
664 on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
665 } else {
666 on_conflict.update_column(user_default_privilege::Column::UserId);
668 }
669 UserDefaultPrivilege::insert_many(default_privileges)
670 .on_conflict(on_conflict)
671 .do_nothing()
672 .exec(&txn)
673 .await?;
674 }
675
676 txn.commit().await?;
677 Ok(())
678 }
679
680 pub async fn revoke_default_privileges(
681 &self,
682 user_ids: Vec<UserId>,
683 database_id: DatabaseId,
684 schema_ids: Vec<SchemaId>,
685 actions: Vec<PbAction>,
686 object_type: PbObjectType,
687 grantees: Vec<UserId>,
688 revoke_grant_option: bool,
689 ) -> MetaResult<()> {
690 let inner = self.inner.write().await;
691 let txn = inner.db.begin().await?;
692 for user_id in &user_ids {
693 ensure_user_id(*user_id, &txn).await?;
694 }
695
696 let schema_filter = if schema_ids.is_empty() {
697 user_default_privilege::Column::SchemaId.is_null()
698 } else {
699 user_default_privilege::Column::SchemaId.is_in(schema_ids)
700 };
701 let filter = user_default_privilege::Column::DatabaseId
702 .eq(database_id)
703 .and(schema_filter)
704 .and(user_default_privilege::Column::UserId.is_in(user_ids))
705 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
706 .and(user_default_privilege::Column::Grantee.is_in(grantees))
707 .and(
708 user_default_privilege::Column::Action
709 .is_in(actions.iter().map(|&a| Action::from(a))),
710 );
711
712 if revoke_grant_option {
713 let res = UserDefaultPrivilege::update_many()
715 .col_expr(
716 user_default_privilege::Column::WithGrantOption,
717 SimpleExpr::Value(Value::Bool(Some(false))),
718 )
719 .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
720 .exec(&txn)
721 .await?;
722 tracing::info!(
723 "revoke {count} grant option for default privileges",
724 count = res.rows_affected
725 );
726 } else {
727 let res = UserDefaultPrivilege::delete_many()
728 .filter(filter)
729 .exec(&txn)
730 .await?;
731 tracing::info!(
732 "revoke {count} default privileges",
733 count = res.rows_affected
734 );
735 }
736
737 txn.commit().await?;
738 Ok(())
739 }
740}
741
742#[cfg(test)]
743mod tests {
744 use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
745
746 use super::*;
747 use crate::manager::MetaSrvEnv;
748
749 const TEST_DATABASE_ID: DatabaseId = 1;
750 const TEST_ROOT_USER_ID: UserId = 1;
751
752 fn make_test_user(name: &str) -> PbUserInfo {
753 PbUserInfo {
754 name: name.to_owned(),
755 ..Default::default()
756 }
757 }
758
759 fn make_privilege(
760 object: PbObject,
761 actions: &[PbAction],
762 with_grant_option: bool,
763 ) -> PbGrantPrivilege {
764 PbGrantPrivilege {
765 object: Some(object),
766 action_with_opts: actions
767 .iter()
768 .map(|&action| PbActionWithGrantOption {
769 action: action as _,
770 with_grant_option,
771 ..Default::default()
772 })
773 .collect(),
774 }
775 }
776
777 #[tokio::test]
778 async fn test_user_and_privilege() -> MetaResult<()> {
779 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
780 mgr.create_user(make_test_user("test_user_1")).await?;
781 mgr.create_user(make_test_user("test_user_2")).await?;
782 let user_1 = mgr.get_user_by_name("test_user_1").await?;
783 let user_2 = mgr.get_user_by_name("test_user_2").await?;
784
785 assert!(
786 mgr.create_user(make_test_user("test_user_1"))
787 .await
788 .is_err(),
789 "user_1 already exists"
790 );
791 mgr.update_user(
792 PbUserInfo {
793 id: user_1.user_id as _,
794 name: "test_user_1_new".to_owned(),
795 ..Default::default()
796 },
797 &[PbUpdateField::Rename],
798 )
799 .await?;
800 let user_1 = mgr.get_user(user_1.user_id).await?;
801 assert_eq!(user_1.name, "test_user_1_new".to_owned());
802
803 let conn_with_option = make_privilege(
804 PbObject::DatabaseId(TEST_DATABASE_ID as _),
805 &[PbAction::Connect],
806 true,
807 );
808 let create_without_option = make_privilege(
809 PbObject::DatabaseId(TEST_DATABASE_ID as _),
810 &[PbAction::Create],
811 false,
812 );
813 mgr.grant_privilege(
815 vec![user_1.user_id],
816 std::slice::from_ref(&conn_with_option),
817 TEST_ROOT_USER_ID,
818 true,
819 )
820 .await?;
821 mgr.grant_privilege(
823 vec![user_1.user_id],
824 std::slice::from_ref(&create_without_option),
825 TEST_ROOT_USER_ID,
826 false,
827 )
828 .await?;
829 mgr.grant_privilege(
831 vec![user_2.user_id],
832 std::slice::from_ref(&conn_with_option),
833 user_1.user_id,
834 true,
835 )
836 .await?;
837 mgr.grant_privilege(
838 vec![user_2.user_id],
839 std::slice::from_ref(&create_without_option),
840 user_1.user_id,
841 false,
842 )
843 .await?;
844
845 assert!(
846 mgr.drop_user(user_1.user_id).await.is_err(),
847 "user_1 can't be dropped"
848 );
849
850 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
851 assert_eq!(privilege_1.len(), 2);
852 assert!(privilege_1.iter().all(|gp| gp.object
853 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
854 && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32));
855
856 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
857 assert_eq!(privilege_2.len(), 1);
858 assert!(privilege_2.iter().all(|gp| gp.object
859 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
860 && gp.action_with_opts[0].granted_by == user_1.user_id as u32
861 && gp.action_with_opts[0].with_grant_option));
862
863 assert!(
865 mgr.revoke_privilege(
866 vec![user_1.user_id],
867 std::slice::from_ref(&conn_with_option),
868 TEST_ROOT_USER_ID,
869 user_2.user_id,
870 false,
871 false
872 )
873 .await
874 .is_err(),
875 "user_2 can't revoke for user_1"
876 );
877
878 assert!(
880 mgr.revoke_privilege(
881 vec![user_2.user_id],
882 std::slice::from_ref(&create_without_option),
883 user_1.user_id,
884 user_1.user_id,
885 false,
886 false
887 )
888 .await
889 .is_err(),
890 "user_2 don't have grant option for CREATE"
891 );
892
893 assert!(
895 mgr.revoke_privilege(
896 vec![user_1.user_id],
897 std::slice::from_ref(&conn_with_option),
898 TEST_ROOT_USER_ID,
899 TEST_ROOT_USER_ID,
900 false,
901 false
902 )
903 .await
904 .is_err(),
905 "permission deny in restrict mode, CONN granted to user_2"
906 );
907
908 mgr.revoke_privilege(
910 vec![user_1.user_id],
911 std::slice::from_ref(&create_without_option),
912 TEST_ROOT_USER_ID,
913 TEST_ROOT_USER_ID,
914 false,
915 false,
916 )
917 .await?;
918
919 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
920 assert_eq!(privilege_1.len(), 1);
921 assert!(privilege_1.iter().all(|gp| gp.object
922 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
923 && gp.action_with_opts[0].action == PbAction::Connect as i32));
924
925 mgr.revoke_privilege(
927 vec![user_1.user_id],
928 std::slice::from_ref(&conn_with_option),
929 TEST_ROOT_USER_ID,
930 TEST_ROOT_USER_ID,
931 true,
932 true,
933 )
934 .await?;
935 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
936 assert_eq!(privilege_1.len(), 1);
937 assert!(privilege_1.iter().all(|gp| gp.object
938 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
939 && gp.action_with_opts[0].action == PbAction::Connect as i32
940 && !gp.action_with_opts[0].with_grant_option));
941 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
942 assert_eq!(privilege_2.len(), 1);
943 assert!(privilege_2.iter().all(|gp| gp.object
944 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
945 && gp.action_with_opts[0].action == PbAction::Connect as i32
946 && !gp.action_with_opts[0].with_grant_option));
947
948 mgr.revoke_privilege(
950 vec![user_1.user_id],
951 std::slice::from_ref(&conn_with_option),
952 TEST_ROOT_USER_ID,
953 TEST_ROOT_USER_ID,
954 false,
955 true,
956 )
957 .await?;
958 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
959 assert!(privilege_1.is_empty());
960 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
961 assert!(privilege_2.is_empty());
962
963 mgr.drop_user(user_1.user_id).await?;
964 mgr.drop_user(user_2.user_id).await?;
965 Ok(())
966 }
967}