1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_common::id::{JobId, TableId};
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
22use risingwave_meta_model::user_privilege::Action;
23use risingwave_meta_model::{
24 AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
25 user_default_privilege, user_privilege,
26};
27use risingwave_pb::common::PbObjectType;
28use risingwave_pb::meta::subscribe_response::{
29 Info as NotificationInfo, Operation as NotificationOperation,
30};
31use risingwave_pb::user::update_user_request::PbUpdateField;
32use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
33use sea_orm::ActiveValue::Set;
34use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
35use sea_orm::{
36 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
37 QuerySelect, TransactionTrait,
38};
39
40use crate::controller::catalog::CatalogController;
41use crate::controller::utils::{
42 PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
43 ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
44 get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
45 get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
46};
47use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
48use crate::{MetaError, MetaResult};
49
50impl CatalogController {
51 pub(crate) async fn notify_users_update(
52 &self,
53 user_infos: Vec<PbUserInfo>,
54 ) -> NotificationVersion {
55 let mut version = 0;
56 for info in user_infos {
57 version = self
58 .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
59 .await;
60 }
61 version
62 }
63
64 pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
65 let inner = self.inner.write().await;
66 let txn = inner.db.begin().await?;
67 check_user_name_duplicate(&pb_user.name, &txn).await?;
68
69 let grant_privileges = pb_user.grant_privileges.clone();
70 let user: user::ActiveModel = pb_user.into();
71 let user = user.insert(&txn).await?;
72
73 if !grant_privileges.is_empty() {
74 let mut privileges = vec![];
75 for gp in &grant_privileges {
76 let id = extract_grant_obj_id(gp.get_object()?);
77 for action_with_opt in &gp.action_with_opts {
78 privileges.push(user_privilege::ActiveModel {
79 user_id: Set(user.user_id),
80 oid: Set(id),
81 granted_by: Set(action_with_opt.granted_by as _),
82 action: Set(action_with_opt.get_action()?.into()),
83 with_grant_option: Set(action_with_opt.with_grant_option),
84 ..Default::default()
85 });
86 }
87 }
88 UserPrivilege::insert_many(privileges).exec(&txn).await?;
89 }
90 txn.commit().await?;
91
92 let mut user_info: PbUserInfo = user.into();
93 user_info.grant_privileges = grant_privileges;
94 let version = self
95 .notify_frontend(
96 NotificationOperation::Add,
97 NotificationInfo::User(user_info),
98 )
99 .await;
100
101 Ok(version)
102 }
103
104 pub async fn update_user(
105 &self,
106 update_user: PbUserInfo,
107 update_fields: &[PbUpdateField],
108 ) -> MetaResult<NotificationVersion> {
109 let inner = self.inner.write().await;
110 let rename_flag = update_fields.contains(&PbUpdateField::Rename);
111 if rename_flag {
112 check_user_name_duplicate(&update_user.name, &inner.db).await?;
113 }
114
115 let user = User::find_by_id(update_user.id as UserId)
116 .one(&inner.db)
117 .await?
118 .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
119 let mut user = user.into_active_model();
120 update_fields.iter().for_each(|&field| match field {
121 PbUpdateField::Unspecified => unreachable!(),
122 PbUpdateField::Super => user.is_super = Set(update_user.is_super),
123 PbUpdateField::Login => user.can_login = Set(update_user.can_login),
124 PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
125 PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
126 PbUpdateField::AuthInfo => {
127 user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
128 }
129 PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
130 PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
131 });
132
133 let user = user.update(&inner.db).await?;
134 let mut user_info: PbUserInfo = user.into();
135 user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
136 let version = self
137 .notify_frontend(
138 NotificationOperation::Update,
139 NotificationInfo::User(user_info),
140 )
141 .await;
142
143 Ok(version)
144 }
145
146 #[cfg(test)]
147 pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
148 let inner = self.inner.read().await;
149 let user = User::find_by_id(id)
150 .one(&inner.db)
151 .await?
152 .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
153 Ok(user)
154 }
155
156 #[cfg(test)]
157 pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
158 let inner = self.inner.read().await;
159 let user = User::find()
160 .filter(user::Column::Name.eq(name))
161 .one(&inner.db)
162 .await?
163 .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
164 Ok(user)
165 }
166
167 pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
168 let inner = self.inner.write().await;
169 let txn = inner.db.begin().await?;
170 let user = User::find_by_id(user_id)
171 .one(&txn)
172 .await?
173 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
174 if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
175 return Err(MetaError::permission_denied(format!(
176 "drop default super user {} is not allowed",
177 user.name
178 )));
179 }
180
181 let count = Object::find()
183 .filter(object::Column::OwnerId.eq(user_id))
184 .count(&txn)
185 .await?;
186 if count != 0 {
187 return Err(MetaError::permission_denied(format!(
188 "drop user {} is not allowed, because it owns {} objects",
189 user.name, count
190 )));
191 }
192
193 let count = UserPrivilege::find()
195 .filter(user_privilege::Column::GrantedBy.eq(user_id))
196 .count(&txn)
197 .await?;
198 if count != 0 {
199 return Err(MetaError::permission_denied(format!(
200 "drop user {} is not allowed, because it granted {} privileges to others",
201 user.name, count
202 )));
203 }
204
205 let res = User::delete_by_id(user_id).exec(&txn).await?;
206 if res.rows_affected != 1 {
207 return Err(MetaError::catalog_id_not_found("user", user_id));
208 }
209 txn.commit().await?;
210
211 let version = self
212 .notify_frontend(
213 NotificationOperation::Delete,
214 NotificationInfo::User(PbUserInfo {
215 id: user_id as _,
216 ..Default::default()
217 }),
218 )
219 .await;
220
221 Ok(version)
222 }
223
224 pub async fn grant_privilege(
225 &self,
226 user_ids: Vec<UserId>,
227 new_grant_privileges: &[PbGrantPrivilege],
228 grantor: UserId,
229 with_grant_option: bool,
230 ) -> MetaResult<NotificationVersion> {
231 let inner = self.inner.write().await;
232 let txn = inner.db.begin().await?;
233 for user_id in &user_ids {
234 ensure_user_id(*user_id, &txn).await?;
235 }
236
237 let mut privileges = vec![];
238 for gp in new_grant_privileges {
239 let id = extract_grant_obj_id(gp.get_object()?);
240 let internal_table_ids = get_internal_tables_by_id(JobId::new(id as _), &txn).await?;
241 let index_state_table_ids =
242 get_index_state_tables_by_table_id(TableId::new(id as _), &txn).await?;
243 for action_with_opt in &gp.action_with_opts {
244 let action = action_with_opt.get_action()?.into();
245 privileges.push(user_privilege::ActiveModel {
246 oid: Set(id),
247 granted_by: Set(grantor),
248 action: Set(action),
249 with_grant_option: Set(with_grant_option),
250 ..Default::default()
251 });
252 if action == Action::Select {
253 privileges.extend(
254 internal_table_ids
255 .iter()
256 .chain(index_state_table_ids.iter())
257 .map(|&tid| user_privilege::ActiveModel {
258 oid: Set(tid.as_raw_id() as _),
259 granted_by: Set(grantor),
260 action: Set(Action::Select),
261 with_grant_option: Set(with_grant_option),
262 ..Default::default()
263 }),
264 );
265 }
266 }
267 }
268
269 let user = User::find_by_id(grantor)
271 .one(&txn)
272 .await?
273 .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
274 let mut filtered_privileges = vec![];
275 if !user.is_super {
276 for mut privilege in privileges {
277 if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
278 filtered_privileges.push(privilege);
279 continue;
280 }
281 let filter = user_privilege::Column::UserId
282 .eq(grantor)
283 .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
284 .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
285 .and(user_privilege::Column::WithGrantOption.eq(true));
286 let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
287 .select_only()
288 .column(user_privilege::Column::Id)
289 .filter(filter)
290 .into_tuple()
291 .one(&txn)
292 .await?;
293 let Some(privilege_id) = privilege_id else {
294 tracing::warn!(
295 "user {} don't have privilege {:?} or grant option",
296 grantor,
297 privilege.action
298 );
299 continue;
300 };
301 privilege.dependent_id = Set(Some(privilege_id));
302 filtered_privileges.push(privilege);
303 }
304 } else {
305 filtered_privileges = privileges;
306 }
307
308 let user_privileges = user_ids
310 .iter()
311 .flat_map(|user_id| {
312 filtered_privileges.iter().map(|p| {
313 let mut p = p.clone();
314 p.user_id = Set(*user_id);
315 p
316 })
317 })
318 .collect_vec();
319 for privilege in user_privileges {
320 let mut on_conflict = OnConflict::columns([
321 user_privilege::Column::UserId,
322 user_privilege::Column::Oid,
323 user_privilege::Column::Action,
324 user_privilege::Column::GrantedBy,
325 ]);
326 if *privilege.with_grant_option.as_ref() {
327 on_conflict.update_column(user_privilege::Column::WithGrantOption);
328 } else {
329 on_conflict.update_column(user_privilege::Column::UserId);
331 }
332
333 UserPrivilege::insert(privilege)
334 .on_conflict(on_conflict)
335 .do_nothing()
336 .exec(&txn)
337 .await?;
338 }
339
340 let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
341
342 txn.commit().await?;
343
344 let version = self.notify_users_update(user_infos).await;
345 Ok(version)
346 }
347
348 pub async fn revoke_privilege(
349 &self,
350 user_ids: Vec<UserId>,
351 revoke_grant_privileges: &[PbGrantPrivilege],
352 granted_by: UserId,
353 revoke_by: UserId,
354 revoke_grant_option: bool,
355 cascade: bool,
356 ) -> MetaResult<NotificationVersion> {
357 let inner = self.inner.write().await;
358 let txn = inner.db.begin().await?;
359 for user_id in &user_ids {
360 ensure_user_id(*user_id, &txn).await?;
361 }
362 let revoke_user = User::find_by_id(revoke_by)
364 .one(&txn)
365 .await?
366 .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
367
368 if !revoke_user.is_super && granted_by != revoke_by {
370 let granted_user_name: String = User::find_by_id(granted_by)
371 .select_only()
372 .column(user::Column::Name)
373 .into_tuple()
374 .one(&txn)
375 .await?
376 .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
377 return Err(MetaError::permission_denied(format!(
378 "user {} is not super, can't revoke privileges for {}",
379 revoke_user.name, granted_user_name
380 )));
381 }
382
383 let mut revoke_items = HashMap::new();
384 for privilege in revoke_grant_privileges {
385 let obj = extract_grant_obj_id(privilege.get_object()?);
386 let internal_table_ids = get_internal_tables_by_id(JobId::new(obj as _), &txn).await?;
387 let index_state_table_ids =
388 get_index_state_tables_by_table_id(TableId::new(obj as _), &txn).await?;
389 let mut include_select = false;
390 let actions = privilege
391 .action_with_opts
392 .iter()
393 .map(|ao| {
394 let action = Action::from(ao.get_action().unwrap());
395 if action == Action::Select {
396 include_select = true;
397 }
398 action
399 })
400 .collect_vec();
401 revoke_items.insert(obj, actions);
402 if include_select {
403 revoke_items.extend(
404 internal_table_ids
405 .iter()
406 .chain(index_state_table_ids.iter())
407 .map(|&tid| (tid.as_raw_id() as _, vec![Action::Select])),
408 );
409 }
410 }
411
412 let filter = if !revoke_user.is_super {
413 for (obj, actions) in &revoke_items {
415 if revoke_by == get_object_owner(*obj, &txn).await? {
416 continue;
417 }
418 let owned_actions: HashSet<Action> = UserPrivilege::find()
419 .select_only()
420 .column(user_privilege::Column::Action)
421 .filter(
422 user_privilege::Column::UserId
423 .eq(granted_by)
424 .and(user_privilege::Column::Oid.eq(*obj))
425 .and(user_privilege::Column::WithGrantOption.eq(true)),
426 )
427 .into_tuple::<Action>()
428 .all(&txn)
429 .await?
430 .into_iter()
431 .collect();
432 if actions.iter().any(|ac| !owned_actions.contains(ac)) {
433 return Err(MetaError::permission_denied(format!(
434 "user {} don't have privileges {:?} or grant option",
435 revoke_user.name, actions,
436 )));
437 }
438 }
439
440 user_privilege::Column::GrantedBy
441 .eq(granted_by)
442 .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
443 } else {
444 user_privilege::Column::UserId.is_in(user_ids.clone())
445 };
446 let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
447 for (obj, actions) in revoke_items {
448 let filter = filter
449 .clone()
450 .and(user_privilege::Column::Oid.eq(obj))
451 .and(user_privilege::Column::Action.is_in(actions));
452 root_user_privileges.extend(
453 UserPrivilege::find()
454 .select_only()
455 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
456 .filter(filter)
457 .into_partial_model()
458 .all(&txn)
459 .await?,
460 );
461 }
462 if root_user_privileges.is_empty() {
463 tracing::warn!("no privilege to revoke, ignore it");
464 return Ok(IGNORED_NOTIFICATION_VERSION);
465 }
466
467 let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
469 let (all_privilege_ids, to_update_user_ids) = if !cascade {
470 ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
471 (
472 root_privilege_ids.clone(),
473 root_user_privileges
474 .iter()
475 .map(|ur| ur.user_id)
476 .collect_vec(),
477 )
478 } else {
479 let all_user_privileges =
480 get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
481 (
482 all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
483 all_user_privileges
484 .iter()
485 .map(|ur| ur.user_id)
486 .collect_vec(),
487 )
488 };
489
490 if revoke_grant_option {
491 UserPrivilege::update_many()
492 .col_expr(
493 user_privilege::Column::WithGrantOption,
494 SimpleExpr::Value(Value::Bool(Some(false))),
495 )
496 .filter(
497 user_privilege::Column::Id
498 .is_in(all_privilege_ids)
499 .and(user_privilege::Column::WithGrantOption.eq(true)),
500 )
501 .exec(&txn)
502 .await?;
503 } else {
504 UserPrivilege::delete_many()
506 .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
507 .exec(&txn)
508 .await?;
509 }
510
511 let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
512
513 txn.commit().await?;
514
515 let version = self.notify_users_update(user_infos).await;
516 Ok(version)
517 }
518
519 pub async fn grant_default_privileges(
520 &self,
521 user_ids: Vec<UserId>,
522 database_id: DatabaseId,
523 schema_ids: Vec<SchemaId>,
524 grantor: UserId,
525 actions: Vec<PbAction>,
526 object_type: PbObjectType,
527 grantees: Vec<UserId>,
528 with_grant_option: bool,
529 ) -> MetaResult<()> {
530 tracing::debug!(
531 ?user_ids,
532 %database_id,
533 ?schema_ids,
534 ?actions,
535 ?object_type,
536 ?grantees,
537 with_grant_option,
538 "grant default privileges",
539 );
540 let inner = self.inner.write().await;
541 let txn = inner.db.begin().await?;
542 for user_id in &user_ids {
543 ensure_user_id(*user_id, &txn).await?;
544 }
545 ensure_object_id(ObjectType::Database, database_id.as_raw_id() as _, &txn).await?;
546 for schema_id in &schema_ids {
547 ensure_object_id(ObjectType::Schema, schema_id.as_raw_id() as _, &txn).await?;
548 }
549 for grantee in &grantees {
550 ensure_user_id(*grantee, &txn).await?;
551 }
552 if object_type == PbObjectType::Schema {
553 assert!(
554 schema_ids.is_empty(),
555 "schema_ids should be empty when object_type is Schema"
556 );
557
558 let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
561 let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
562 .select_only()
563 .columns([
564 user_default_privilege::Column::Id,
565 user_default_privilege::Column::UserId,
566 user_default_privilege::Column::Grantee,
567 user_default_privilege::Column::Action,
568 ])
569 .filter(
570 user_default_privilege::Column::DatabaseId
571 .eq(database_id)
572 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
573 .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
574 .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
575 .and(user_default_privilege::Column::Action.is_in(actions.clone())),
576 )
577 .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
578 .all(&txn)
579 .await?
580 .into_iter()
581 .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
582 .collect();
583
584 for user_id in user_ids {
585 for grantee in &grantees {
586 for action in &actions {
587 if let Some(existing_id) =
588 existing_default_privileges.get(&(user_id, *grantee, *action))
589 && with_grant_option
590 {
591 UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
593 id: Set(*existing_id),
594 with_grant_option: Set(true),
595 granted_by: Set(grantor as _),
596 ..Default::default()
597 })
598 .exec(&txn)
599 .await?;
600 } else {
601 UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
602 id: Default::default(),
603 database_id: Set(database_id),
604 schema_id: Set(None),
605 object_type: Set(ObjectType::Schema),
606 for_materialized_view: Set(false),
607 user_id: Set(user_id),
608 grantee: Set(*grantee),
609 granted_by: Set(grantor as _),
610 action: Set(*action),
611 with_grant_option: Set(with_grant_option),
612 })
613 .exec(&txn)
614 .await?;
615 }
616 }
617 }
618 }
619 } else {
620 let mut default_privileges = vec![];
621 for user_id in user_ids {
622 for grantee in &grantees {
623 for action in &actions {
624 if schema_ids.is_empty() {
625 default_privileges.push(user_default_privilege::ActiveModel {
626 id: Default::default(),
627 database_id: Set(database_id),
628 schema_id: Set(None),
629 object_type: Set(object_type.into()),
630 for_materialized_view: Set(object_type == PbObjectType::Mview),
631 user_id: Set(user_id),
632 grantee: Set(*grantee),
633 granted_by: Set(grantor as _),
634 action: Set((*action).into()),
635 with_grant_option: Set(with_grant_option),
636 });
637 continue;
638 }
639 for schema_id in &schema_ids {
640 default_privileges.push(user_default_privilege::ActiveModel {
641 id: Default::default(),
642 database_id: Set(database_id),
643 schema_id: Set(Some(*schema_id)),
644 object_type: Set(object_type.into()),
645 for_materialized_view: Set(object_type == PbObjectType::Mview),
646 user_id: Set(user_id),
647 grantee: Set(*grantee),
648 granted_by: Set(grantor as _),
649 action: Set((*action).into()),
650 with_grant_option: Set(with_grant_option),
651 });
652 }
653 }
654 }
655 }
656
657 let mut on_conflict = OnConflict::columns([
658 user_default_privilege::Column::UserId,
659 user_default_privilege::Column::DatabaseId,
660 user_default_privilege::Column::SchemaId,
661 user_default_privilege::Column::ObjectType,
662 user_default_privilege::Column::ForMaterializedView,
663 user_default_privilege::Column::Grantee,
664 user_default_privilege::Column::Action,
665 ]);
666 if with_grant_option {
667 on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
668 } else {
669 on_conflict.update_column(user_default_privilege::Column::UserId);
671 }
672 UserDefaultPrivilege::insert_many(default_privileges)
673 .on_conflict(on_conflict)
674 .do_nothing()
675 .exec(&txn)
676 .await?;
677 }
678
679 txn.commit().await?;
680 Ok(())
681 }
682
683 pub async fn revoke_default_privileges(
684 &self,
685 user_ids: Vec<UserId>,
686 database_id: DatabaseId,
687 schema_ids: Vec<SchemaId>,
688 actions: Vec<PbAction>,
689 object_type: PbObjectType,
690 grantees: Vec<UserId>,
691 revoke_grant_option: bool,
692 ) -> MetaResult<()> {
693 let inner = self.inner.write().await;
694 let txn = inner.db.begin().await?;
695 for user_id in &user_ids {
696 ensure_user_id(*user_id, &txn).await?;
697 }
698
699 let schema_filter = if schema_ids.is_empty() {
700 user_default_privilege::Column::SchemaId.is_null()
701 } else {
702 user_default_privilege::Column::SchemaId.is_in(schema_ids)
703 };
704 let filter = user_default_privilege::Column::DatabaseId
705 .eq(database_id)
706 .and(schema_filter)
707 .and(user_default_privilege::Column::UserId.is_in(user_ids))
708 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
709 .and(user_default_privilege::Column::Grantee.is_in(grantees))
710 .and(
711 user_default_privilege::Column::Action
712 .is_in(actions.iter().map(|&a| Action::from(a))),
713 );
714
715 if revoke_grant_option {
716 let res = UserDefaultPrivilege::update_many()
718 .col_expr(
719 user_default_privilege::Column::WithGrantOption,
720 SimpleExpr::Value(Value::Bool(Some(false))),
721 )
722 .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
723 .exec(&txn)
724 .await?;
725 tracing::info!(
726 "revoke {count} grant option for default privileges",
727 count = res.rows_affected
728 );
729 } else {
730 let res = UserDefaultPrivilege::delete_many()
731 .filter(filter)
732 .exec(&txn)
733 .await?;
734 tracing::info!(
735 "revoke {count} default privileges",
736 count = res.rows_affected
737 );
738 }
739
740 txn.commit().await?;
741 Ok(())
742 }
743}
744
745#[cfg(test)]
746mod tests {
747 use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
748
749 use super::*;
750 use crate::manager::MetaSrvEnv;
751
752 const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
753 const TEST_ROOT_USER_ID: UserId = 1;
754
755 fn make_test_user(name: &str) -> PbUserInfo {
756 PbUserInfo {
757 name: name.to_owned(),
758 ..Default::default()
759 }
760 }
761
762 fn make_privilege(
763 object: PbObject,
764 actions: &[PbAction],
765 with_grant_option: bool,
766 ) -> PbGrantPrivilege {
767 PbGrantPrivilege {
768 object: Some(object),
769 action_with_opts: actions
770 .iter()
771 .map(|&action| PbActionWithGrantOption {
772 action: action as _,
773 with_grant_option,
774 ..Default::default()
775 })
776 .collect(),
777 }
778 }
779
780 #[tokio::test]
781 async fn test_user_and_privilege() -> MetaResult<()> {
782 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
783 mgr.create_user(make_test_user("test_user_1")).await?;
784 mgr.create_user(make_test_user("test_user_2")).await?;
785 let user_1 = mgr.get_user_by_name("test_user_1").await?;
786 let user_2 = mgr.get_user_by_name("test_user_2").await?;
787
788 assert!(
789 mgr.create_user(make_test_user("test_user_1"))
790 .await
791 .is_err(),
792 "user_1 already exists"
793 );
794 mgr.update_user(
795 PbUserInfo {
796 id: user_1.user_id as _,
797 name: "test_user_1_new".to_owned(),
798 ..Default::default()
799 },
800 &[PbUpdateField::Rename],
801 )
802 .await?;
803 let user_1 = mgr.get_user(user_1.user_id).await?;
804 assert_eq!(user_1.name, "test_user_1_new".to_owned());
805
806 let conn_with_option = make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Connect], true);
807 let create_without_option =
808 make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Create], false);
809 mgr.grant_privilege(
811 vec![user_1.user_id],
812 std::slice::from_ref(&conn_with_option),
813 TEST_ROOT_USER_ID,
814 true,
815 )
816 .await?;
817 mgr.grant_privilege(
819 vec![user_1.user_id],
820 std::slice::from_ref(&create_without_option),
821 TEST_ROOT_USER_ID,
822 false,
823 )
824 .await?;
825 mgr.grant_privilege(
827 vec![user_2.user_id],
828 std::slice::from_ref(&conn_with_option),
829 user_1.user_id,
830 true,
831 )
832 .await?;
833 mgr.grant_privilege(
834 vec![user_2.user_id],
835 std::slice::from_ref(&create_without_option),
836 user_1.user_id,
837 false,
838 )
839 .await?;
840
841 assert!(
842 mgr.drop_user(user_1.user_id).await.is_err(),
843 "user_1 can't be dropped"
844 );
845
846 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
847 assert_eq!(privilege_1.len(), 2);
848 assert!(
849 privilege_1
850 .iter()
851 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
852 && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32)
853 );
854
855 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
856 assert_eq!(privilege_2.len(), 1);
857 assert!(
858 privilege_2
859 .iter()
860 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
861 && gp.action_with_opts[0].granted_by == user_1.user_id as u32
862 && gp.action_with_opts[0].with_grant_option)
863 );
864
865 assert!(
867 mgr.revoke_privilege(
868 vec![user_1.user_id],
869 std::slice::from_ref(&conn_with_option),
870 TEST_ROOT_USER_ID,
871 user_2.user_id,
872 false,
873 false
874 )
875 .await
876 .is_err(),
877 "user_2 can't revoke for user_1"
878 );
879
880 assert!(
882 mgr.revoke_privilege(
883 vec![user_2.user_id],
884 std::slice::from_ref(&create_without_option),
885 user_1.user_id,
886 user_1.user_id,
887 false,
888 false
889 )
890 .await
891 .is_err(),
892 "user_2 don't have grant option for CREATE"
893 );
894
895 assert!(
897 mgr.revoke_privilege(
898 vec![user_1.user_id],
899 std::slice::from_ref(&conn_with_option),
900 TEST_ROOT_USER_ID,
901 TEST_ROOT_USER_ID,
902 false,
903 false
904 )
905 .await
906 .is_err(),
907 "permission deny in restrict mode, CONN granted to user_2"
908 );
909
910 mgr.revoke_privilege(
912 vec![user_1.user_id],
913 std::slice::from_ref(&create_without_option),
914 TEST_ROOT_USER_ID,
915 TEST_ROOT_USER_ID,
916 false,
917 false,
918 )
919 .await?;
920
921 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
922 assert_eq!(privilege_1.len(), 1);
923 assert!(
924 privilege_1
925 .iter()
926 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
927 && gp.action_with_opts[0].action == PbAction::Connect as i32)
928 );
929
930 mgr.revoke_privilege(
932 vec![user_1.user_id],
933 std::slice::from_ref(&conn_with_option),
934 TEST_ROOT_USER_ID,
935 TEST_ROOT_USER_ID,
936 true,
937 true,
938 )
939 .await?;
940 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
941 assert_eq!(privilege_1.len(), 1);
942 assert!(
943 privilege_1
944 .iter()
945 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
946 && gp.action_with_opts[0].action == PbAction::Connect as i32
947 && !gp.action_with_opts[0].with_grant_option)
948 );
949 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
950 assert_eq!(privilege_2.len(), 1);
951 assert!(
952 privilege_2
953 .iter()
954 .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
955 && gp.action_with_opts[0].action == PbAction::Connect as i32
956 && !gp.action_with_opts[0].with_grant_option)
957 );
958
959 mgr.revoke_privilege(
961 vec![user_1.user_id],
962 std::slice::from_ref(&conn_with_option),
963 TEST_ROOT_USER_ID,
964 TEST_ROOT_USER_ID,
965 false,
966 true,
967 )
968 .await?;
969 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
970 assert!(privilege_1.is_empty());
971 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
972 assert!(privilege_2.is_empty());
973
974 mgr.drop_user(user_1.user_id).await?;
975 mgr.drop_user(user_2.user_id).await?;
976 Ok(())
977 }
978}