1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23 AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24 user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28 Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36 QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41 PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42 ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43 get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
44 get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50 pub(crate) async fn notify_users_update(
51 &self,
52 user_infos: Vec<PbUserInfo>,
53 ) -> NotificationVersion {
54 let mut version = 0;
55 for info in user_infos {
56 version = self
57 .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58 .await;
59 }
60 version
61 }
62
63 pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64 let inner = self.inner.write().await;
65 let txn = inner.db.begin().await?;
66 check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68 let grant_privileges = pb_user.grant_privileges.clone();
69 let user: user::ActiveModel = pb_user.into();
70 let user = user.insert(&txn).await?;
71
72 if !grant_privileges.is_empty() {
73 let mut privileges = vec![];
74 for gp in &grant_privileges {
75 let id = extract_grant_obj_id(gp.get_object()?);
76 for action_with_opt in &gp.action_with_opts {
77 privileges.push(user_privilege::ActiveModel {
78 user_id: Set(user.user_id),
79 oid: Set(id),
80 granted_by: Set(action_with_opt.granted_by as _),
81 action: Set(action_with_opt.get_action()?.into()),
82 with_grant_option: Set(action_with_opt.with_grant_option),
83 ..Default::default()
84 });
85 }
86 }
87 UserPrivilege::insert_many(privileges).exec(&txn).await?;
88 }
89 txn.commit().await?;
90
91 let mut user_info: PbUserInfo = user.into();
92 user_info.grant_privileges = grant_privileges;
93 let version = self
94 .notify_frontend(
95 NotificationOperation::Add,
96 NotificationInfo::User(user_info),
97 )
98 .await;
99
100 Ok(version)
101 }
102
103 pub async fn update_user(
104 &self,
105 update_user: PbUserInfo,
106 update_fields: &[PbUpdateField],
107 ) -> MetaResult<NotificationVersion> {
108 let inner = self.inner.write().await;
109 let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110 if rename_flag {
111 check_user_name_duplicate(&update_user.name, &inner.db).await?;
112 }
113
114 let user = User::find_by_id(update_user.id as UserId)
115 .one(&inner.db)
116 .await?
117 .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118 let mut user = user.into_active_model();
119 update_fields.iter().for_each(|&field| match field {
120 PbUpdateField::Unspecified => unreachable!(),
121 PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122 PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123 PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124 PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125 PbUpdateField::AuthInfo => {
126 user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127 }
128 PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129 PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
130 });
131
132 let user = user.update(&inner.db).await?;
133 let mut user_info: PbUserInfo = user.into();
134 user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
135 let version = self
136 .notify_frontend(
137 NotificationOperation::Update,
138 NotificationInfo::User(user_info),
139 )
140 .await;
141
142 Ok(version)
143 }
144
/// Test-only lookup of a user row by id.
///
/// # Errors
///
/// Returns a "catalog id not found" error when no user has the given `id`.
#[cfg(test)]
pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
    let inner = self.inner.read().await;
    let Some(found) = User::find_by_id(id).one(&inner.db).await? else {
        return Err(MetaError::catalog_id_not_found("user", id));
    };
    Ok(found)
}
154
/// Test-only lookup of a user row by exact name.
///
/// # Errors
///
/// Returns an error with the message `user {name} not found` when no row matches.
#[cfg(test)]
pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
    let inner = self.inner.read().await;
    let by_name = User::find().filter(user::Column::Name.eq(name));
    let Some(found) = by_name.one(&inner.db).await? else {
        return Err(anyhow::anyhow!("user {name} not found").into());
    };
    Ok(found)
}
165
166 pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
167 let inner = self.inner.write().await;
168 let txn = inner.db.begin().await?;
169 let user = User::find_by_id(user_id)
170 .one(&txn)
171 .await?
172 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
173 if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
174 return Err(MetaError::permission_denied(format!(
175 "drop default super user {} is not allowed",
176 user.name
177 )));
178 }
179
180 let count = Object::find()
182 .filter(object::Column::OwnerId.eq(user_id))
183 .count(&txn)
184 .await?;
185 if count != 0 {
186 return Err(MetaError::permission_denied(format!(
187 "drop user {} is not allowed, because it owns {} objects",
188 user.name, count
189 )));
190 }
191
192 let count = UserPrivilege::find()
194 .filter(user_privilege::Column::GrantedBy.eq(user_id))
195 .count(&txn)
196 .await?;
197 if count != 0 {
198 return Err(MetaError::permission_denied(format!(
199 "drop user {} is not allowed, because it granted {} privileges to others",
200 user.name, count
201 )));
202 }
203
204 let res = User::delete_by_id(user_id).exec(&txn).await?;
205 if res.rows_affected != 1 {
206 return Err(MetaError::catalog_id_not_found("user", user_id));
207 }
208 txn.commit().await?;
209
210 let version = self
211 .notify_frontend(
212 NotificationOperation::Delete,
213 NotificationInfo::User(PbUserInfo {
214 id: user_id as _,
215 ..Default::default()
216 }),
217 )
218 .await;
219
220 Ok(version)
221 }
222
223 pub async fn grant_privilege(
224 &self,
225 user_ids: Vec<UserId>,
226 new_grant_privileges: &[PbGrantPrivilege],
227 grantor: UserId,
228 with_grant_option: bool,
229 ) -> MetaResult<NotificationVersion> {
230 let inner = self.inner.write().await;
231 let txn = inner.db.begin().await?;
232 for user_id in &user_ids {
233 ensure_user_id(*user_id, &txn).await?;
234 }
235
236 let mut privileges = vec![];
237 for gp in new_grant_privileges {
238 let id = extract_grant_obj_id(gp.get_object()?);
239 let internal_table_ids = get_internal_tables_by_id(id.as_job_id(), &txn).await?;
240 let index_state_table_ids =
241 get_index_state_tables_by_table_id(id.as_table_id(), &txn).await?;
242 for action_with_opt in &gp.action_with_opts {
243 let action = action_with_opt.get_action()?.into();
244 privileges.push(user_privilege::ActiveModel {
245 oid: Set(id),
246 granted_by: Set(grantor),
247 action: Set(action),
248 with_grant_option: Set(with_grant_option),
249 ..Default::default()
250 });
251 if action == Action::Select {
252 privileges.extend(
253 internal_table_ids
254 .iter()
255 .chain(index_state_table_ids.iter())
256 .map(|&tid| user_privilege::ActiveModel {
257 oid: Set(tid.as_object_id()),
258 granted_by: Set(grantor),
259 action: Set(Action::Select),
260 with_grant_option: Set(with_grant_option),
261 ..Default::default()
262 }),
263 );
264 }
265 }
266 }
267
268 let user = User::find_by_id(grantor)
270 .one(&txn)
271 .await?
272 .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
273 let mut filtered_privileges = vec![];
274 if !user.is_super {
275 for mut privilege in privileges {
276 if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
277 filtered_privileges.push(privilege);
278 continue;
279 }
280 let filter = user_privilege::Column::UserId
281 .eq(grantor)
282 .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
283 .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
284 .and(user_privilege::Column::WithGrantOption.eq(true));
285 let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
286 .select_only()
287 .column(user_privilege::Column::Id)
288 .filter(filter)
289 .into_tuple()
290 .one(&txn)
291 .await?;
292 let Some(privilege_id) = privilege_id else {
293 tracing::warn!(
294 "user {} don't have privilege {:?} or grant option",
295 grantor,
296 privilege.action
297 );
298 continue;
299 };
300 privilege.dependent_id = Set(Some(privilege_id));
301 filtered_privileges.push(privilege);
302 }
303 } else {
304 filtered_privileges = privileges;
305 }
306
307 let user_privileges = user_ids
309 .iter()
310 .flat_map(|user_id| {
311 filtered_privileges.iter().map(|p| {
312 let mut p = p.clone();
313 p.user_id = Set(*user_id);
314 p
315 })
316 })
317 .collect_vec();
318 for privilege in user_privileges {
319 let mut on_conflict = OnConflict::columns([
320 user_privilege::Column::UserId,
321 user_privilege::Column::Oid,
322 user_privilege::Column::Action,
323 user_privilege::Column::GrantedBy,
324 ]);
325 if *privilege.with_grant_option.as_ref() {
326 on_conflict.update_column(user_privilege::Column::WithGrantOption);
327 } else {
328 on_conflict.update_column(user_privilege::Column::UserId);
330 }
331
332 UserPrivilege::insert(privilege)
333 .on_conflict(on_conflict)
334 .do_nothing()
335 .exec(&txn)
336 .await?;
337 }
338
339 let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
340
341 txn.commit().await?;
342
343 let version = self.notify_users_update(user_infos).await;
344 Ok(version)
345 }
346
347 pub async fn revoke_privilege(
348 &self,
349 user_ids: Vec<UserId>,
350 revoke_grant_privileges: &[PbGrantPrivilege],
351 granted_by: UserId,
352 revoke_by: UserId,
353 revoke_grant_option: bool,
354 cascade: bool,
355 ) -> MetaResult<NotificationVersion> {
356 let inner = self.inner.write().await;
357 let txn = inner.db.begin().await?;
358 for user_id in &user_ids {
359 ensure_user_id(*user_id, &txn).await?;
360 }
361 let revoke_user = User::find_by_id(revoke_by)
363 .one(&txn)
364 .await?
365 .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
366
367 if !revoke_user.is_super && granted_by != revoke_by {
369 let granted_user_name: String = User::find_by_id(granted_by)
370 .select_only()
371 .column(user::Column::Name)
372 .into_tuple()
373 .one(&txn)
374 .await?
375 .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
376 return Err(MetaError::permission_denied(format!(
377 "user {} is not super, can't revoke privileges for {}",
378 revoke_user.name, granted_user_name
379 )));
380 }
381
382 let mut revoke_items = HashMap::new();
383 for privilege in revoke_grant_privileges {
384 let obj = extract_grant_obj_id(privilege.get_object()?);
385 let internal_table_ids = get_internal_tables_by_id(obj.as_job_id(), &txn).await?;
386 let index_state_table_ids =
387 get_index_state_tables_by_table_id(obj.as_table_id(), &txn).await?;
388 let mut include_select = false;
389 let actions = privilege
390 .action_with_opts
391 .iter()
392 .map(|ao| {
393 let action = Action::from(ao.get_action().unwrap());
394 if action == Action::Select {
395 include_select = true;
396 }
397 action
398 })
399 .collect_vec();
400 revoke_items.insert(obj, actions);
401 if include_select {
402 revoke_items.extend(
403 internal_table_ids
404 .iter()
405 .chain(index_state_table_ids.iter())
406 .map(|&tid| (tid.as_object_id(), vec![Action::Select])),
407 );
408 }
409 }
410
411 let filter = if !revoke_user.is_super {
412 for (obj, actions) in &revoke_items {
414 if revoke_by == get_object_owner(*obj, &txn).await? {
415 continue;
416 }
417 let owned_actions: HashSet<Action> = UserPrivilege::find()
418 .select_only()
419 .column(user_privilege::Column::Action)
420 .filter(
421 user_privilege::Column::UserId
422 .eq(granted_by)
423 .and(user_privilege::Column::Oid.eq(*obj))
424 .and(user_privilege::Column::WithGrantOption.eq(true)),
425 )
426 .into_tuple::<Action>()
427 .all(&txn)
428 .await?
429 .into_iter()
430 .collect();
431 if actions.iter().any(|ac| !owned_actions.contains(ac)) {
432 return Err(MetaError::permission_denied(format!(
433 "user {} don't have privileges {:?} or grant option",
434 revoke_user.name, actions,
435 )));
436 }
437 }
438
439 user_privilege::Column::GrantedBy
440 .eq(granted_by)
441 .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
442 } else {
443 user_privilege::Column::UserId.is_in(user_ids.clone())
444 };
445 let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
446 for (obj, actions) in revoke_items {
447 let filter = filter
448 .clone()
449 .and(user_privilege::Column::Oid.eq(obj))
450 .and(user_privilege::Column::Action.is_in(actions));
451 root_user_privileges.extend(
452 UserPrivilege::find()
453 .select_only()
454 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
455 .filter(filter)
456 .into_partial_model()
457 .all(&txn)
458 .await?,
459 );
460 }
461 if root_user_privileges.is_empty() {
462 tracing::warn!("no privilege to revoke, ignore it");
463 return Ok(IGNORED_NOTIFICATION_VERSION);
464 }
465
466 let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
468 let (all_privilege_ids, to_update_user_ids) = if !cascade {
469 ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
470 (
471 root_privilege_ids.clone(),
472 root_user_privileges
473 .iter()
474 .map(|ur| ur.user_id)
475 .collect_vec(),
476 )
477 } else {
478 let all_user_privileges =
479 get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
480 (
481 all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
482 all_user_privileges
483 .iter()
484 .map(|ur| ur.user_id)
485 .collect_vec(),
486 )
487 };
488
489 if revoke_grant_option {
490 UserPrivilege::update_many()
491 .col_expr(
492 user_privilege::Column::WithGrantOption,
493 SimpleExpr::Value(Value::Bool(Some(false))),
494 )
495 .filter(
496 user_privilege::Column::Id
497 .is_in(all_privilege_ids)
498 .and(user_privilege::Column::WithGrantOption.eq(true)),
499 )
500 .exec(&txn)
501 .await?;
502 } else {
503 UserPrivilege::delete_many()
505 .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
506 .exec(&txn)
507 .await?;
508 }
509
510 let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
511
512 txn.commit().await?;
513
514 let version = self.notify_users_update(user_infos).await;
515 Ok(version)
516 }
517
518 pub async fn grant_default_privileges(
519 &self,
520 user_ids: Vec<UserId>,
521 database_id: DatabaseId,
522 schema_ids: Vec<SchemaId>,
523 grantor: UserId,
524 actions: Vec<PbAction>,
525 object_type: PbObjectType,
526 grantees: Vec<UserId>,
527 with_grant_option: bool,
528 ) -> MetaResult<()> {
529 tracing::debug!(
530 ?user_ids,
531 %database_id,
532 ?schema_ids,
533 ?actions,
534 ?object_type,
535 ?grantees,
536 with_grant_option,
537 "grant default privileges",
538 );
539 let inner = self.inner.write().await;
540 let txn = inner.db.begin().await?;
541 for user_id in &user_ids {
542 ensure_user_id(*user_id, &txn).await?;
543 }
544 ensure_object_id(ObjectType::Database, database_id, &txn).await?;
545 for schema_id in &schema_ids {
546 ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
547 }
548 for grantee in &grantees {
549 ensure_user_id(*grantee, &txn).await?;
550 }
551 if object_type == PbObjectType::Schema {
552 assert!(
553 schema_ids.is_empty(),
554 "schema_ids should be empty when object_type is Schema"
555 );
556
557 let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
560 let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
561 .select_only()
562 .columns([
563 user_default_privilege::Column::Id,
564 user_default_privilege::Column::UserId,
565 user_default_privilege::Column::Grantee,
566 user_default_privilege::Column::Action,
567 ])
568 .filter(
569 user_default_privilege::Column::DatabaseId
570 .eq(database_id)
571 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
572 .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
573 .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
574 .and(user_default_privilege::Column::Action.is_in(actions.clone())),
575 )
576 .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
577 .all(&txn)
578 .await?
579 .into_iter()
580 .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
581 .collect();
582
583 for user_id in user_ids {
584 for grantee in &grantees {
585 for action in &actions {
586 if let Some(existing_id) =
587 existing_default_privileges.get(&(user_id, *grantee, *action))
588 && with_grant_option
589 {
590 UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
592 id: Set(*existing_id),
593 with_grant_option: Set(true),
594 granted_by: Set(grantor as _),
595 ..Default::default()
596 })
597 .exec(&txn)
598 .await?;
599 } else {
600 UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
601 id: Default::default(),
602 database_id: Set(database_id),
603 schema_id: Set(None),
604 object_type: Set(ObjectType::Schema),
605 for_materialized_view: Set(false),
606 user_id: Set(user_id),
607 grantee: Set(*grantee),
608 granted_by: Set(grantor as _),
609 action: Set(*action),
610 with_grant_option: Set(with_grant_option),
611 })
612 .exec(&txn)
613 .await?;
614 }
615 }
616 }
617 }
618 } else {
619 let mut default_privileges = vec![];
620 for user_id in user_ids {
621 for grantee in &grantees {
622 for action in &actions {
623 if schema_ids.is_empty() {
624 default_privileges.push(user_default_privilege::ActiveModel {
625 id: Default::default(),
626 database_id: Set(database_id),
627 schema_id: Set(None),
628 object_type: Set(object_type.into()),
629 for_materialized_view: Set(object_type == PbObjectType::Mview),
630 user_id: Set(user_id),
631 grantee: Set(*grantee),
632 granted_by: Set(grantor as _),
633 action: Set((*action).into()),
634 with_grant_option: Set(with_grant_option),
635 });
636 continue;
637 }
638 for schema_id in &schema_ids {
639 default_privileges.push(user_default_privilege::ActiveModel {
640 id: Default::default(),
641 database_id: Set(database_id),
642 schema_id: Set(Some(*schema_id)),
643 object_type: Set(object_type.into()),
644 for_materialized_view: Set(object_type == PbObjectType::Mview),
645 user_id: Set(user_id),
646 grantee: Set(*grantee),
647 granted_by: Set(grantor as _),
648 action: Set((*action).into()),
649 with_grant_option: Set(with_grant_option),
650 });
651 }
652 }
653 }
654 }
655
656 let mut on_conflict = OnConflict::columns([
657 user_default_privilege::Column::UserId,
658 user_default_privilege::Column::DatabaseId,
659 user_default_privilege::Column::SchemaId,
660 user_default_privilege::Column::ObjectType,
661 user_default_privilege::Column::ForMaterializedView,
662 user_default_privilege::Column::Grantee,
663 user_default_privilege::Column::Action,
664 ]);
665 if with_grant_option {
666 on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
667 } else {
668 on_conflict.update_column(user_default_privilege::Column::UserId);
670 }
671 UserDefaultPrivilege::insert_many(default_privileges)
672 .on_conflict(on_conflict)
673 .do_nothing()
674 .exec(&txn)
675 .await?;
676 }
677
678 txn.commit().await?;
679 Ok(())
680 }
681
682 pub async fn revoke_default_privileges(
683 &self,
684 user_ids: Vec<UserId>,
685 database_id: DatabaseId,
686 schema_ids: Vec<SchemaId>,
687 actions: Vec<PbAction>,
688 object_type: PbObjectType,
689 grantees: Vec<UserId>,
690 revoke_grant_option: bool,
691 ) -> MetaResult<()> {
692 let inner = self.inner.write().await;
693 let txn = inner.db.begin().await?;
694 for user_id in &user_ids {
695 ensure_user_id(*user_id, &txn).await?;
696 }
697
698 let schema_filter = if schema_ids.is_empty() {
699 user_default_privilege::Column::SchemaId.is_null()
700 } else {
701 user_default_privilege::Column::SchemaId.is_in(schema_ids)
702 };
703 let filter = user_default_privilege::Column::DatabaseId
704 .eq(database_id)
705 .and(schema_filter)
706 .and(user_default_privilege::Column::UserId.is_in(user_ids))
707 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
708 .and(user_default_privilege::Column::Grantee.is_in(grantees))
709 .and(
710 user_default_privilege::Column::Action
711 .is_in(actions.iter().map(|&a| Action::from(a))),
712 );
713
714 if revoke_grant_option {
715 let res = UserDefaultPrivilege::update_many()
717 .col_expr(
718 user_default_privilege::Column::WithGrantOption,
719 SimpleExpr::Value(Value::Bool(Some(false))),
720 )
721 .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
722 .exec(&txn)
723 .await?;
724 tracing::info!(
725 "revoke {count} grant option for default privileges",
726 count = res.rows_affected
727 );
728 } else {
729 let res = UserDefaultPrivilege::delete_many()
730 .filter(filter)
731 .exec(&txn)
732 .await?;
733 tracing::info!(
734 "revoke {count} default privileges",
735 count = res.rows_affected
736 );
737 }
738
739 txn.commit().await?;
740 Ok(())
741 }
742}
743
744#[cfg(test)]
745mod tests {
746 use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
747
748 use super::*;
749 use crate::manager::MetaSrvEnv;
750
751 const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
752 const TEST_ROOT_USER_ID: UserId = 1;
753
754 fn make_test_user(name: &str) -> PbUserInfo {
755 PbUserInfo {
756 name: name.to_owned(),
757 ..Default::default()
758 }
759 }
760
761 fn make_privilege(
762 object: PbObject,
763 actions: &[PbAction],
764 with_grant_option: bool,
765 ) -> PbGrantPrivilege {
766 PbGrantPrivilege {
767 object: Some(object),
768 action_with_opts: actions
769 .iter()
770 .map(|&action| PbActionWithGrantOption {
771 action: action as _,
772 with_grant_option,
773 ..Default::default()
774 })
775 .collect(),
776 }
777 }
778
779 #[tokio::test]
780 async fn test_user_and_privilege() -> MetaResult<()> {
781 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
782 mgr.create_user(make_test_user("test_user_1")).await?;
783 mgr.create_user(make_test_user("test_user_2")).await?;
784 let user_1 = mgr.get_user_by_name("test_user_1").await?;
785 let user_2 = mgr.get_user_by_name("test_user_2").await?;
786
787 assert!(
788 mgr.create_user(make_test_user("test_user_1"))
789 .await
790 .is_err(),
791 "user_1 already exists"
792 );
793 mgr.update_user(
794 PbUserInfo {
795 id: user_1.user_id as _,
796 name: "test_user_1_new".to_owned(),
797 ..Default::default()
798 },
799 &[PbUpdateField::Rename],
800 )
801 .await?;
802 let user_1 = mgr.get_user(user_1.user_id).await?;
803 assert_eq!(user_1.name, "test_user_1_new".to_owned());
804
805 let conn_with_option = make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Connect], true);
806 let create_without_option =
807 make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Create], false);
808 mgr.grant_privilege(
810 vec![user_1.user_id],
811 std::slice::from_ref(&conn_with_option),
812 TEST_ROOT_USER_ID,
813 true,
814 )
815 .await?;
816 mgr.grant_privilege(
818 vec![user_1.user_id],
819 std::slice::from_ref(&create_without_option),
820 TEST_ROOT_USER_ID,
821 false,
822 )
823 .await?;
824 mgr.grant_privilege(
826 vec![user_2.user_id],
827 std::slice::from_ref(&conn_with_option),
828 user_1.user_id,
829 true,
830 )
831 .await?;
832 mgr.grant_privilege(
833 vec![user_2.user_id],
834 std::slice::from_ref(&create_without_option),
835 user_1.user_id,
836 false,
837 )
838 .await?;
839
840 assert!(
841 mgr.drop_user(user_1.user_id).await.is_err(),
842 "user_1 can't be dropped"
843 );
844
845 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
846 assert_eq!(privilege_1.len(), 2);
847 assert!(
848 privilege_1
849 .iter()
850 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
851 && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32)
852 );
853
854 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
855 assert_eq!(privilege_2.len(), 1);
856 assert!(
857 privilege_2
858 .iter()
859 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
860 && gp.action_with_opts[0].granted_by == user_1.user_id as u32
861 && gp.action_with_opts[0].with_grant_option)
862 );
863
864 assert!(
866 mgr.revoke_privilege(
867 vec![user_1.user_id],
868 std::slice::from_ref(&conn_with_option),
869 TEST_ROOT_USER_ID,
870 user_2.user_id,
871 false,
872 false
873 )
874 .await
875 .is_err(),
876 "user_2 can't revoke for user_1"
877 );
878
879 assert!(
881 mgr.revoke_privilege(
882 vec![user_2.user_id],
883 std::slice::from_ref(&create_without_option),
884 user_1.user_id,
885 user_1.user_id,
886 false,
887 false
888 )
889 .await
890 .is_err(),
891 "user_2 don't have grant option for CREATE"
892 );
893
894 assert!(
896 mgr.revoke_privilege(
897 vec![user_1.user_id],
898 std::slice::from_ref(&conn_with_option),
899 TEST_ROOT_USER_ID,
900 TEST_ROOT_USER_ID,
901 false,
902 false
903 )
904 .await
905 .is_err(),
906 "permission deny in restrict mode, CONN granted to user_2"
907 );
908
909 mgr.revoke_privilege(
911 vec![user_1.user_id],
912 std::slice::from_ref(&create_without_option),
913 TEST_ROOT_USER_ID,
914 TEST_ROOT_USER_ID,
915 false,
916 false,
917 )
918 .await?;
919
920 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
921 assert_eq!(privilege_1.len(), 1);
922 assert!(
923 privilege_1
924 .iter()
925 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
926 && gp.action_with_opts[0].action == PbAction::Connect as i32)
927 );
928
929 mgr.revoke_privilege(
931 vec![user_1.user_id],
932 std::slice::from_ref(&conn_with_option),
933 TEST_ROOT_USER_ID,
934 TEST_ROOT_USER_ID,
935 true,
936 true,
937 )
938 .await?;
939 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
940 assert_eq!(privilege_1.len(), 1);
941 assert!(
942 privilege_1
943 .iter()
944 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
945 && gp.action_with_opts[0].action == PbAction::Connect as i32
946 && !gp.action_with_opts[0].with_grant_option)
947 );
948 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
949 assert_eq!(privilege_2.len(), 1);
950 assert!(
951 privilege_2
952 .iter()
953 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
954 && gp.action_with_opts[0].action == PbAction::Connect as i32
955 && !gp.action_with_opts[0].with_grant_option)
956 );
957
958 mgr.revoke_privilege(
960 vec![user_1.user_id],
961 std::slice::from_ref(&conn_with_option),
962 TEST_ROOT_USER_ID,
963 TEST_ROOT_USER_ID,
964 false,
965 true,
966 )
967 .await?;
968 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
969 assert!(privilege_1.is_empty());
970 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
971 assert!(privilege_2.is_empty());
972
973 mgr.drop_user(user_1.user_id).await?;
974 mgr.drop_user(user_2.user_id).await?;
975 Ok(())
976 }
977}