1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23 AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24 user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28 Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36 QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41 PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42 ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43 get_iceberg_related_object_ids, get_index_state_tables_by_table_id, get_internal_tables_by_id,
44 get_object_owner, get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50 pub(crate) async fn notify_users_update(
51 &self,
52 user_infos: Vec<PbUserInfo>,
53 ) -> NotificationVersion {
54 let mut version = 0;
55 for info in user_infos {
56 version = self
57 .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58 .await;
59 }
60 version
61 }
62
63 pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64 let inner = self.inner.write().await;
65 let txn = inner.db.begin().await?;
66 check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68 let grant_privileges = pb_user.grant_privileges.clone();
69 let user: user::ActiveModel = pb_user.into();
70 let user = user.insert(&txn).await?;
71
72 if !grant_privileges.is_empty() {
73 let mut privileges = vec![];
74 for gp in &grant_privileges {
75 let id = extract_grant_obj_id(gp.get_object()?);
76 for action_with_opt in &gp.action_with_opts {
77 privileges.push(user_privilege::ActiveModel {
78 user_id: Set(user.user_id),
79 oid: Set(id),
80 granted_by: Set(action_with_opt.granted_by as _),
81 action: Set(action_with_opt.get_action()?.into()),
82 with_grant_option: Set(action_with_opt.with_grant_option),
83 ..Default::default()
84 });
85 }
86 }
87 UserPrivilege::insert_many(privileges).exec(&txn).await?;
88 }
89 txn.commit().await?;
90
91 let mut user_info: PbUserInfo = user.into();
92 user_info.grant_privileges = grant_privileges;
93 let version = self
94 .notify_frontend(
95 NotificationOperation::Add,
96 NotificationInfo::User(user_info),
97 )
98 .await;
99
100 Ok(version)
101 }
102
103 pub async fn update_user(
104 &self,
105 update_user: PbUserInfo,
106 update_fields: &[PbUpdateField],
107 ) -> MetaResult<NotificationVersion> {
108 let inner = self.inner.write().await;
109 let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110 if rename_flag {
111 check_user_name_duplicate(&update_user.name, &inner.db).await?;
112 }
113
114 let user = User::find_by_id(update_user.id as UserId)
115 .one(&inner.db)
116 .await?
117 .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118 let mut user = user.into_active_model();
119 update_fields.iter().for_each(|&field| match field {
120 PbUpdateField::Unspecified => unreachable!(),
121 PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122 PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123 PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124 PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125 PbUpdateField::AuthInfo => {
126 user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127 }
128 PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129 PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
130 });
131
132 let user = user.update(&inner.db).await?;
133 let mut user_info: PbUserInfo = user.into();
134 user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
135 let version = self
136 .notify_frontend(
137 NotificationOperation::Update,
138 NotificationInfo::User(user_info),
139 )
140 .await;
141
142 Ok(version)
143 }
144
145 #[cfg(test)]
146 pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
147 let inner = self.inner.read().await;
148 let user = User::find_by_id(id)
149 .one(&inner.db)
150 .await?
151 .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
152 Ok(user)
153 }
154
155 #[cfg(test)]
156 pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
157 let inner = self.inner.read().await;
158 let user = User::find()
159 .filter(user::Column::Name.eq(name))
160 .one(&inner.db)
161 .await?
162 .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
163 Ok(user)
164 }
165
166 pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
167 let inner = self.inner.write().await;
168 let txn = inner.db.begin().await?;
169 let user = User::find_by_id(user_id)
170 .one(&txn)
171 .await?
172 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
173 if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
174 return Err(MetaError::permission_denied(format!(
175 "drop default super user {} is not allowed",
176 user.name
177 )));
178 }
179
180 let count = Object::find()
182 .filter(object::Column::OwnerId.eq(user_id))
183 .count(&txn)
184 .await?;
185 if count != 0 {
186 return Err(MetaError::permission_denied(format!(
187 "drop user {} is not allowed, because it owns {} objects",
188 user.name, count
189 )));
190 }
191
192 let count = UserPrivilege::find()
194 .filter(user_privilege::Column::GrantedBy.eq(user_id))
195 .count(&txn)
196 .await?;
197 if count != 0 {
198 return Err(MetaError::permission_denied(format!(
199 "drop user {} is not allowed, because it granted {} privileges to others",
200 user.name, count
201 )));
202 }
203
204 let res = User::delete_by_id(user_id).exec(&txn).await?;
205 if res.rows_affected != 1 {
206 return Err(MetaError::catalog_id_not_found("user", user_id));
207 }
208 txn.commit().await?;
209
210 let version = self
211 .notify_frontend(
212 NotificationOperation::Delete,
213 NotificationInfo::User(PbUserInfo {
214 id: user_id as _,
215 ..Default::default()
216 }),
217 )
218 .await;
219
220 Ok(version)
221 }
222
223 pub async fn grant_privilege(
224 &self,
225 user_ids: Vec<UserId>,
226 new_grant_privileges: &[PbGrantPrivilege],
227 grantor: UserId,
228 with_grant_option: bool,
229 ) -> MetaResult<NotificationVersion> {
230 let inner = self.inner.write().await;
231 let txn = inner.db.begin().await?;
232 for user_id in &user_ids {
233 ensure_user_id(*user_id, &txn).await?;
234 }
235
236 let mut privileges = vec![];
237 for gp in new_grant_privileges {
238 let id = extract_grant_obj_id(gp.get_object()?);
239 let internal_table_ids = get_internal_tables_by_id(id.as_job_id(), &txn).await?;
240 let index_state_table_ids =
241 get_index_state_tables_by_table_id(id.as_table_id(), &txn).await?;
242 for action_with_opt in &gp.action_with_opts {
243 let action = action_with_opt.get_action()?.into();
244 privileges.push(user_privilege::ActiveModel {
245 oid: Set(id),
246 granted_by: Set(grantor),
247 action: Set(action),
248 with_grant_option: Set(with_grant_option),
249 ..Default::default()
250 });
251 if action == Action::Select {
252 privileges.extend(
253 internal_table_ids
254 .iter()
255 .chain(index_state_table_ids.iter())
256 .map(|&tid| user_privilege::ActiveModel {
257 oid: Set(tid.as_object_id()),
258 granted_by: Set(grantor),
259 action: Set(Action::Select),
260 with_grant_option: Set(with_grant_option),
261 ..Default::default()
262 }),
263 );
264 let iceberg_privilege_object_ids =
265 get_iceberg_related_object_ids(id, &txn).await?;
266 privileges.extend(iceberg_privilege_object_ids.iter().map(
267 |&iceberg_object_id| user_privilege::ActiveModel {
268 oid: Set(iceberg_object_id),
269 granted_by: Set(grantor),
270 action: Set(action),
271 with_grant_option: Set(with_grant_option),
272 ..Default::default()
273 },
274 ));
275 }
276 }
277 }
278
279 let user = User::find_by_id(grantor)
281 .one(&txn)
282 .await?
283 .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
284 let mut filtered_privileges = vec![];
285 if !user.is_super {
286 for mut privilege in privileges {
287 if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
288 filtered_privileges.push(privilege);
289 continue;
290 }
291 let filter = user_privilege::Column::UserId
292 .eq(grantor)
293 .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
294 .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
295 .and(user_privilege::Column::WithGrantOption.eq(true));
296 let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
297 .select_only()
298 .column(user_privilege::Column::Id)
299 .filter(filter)
300 .into_tuple()
301 .one(&txn)
302 .await?;
303 let Some(privilege_id) = privilege_id else {
304 tracing::warn!(
305 "user {} don't have privilege {:?} or grant option",
306 grantor,
307 privilege.action
308 );
309 continue;
310 };
311 privilege.dependent_id = Set(Some(privilege_id));
312 filtered_privileges.push(privilege);
313 }
314 } else {
315 filtered_privileges = privileges;
316 }
317
318 let user_privileges = user_ids
320 .iter()
321 .flat_map(|user_id| {
322 filtered_privileges.iter().map(|p| {
323 let mut p = p.clone();
324 p.user_id = Set(*user_id);
325 p
326 })
327 })
328 .collect_vec();
329 for privilege in user_privileges {
330 let mut on_conflict = OnConflict::columns([
331 user_privilege::Column::UserId,
332 user_privilege::Column::Oid,
333 user_privilege::Column::Action,
334 user_privilege::Column::GrantedBy,
335 ]);
336 if *privilege.with_grant_option.as_ref() {
337 on_conflict.update_column(user_privilege::Column::WithGrantOption);
338 } else {
339 on_conflict.update_column(user_privilege::Column::UserId);
341 }
342
343 UserPrivilege::insert(privilege)
344 .on_conflict(on_conflict)
345 .do_nothing()
346 .exec(&txn)
347 .await?;
348 }
349
350 let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
351
352 txn.commit().await?;
353
354 let version = self.notify_users_update(user_infos).await;
355 Ok(version)
356 }
357
358 pub async fn revoke_privilege(
359 &self,
360 user_ids: Vec<UserId>,
361 revoke_grant_privileges: &[PbGrantPrivilege],
362 granted_by: UserId,
363 revoke_by: UserId,
364 revoke_grant_option: bool,
365 cascade: bool,
366 ) -> MetaResult<NotificationVersion> {
367 let inner = self.inner.write().await;
368 let txn = inner.db.begin().await?;
369 for user_id in &user_ids {
370 ensure_user_id(*user_id, &txn).await?;
371 }
372 let revoke_user = User::find_by_id(revoke_by)
374 .one(&txn)
375 .await?
376 .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
377
378 if !revoke_user.is_super && granted_by != revoke_by {
380 let granted_user_name: String = User::find_by_id(granted_by)
381 .select_only()
382 .column(user::Column::Name)
383 .into_tuple()
384 .one(&txn)
385 .await?
386 .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
387 return Err(MetaError::permission_denied(format!(
388 "user {} is not super, can't revoke privileges for {}",
389 revoke_user.name, granted_user_name
390 )));
391 }
392
393 let mut revoke_items = HashMap::new();
394 for privilege in revoke_grant_privileges {
395 let obj = extract_grant_obj_id(privilege.get_object()?);
396 let internal_table_ids = get_internal_tables_by_id(obj.as_job_id(), &txn).await?;
397 let index_state_table_ids =
398 get_index_state_tables_by_table_id(obj.as_table_id(), &txn).await?;
399 let mut include_select = false;
400 let actions = privilege
401 .action_with_opts
402 .iter()
403 .map(|ao| {
404 let action = Action::from(ao.get_action().unwrap());
405 if action == Action::Select {
406 include_select = true;
407 }
408 action
409 })
410 .collect_vec();
411 revoke_items.insert(obj, actions);
412 if include_select {
413 revoke_items.extend(
414 internal_table_ids
415 .iter()
416 .chain(index_state_table_ids.iter())
417 .map(|&tid| (tid.as_object_id(), vec![Action::Select])),
418 );
419 let iceberg_privilege_object_ids =
420 get_iceberg_related_object_ids(obj, &txn).await?;
421 if !iceberg_privilege_object_ids.is_empty() {
422 revoke_items.extend(
423 iceberg_privilege_object_ids
424 .into_iter()
425 .map(|iceberg_object_id| (iceberg_object_id, vec![Action::Select])),
426 );
427 }
428 }
429 }
430
431 let filter = if !revoke_user.is_super {
432 for (obj, actions) in &revoke_items {
434 if revoke_by == get_object_owner(*obj, &txn).await? {
435 continue;
436 }
437 let owned_actions: HashSet<Action> = UserPrivilege::find()
438 .select_only()
439 .column(user_privilege::Column::Action)
440 .filter(
441 user_privilege::Column::UserId
442 .eq(granted_by)
443 .and(user_privilege::Column::Oid.eq(*obj))
444 .and(user_privilege::Column::WithGrantOption.eq(true)),
445 )
446 .into_tuple::<Action>()
447 .all(&txn)
448 .await?
449 .into_iter()
450 .collect();
451 if actions.iter().any(|ac| !owned_actions.contains(ac)) {
452 return Err(MetaError::permission_denied(format!(
453 "user {} don't have privileges {:?} or grant option",
454 revoke_user.name, actions,
455 )));
456 }
457 }
458
459 user_privilege::Column::GrantedBy
460 .eq(granted_by)
461 .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
462 } else {
463 user_privilege::Column::UserId.is_in(user_ids.clone())
464 };
465 let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
466 for (obj, actions) in revoke_items {
467 let filter = filter
468 .clone()
469 .and(user_privilege::Column::Oid.eq(obj))
470 .and(user_privilege::Column::Action.is_in(actions));
471 root_user_privileges.extend(
472 UserPrivilege::find()
473 .select_only()
474 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
475 .filter(filter)
476 .into_partial_model()
477 .all(&txn)
478 .await?,
479 );
480 }
481 if root_user_privileges.is_empty() {
482 tracing::warn!("no privilege to revoke, ignore it");
483 return Ok(IGNORED_NOTIFICATION_VERSION);
484 }
485
486 let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
488 let (all_privilege_ids, to_update_user_ids) = if !cascade {
489 ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
490 (
491 root_privilege_ids.clone(),
492 root_user_privileges
493 .iter()
494 .map(|ur| ur.user_id)
495 .collect_vec(),
496 )
497 } else {
498 let all_user_privileges =
499 get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
500 (
501 all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
502 all_user_privileges
503 .iter()
504 .map(|ur| ur.user_id)
505 .collect_vec(),
506 )
507 };
508
509 if revoke_grant_option {
510 UserPrivilege::update_many()
511 .col_expr(
512 user_privilege::Column::WithGrantOption,
513 SimpleExpr::Value(Value::Bool(Some(false))),
514 )
515 .filter(
516 user_privilege::Column::Id
517 .is_in(all_privilege_ids)
518 .and(user_privilege::Column::WithGrantOption.eq(true)),
519 )
520 .exec(&txn)
521 .await?;
522 } else {
523 UserPrivilege::delete_many()
525 .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
526 .exec(&txn)
527 .await?;
528 }
529
530 let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
531
532 txn.commit().await?;
533
534 let version = self.notify_users_update(user_infos).await;
535 Ok(version)
536 }
537
538 pub async fn grant_default_privileges(
539 &self,
540 user_ids: Vec<UserId>,
541 database_id: DatabaseId,
542 schema_ids: Vec<SchemaId>,
543 grantor: UserId,
544 actions: Vec<PbAction>,
545 object_type: PbObjectType,
546 grantees: Vec<UserId>,
547 with_grant_option: bool,
548 ) -> MetaResult<()> {
549 tracing::debug!(
550 ?user_ids,
551 %database_id,
552 ?schema_ids,
553 ?actions,
554 ?object_type,
555 ?grantees,
556 with_grant_option,
557 "grant default privileges",
558 );
559 let inner = self.inner.write().await;
560 let txn = inner.db.begin().await?;
561 for user_id in &user_ids {
562 ensure_user_id(*user_id, &txn).await?;
563 }
564 ensure_object_id(ObjectType::Database, database_id, &txn).await?;
565 for schema_id in &schema_ids {
566 ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
567 }
568 for grantee in &grantees {
569 ensure_user_id(*grantee, &txn).await?;
570 }
571 if object_type == PbObjectType::Schema {
572 assert!(
573 schema_ids.is_empty(),
574 "schema_ids should be empty when object_type is Schema"
575 );
576
577 let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
580 let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
581 .select_only()
582 .columns([
583 user_default_privilege::Column::Id,
584 user_default_privilege::Column::UserId,
585 user_default_privilege::Column::Grantee,
586 user_default_privilege::Column::Action,
587 ])
588 .filter(
589 user_default_privilege::Column::DatabaseId
590 .eq(database_id)
591 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
592 .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
593 .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
594 .and(user_default_privilege::Column::Action.is_in(actions.clone())),
595 )
596 .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
597 .all(&txn)
598 .await?
599 .into_iter()
600 .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
601 .collect();
602
603 for user_id in user_ids {
604 for grantee in &grantees {
605 for action in &actions {
606 if let Some(existing_id) =
607 existing_default_privileges.get(&(user_id, *grantee, *action))
608 && with_grant_option
609 {
610 UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
612 id: Set(*existing_id),
613 with_grant_option: Set(true),
614 granted_by: Set(grantor as _),
615 ..Default::default()
616 })
617 .exec(&txn)
618 .await?;
619 } else {
620 UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
621 id: Default::default(),
622 database_id: Set(database_id),
623 schema_id: Set(None),
624 object_type: Set(ObjectType::Schema),
625 for_materialized_view: Set(false),
626 user_id: Set(user_id),
627 grantee: Set(*grantee),
628 granted_by: Set(grantor as _),
629 action: Set(*action),
630 with_grant_option: Set(with_grant_option),
631 })
632 .exec(&txn)
633 .await?;
634 }
635 }
636 }
637 }
638 } else {
639 let mut default_privileges = vec![];
640 for user_id in user_ids {
641 for grantee in &grantees {
642 for action in &actions {
643 if schema_ids.is_empty() {
644 default_privileges.push(user_default_privilege::ActiveModel {
645 id: Default::default(),
646 database_id: Set(database_id),
647 schema_id: Set(None),
648 object_type: Set(object_type.into()),
649 for_materialized_view: Set(object_type == PbObjectType::Mview),
650 user_id: Set(user_id),
651 grantee: Set(*grantee),
652 granted_by: Set(grantor as _),
653 action: Set((*action).into()),
654 with_grant_option: Set(with_grant_option),
655 });
656 continue;
657 }
658 for schema_id in &schema_ids {
659 default_privileges.push(user_default_privilege::ActiveModel {
660 id: Default::default(),
661 database_id: Set(database_id),
662 schema_id: Set(Some(*schema_id)),
663 object_type: Set(object_type.into()),
664 for_materialized_view: Set(object_type == PbObjectType::Mview),
665 user_id: Set(user_id),
666 grantee: Set(*grantee),
667 granted_by: Set(grantor as _),
668 action: Set((*action).into()),
669 with_grant_option: Set(with_grant_option),
670 });
671 }
672 }
673 }
674 }
675
676 let mut on_conflict = OnConflict::columns([
677 user_default_privilege::Column::UserId,
678 user_default_privilege::Column::DatabaseId,
679 user_default_privilege::Column::SchemaId,
680 user_default_privilege::Column::ObjectType,
681 user_default_privilege::Column::ForMaterializedView,
682 user_default_privilege::Column::Grantee,
683 user_default_privilege::Column::Action,
684 ]);
685 if with_grant_option {
686 on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
687 } else {
688 on_conflict.update_column(user_default_privilege::Column::UserId);
690 }
691 UserDefaultPrivilege::insert_many(default_privileges)
692 .on_conflict(on_conflict)
693 .do_nothing()
694 .exec(&txn)
695 .await?;
696 }
697
698 txn.commit().await?;
699 Ok(())
700 }
701
702 pub async fn revoke_default_privileges(
703 &self,
704 user_ids: Vec<UserId>,
705 database_id: DatabaseId,
706 schema_ids: Vec<SchemaId>,
707 actions: Vec<PbAction>,
708 object_type: PbObjectType,
709 grantees: Vec<UserId>,
710 revoke_grant_option: bool,
711 ) -> MetaResult<()> {
712 let inner = self.inner.write().await;
713 let txn = inner.db.begin().await?;
714 for user_id in &user_ids {
715 ensure_user_id(*user_id, &txn).await?;
716 }
717
718 let schema_filter = if schema_ids.is_empty() {
719 user_default_privilege::Column::SchemaId.is_null()
720 } else {
721 user_default_privilege::Column::SchemaId.is_in(schema_ids)
722 };
723 let filter = user_default_privilege::Column::DatabaseId
724 .eq(database_id)
725 .and(schema_filter)
726 .and(user_default_privilege::Column::UserId.is_in(user_ids))
727 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
728 .and(user_default_privilege::Column::Grantee.is_in(grantees))
729 .and(
730 user_default_privilege::Column::Action
731 .is_in(actions.iter().map(|&a| Action::from(a))),
732 );
733
734 if revoke_grant_option {
735 let res = UserDefaultPrivilege::update_many()
737 .col_expr(
738 user_default_privilege::Column::WithGrantOption,
739 SimpleExpr::Value(Value::Bool(Some(false))),
740 )
741 .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
742 .exec(&txn)
743 .await?;
744 tracing::info!(
745 "revoke {count} grant option for default privileges",
746 count = res.rows_affected
747 );
748 } else {
749 let res = UserDefaultPrivilege::delete_many()
750 .filter(filter)
751 .exec(&txn)
752 .await?;
753 tracing::info!(
754 "revoke {count} default privileges",
755 count = res.rows_affected
756 );
757 }
758
759 txn.commit().await?;
760 Ok(())
761 }
762}
763
764#[cfg(test)]
765mod tests {
766 use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
767
768 use super::*;
769 use crate::manager::MetaSrvEnv;
770
    /// Database that every privilege in this test module is granted on.
    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
    /// User id used as the initial grantor and as the revoker in the tests.
    /// NOTE(review): presumably the id of the built-in super user — confirm.
    const TEST_ROOT_USER_ID: UserId = 1;
773
774 fn make_test_user(name: &str) -> PbUserInfo {
775 PbUserInfo {
776 name: name.to_owned(),
777 ..Default::default()
778 }
779 }
780
781 fn make_privilege(
782 object: PbObject,
783 actions: &[PbAction],
784 with_grant_option: bool,
785 ) -> PbGrantPrivilege {
786 PbGrantPrivilege {
787 object: Some(object),
788 action_with_opts: actions
789 .iter()
790 .map(|&action| PbActionWithGrantOption {
791 action: action as _,
792 with_grant_option,
793 ..Default::default()
794 })
795 .collect(),
796 }
797 }
798
    /// End-to-end scenario covering user creation, rename, granting, the different
    /// revoke modes (restrict, grant-option-only, cascade) and dropping users.
    #[tokio::test]
    async fn test_user_and_privilege() -> MetaResult<()> {
        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
        mgr.create_user(make_test_user("test_user_1")).await?;
        mgr.create_user(make_test_user("test_user_2")).await?;
        let user_1 = mgr.get_user_by_name("test_user_1").await?;
        let user_2 = mgr.get_user_by_name("test_user_2").await?;

        // Creating a second user with an existing name must be rejected.
        assert!(
            mgr.create_user(make_test_user("test_user_1"))
                .await
                .is_err(),
            "user_1 already exists"
        );
        // Rename user_1 and check that the new name is stored.
        mgr.update_user(
            PbUserInfo {
                id: user_1.user_id as _,
                name: "test_user_1_new".to_owned(),
                ..Default::default()
            },
            &[PbUpdateField::Rename],
        )
        .await?;
        let user_1 = mgr.get_user(user_1.user_id).await?;
        assert_eq!(user_1.name, "test_user_1_new".to_owned());

        let conn_with_option = make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Connect], true);
        let create_without_option =
            make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Create], false);
        // Root grants CONNECT to user_1 with grant option.
        mgr.grant_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&conn_with_option),
            TEST_ROOT_USER_ID,
            true,
        )
        .await?;
        // Root grants CREATE to user_1 without grant option.
        mgr.grant_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&create_without_option),
            TEST_ROOT_USER_ID,
            false,
        )
        .await?;
        // user_1 passes CONNECT on to user_2, again with grant option.
        mgr.grant_privilege(
            vec![user_2.user_id],
            std::slice::from_ref(&conn_with_option),
            user_1.user_id,
            true,
        )
        .await?;
        // user_1 tries to pass CREATE on to user_2; the call succeeds, but the
        // assertions below expect that user_2 ends up with CONNECT only.
        mgr.grant_privilege(
            vec![user_2.user_id],
            std::slice::from_ref(&create_without_option),
            user_1.user_id,
            false,
        )
        .await?;

        // user_1 has granted a privilege to user_2, so dropping it must fail.
        assert!(
            mgr.drop_user(user_1.user_id).await.is_err(),
            "user_1 can't be dropped"
        );

        // user_1 holds both privileges, each granted by root.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 2);
        assert!(
            privilege_1
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32)
        );

        // user_2 holds a single privilege, granted by user_1 with grant option.
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(
            privilege_2
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].granted_by == user_1.user_id as u32
                    && gp.action_with_opts[0].with_grant_option)
        );

        // user_2 tries to revoke a privilege that root granted to user_1.
        assert!(
            mgr.revoke_privilege(
                vec![user_1.user_id],
                std::slice::from_ref(&conn_with_option),
                TEST_ROOT_USER_ID,
                user_2.user_id,
                false,
                false
            )
            .await
            .is_err(),
            "user_2 can't revoke for user_1"
        );

        // user_1 tries to revoke CREATE from user_2 without holding its grant option.
        assert!(
            mgr.revoke_privilege(
                vec![user_2.user_id],
                std::slice::from_ref(&create_without_option),
                user_1.user_id,
                user_1.user_id,
                false,
                false
            )
            .await
            .is_err(),
            "user_2 don't have grant option for CREATE"
        );

        // Without cascade, root cannot revoke CONNECT from user_1 while user_2's
        // CONNECT still depends on it.
        assert!(
            mgr.revoke_privilege(
                vec![user_1.user_id],
                std::slice::from_ref(&conn_with_option),
                TEST_ROOT_USER_ID,
                TEST_ROOT_USER_ID,
                false,
                false
            )
            .await
            .is_err(),
            "permission deny in restrict mode, CONN granted to user_2"
        );

        // Root revokes CREATE from user_1 (no cascade needed).
        mgr.revoke_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&create_without_option),
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            false,
        )
        .await?;

        // Only CONNECT is left for user_1.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(
            privilege_1
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].action == PbAction::Connect as i32)
        );

        // Root revokes only the grant option of CONNECT, cascading to dependents.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&conn_with_option),
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            true,
            true,
        )
        .await?;
        // Both users keep CONNECT, but neither has the grant option any more.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(
            privilege_1
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].action == PbAction::Connect as i32
                    && !gp.action_with_opts[0].with_grant_option)
        );
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(
            privilege_2
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].action == PbAction::Connect as i32
                    && !gp.action_with_opts[0].with_grant_option)
        );

        // Root revokes CONNECT itself with cascade: both users lose it.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&conn_with_option),
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            true,
        )
        .await?;
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert!(privilege_1.is_empty());
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert!(privilege_2.is_empty());

        // With no grants left, both users can be dropped.
        mgr.drop_user(user_1.user_id).await?;
        mgr.drop_user(user_2.user_id).await?;
        Ok(())
    }
997}