1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23 AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24 user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28 Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36 QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41 PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42 ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43 get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
44 get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50 pub(crate) async fn notify_users_update(
51 &self,
52 user_infos: Vec<PbUserInfo>,
53 ) -> NotificationVersion {
54 let mut version = 0;
55 for info in user_infos {
56 version = self
57 .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58 .await;
59 }
60 version
61 }
62
63 pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64 let inner = self.inner.write().await;
65 let txn = inner.db.begin().await?;
66 check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68 let grant_privileges = pb_user.grant_privileges.clone();
69 let user: user::ActiveModel = pb_user.into();
70 let user = user.insert(&txn).await?;
71
72 if !grant_privileges.is_empty() {
73 let mut privileges = vec![];
74 for gp in &grant_privileges {
75 let id = extract_grant_obj_id(gp.get_object()?);
76 for action_with_opt in &gp.action_with_opts {
77 privileges.push(user_privilege::ActiveModel {
78 user_id: Set(user.user_id),
79 oid: Set(id),
80 granted_by: Set(action_with_opt.granted_by as _),
81 action: Set(action_with_opt.get_action()?.into()),
82 with_grant_option: Set(action_with_opt.with_grant_option),
83 ..Default::default()
84 });
85 }
86 }
87 UserPrivilege::insert_many(privileges).exec(&txn).await?;
88 }
89 txn.commit().await?;
90
91 let mut user_info: PbUserInfo = user.into();
92 user_info.grant_privileges = grant_privileges;
93 let version = self
94 .notify_frontend(
95 NotificationOperation::Add,
96 NotificationInfo::User(user_info),
97 )
98 .await;
99
100 Ok(version)
101 }
102
103 pub async fn update_user(
104 &self,
105 update_user: PbUserInfo,
106 update_fields: &[PbUpdateField],
107 ) -> MetaResult<NotificationVersion> {
108 let inner = self.inner.write().await;
109 let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110 if rename_flag {
111 check_user_name_duplicate(&update_user.name, &inner.db).await?;
112 }
113
114 let user = User::find_by_id(update_user.id as UserId)
115 .one(&inner.db)
116 .await?
117 .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118 let mut user = user.into_active_model();
119 update_fields.iter().for_each(|&field| match field {
120 PbUpdateField::Unspecified => unreachable!(),
121 PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122 PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123 PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124 PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125 PbUpdateField::AuthInfo => {
126 user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127 }
128 PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129 });
130
131 let user = user.update(&inner.db).await?;
132 let mut user_info: PbUserInfo = user.into();
133 user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
134 let version = self
135 .notify_frontend(
136 NotificationOperation::Update,
137 NotificationInfo::User(user_info),
138 )
139 .await;
140
141 Ok(version)
142 }
143
144 #[cfg(test)]
145 pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
146 let inner = self.inner.read().await;
147 let user = User::find_by_id(id)
148 .one(&inner.db)
149 .await?
150 .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
151 Ok(user)
152 }
153
154 #[cfg(test)]
155 pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
156 let inner = self.inner.read().await;
157 let user = User::find()
158 .filter(user::Column::Name.eq(name))
159 .one(&inner.db)
160 .await?
161 .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
162 Ok(user)
163 }
164
165 pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
166 let inner = self.inner.write().await;
167 let txn = inner.db.begin().await?;
168 let user = User::find_by_id(user_id)
169 .one(&txn)
170 .await?
171 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
172 if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
173 return Err(MetaError::permission_denied(format!(
174 "drop default super user {} is not allowed",
175 user.name
176 )));
177 }
178
179 let count = Object::find()
181 .filter(object::Column::OwnerId.eq(user_id))
182 .count(&txn)
183 .await?;
184 if count != 0 {
185 return Err(MetaError::permission_denied(format!(
186 "drop user {} is not allowed, because it owns {} objects",
187 user.name, count
188 )));
189 }
190
191 let count = UserPrivilege::find()
193 .filter(user_privilege::Column::GrantedBy.eq(user_id))
194 .count(&txn)
195 .await?;
196 if count != 0 {
197 return Err(MetaError::permission_denied(format!(
198 "drop user {} is not allowed, because it granted {} privileges to others",
199 user.name, count
200 )));
201 }
202
203 let res = User::delete_by_id(user_id).exec(&txn).await?;
204 if res.rows_affected != 1 {
205 return Err(MetaError::catalog_id_not_found("user", user_id));
206 }
207 txn.commit().await?;
208
209 let version = self
210 .notify_frontend(
211 NotificationOperation::Delete,
212 NotificationInfo::User(PbUserInfo {
213 id: user_id as _,
214 ..Default::default()
215 }),
216 )
217 .await;
218
219 Ok(version)
220 }
221
222 pub async fn grant_privilege(
223 &self,
224 user_ids: Vec<UserId>,
225 new_grant_privileges: &[PbGrantPrivilege],
226 grantor: UserId,
227 with_grant_option: bool,
228 ) -> MetaResult<NotificationVersion> {
229 let inner = self.inner.write().await;
230 let txn = inner.db.begin().await?;
231 for user_id in &user_ids {
232 ensure_user_id(*user_id, &txn).await?;
233 }
234
235 let mut privileges = vec![];
236 for gp in new_grant_privileges {
237 let id = extract_grant_obj_id(gp.get_object()?);
238 let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
239 let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
240 for action_with_opt in &gp.action_with_opts {
241 let action = action_with_opt.get_action()?.into();
242 privileges.push(user_privilege::ActiveModel {
243 oid: Set(id),
244 granted_by: Set(grantor),
245 action: Set(action),
246 with_grant_option: Set(with_grant_option),
247 ..Default::default()
248 });
249 if action == Action::Select {
250 privileges.extend(
251 internal_table_ids
252 .iter()
253 .chain(index_state_table_ids.iter())
254 .map(|&tid| user_privilege::ActiveModel {
255 oid: Set(tid),
256 granted_by: Set(grantor),
257 action: Set(Action::Select),
258 with_grant_option: Set(with_grant_option),
259 ..Default::default()
260 }),
261 );
262 }
263 }
264 }
265
266 let user = User::find_by_id(grantor)
268 .one(&txn)
269 .await?
270 .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
271 let mut filtered_privileges = vec![];
272 if !user.is_super {
273 for mut privilege in privileges {
274 if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
275 filtered_privileges.push(privilege);
276 continue;
277 }
278 let filter = user_privilege::Column::UserId
279 .eq(grantor)
280 .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
281 .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
282 .and(user_privilege::Column::WithGrantOption.eq(true));
283 let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
284 .select_only()
285 .column(user_privilege::Column::Id)
286 .filter(filter)
287 .into_tuple()
288 .one(&txn)
289 .await?;
290 let Some(privilege_id) = privilege_id else {
291 tracing::warn!(
292 "user {} don't have privilege {:?} or grant option",
293 grantor,
294 privilege.action
295 );
296 continue;
297 };
298 privilege.dependent_id = Set(Some(privilege_id));
299 filtered_privileges.push(privilege);
300 }
301 } else {
302 filtered_privileges = privileges;
303 }
304
305 let user_privileges = user_ids
307 .iter()
308 .flat_map(|user_id| {
309 filtered_privileges.iter().map(|p| {
310 let mut p = p.clone();
311 p.user_id = Set(*user_id);
312 p
313 })
314 })
315 .collect_vec();
316 for privilege in user_privileges {
317 let mut on_conflict = OnConflict::columns([
318 user_privilege::Column::UserId,
319 user_privilege::Column::Oid,
320 user_privilege::Column::Action,
321 user_privilege::Column::GrantedBy,
322 ]);
323 if *privilege.with_grant_option.as_ref() {
324 on_conflict.update_column(user_privilege::Column::WithGrantOption);
325 } else {
326 on_conflict.update_column(user_privilege::Column::UserId);
328 }
329
330 UserPrivilege::insert(privilege)
331 .on_conflict(on_conflict)
332 .do_nothing()
333 .exec(&txn)
334 .await?;
335 }
336
337 let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
338
339 txn.commit().await?;
340
341 let version = self.notify_users_update(user_infos).await;
342 Ok(version)
343 }
344
345 pub async fn revoke_privilege(
346 &self,
347 user_ids: Vec<UserId>,
348 revoke_grant_privileges: &[PbGrantPrivilege],
349 granted_by: UserId,
350 revoke_by: UserId,
351 revoke_grant_option: bool,
352 cascade: bool,
353 ) -> MetaResult<NotificationVersion> {
354 let inner = self.inner.write().await;
355 let txn = inner.db.begin().await?;
356 for user_id in &user_ids {
357 ensure_user_id(*user_id, &txn).await?;
358 }
359 let revoke_user = User::find_by_id(revoke_by)
361 .one(&txn)
362 .await?
363 .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
364
365 if !revoke_user.is_super && granted_by != revoke_by {
367 let granted_user_name: String = User::find_by_id(granted_by)
368 .select_only()
369 .column(user::Column::Name)
370 .into_tuple()
371 .one(&txn)
372 .await?
373 .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
374 return Err(MetaError::permission_denied(format!(
375 "user {} is not super, can't revoke privileges for {}",
376 revoke_user.name, granted_user_name
377 )));
378 }
379
380 let mut revoke_items = HashMap::new();
381 for privilege in revoke_grant_privileges {
382 let obj = extract_grant_obj_id(privilege.get_object()?);
383 let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
384 let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
385 let mut include_select = false;
386 let actions = privilege
387 .action_with_opts
388 .iter()
389 .map(|ao| {
390 let action = Action::from(ao.get_action().unwrap());
391 if action == Action::Select {
392 include_select = true;
393 }
394 action
395 })
396 .collect_vec();
397 revoke_items.insert(obj, actions);
398 if include_select {
399 revoke_items.extend(
400 internal_table_ids
401 .iter()
402 .chain(index_state_table_ids.iter())
403 .map(|&tid| (tid, vec![Action::Select])),
404 );
405 }
406 }
407
408 let filter = if !revoke_user.is_super {
409 for (obj, actions) in &revoke_items {
411 if revoke_by == get_object_owner(*obj, &txn).await? {
412 continue;
413 }
414 let owned_actions: HashSet<Action> = UserPrivilege::find()
415 .select_only()
416 .column(user_privilege::Column::Action)
417 .filter(
418 user_privilege::Column::UserId
419 .eq(granted_by)
420 .and(user_privilege::Column::Oid.eq(*obj))
421 .and(user_privilege::Column::WithGrantOption.eq(true)),
422 )
423 .into_tuple::<Action>()
424 .all(&txn)
425 .await?
426 .into_iter()
427 .collect();
428 if actions.iter().any(|ac| !owned_actions.contains(ac)) {
429 return Err(MetaError::permission_denied(format!(
430 "user {} don't have privileges {:?} or grant option",
431 revoke_user.name, actions,
432 )));
433 }
434 }
435
436 user_privilege::Column::GrantedBy
437 .eq(granted_by)
438 .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
439 } else {
440 user_privilege::Column::UserId.is_in(user_ids.clone())
441 };
442 let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
443 for (obj, actions) in revoke_items {
444 let filter = filter
445 .clone()
446 .and(user_privilege::Column::Oid.eq(obj))
447 .and(user_privilege::Column::Action.is_in(actions));
448 root_user_privileges.extend(
449 UserPrivilege::find()
450 .select_only()
451 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
452 .filter(filter)
453 .into_partial_model()
454 .all(&txn)
455 .await?,
456 );
457 }
458 if root_user_privileges.is_empty() {
459 tracing::warn!("no privilege to revoke, ignore it");
460 return Ok(IGNORED_NOTIFICATION_VERSION);
461 }
462
463 let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
465 let (all_privilege_ids, to_update_user_ids) = if !cascade {
466 ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
467 (
468 root_privilege_ids.clone(),
469 root_user_privileges
470 .iter()
471 .map(|ur| ur.user_id)
472 .collect_vec(),
473 )
474 } else {
475 let all_user_privileges =
476 get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
477 (
478 all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
479 all_user_privileges
480 .iter()
481 .map(|ur| ur.user_id)
482 .collect_vec(),
483 )
484 };
485
486 if revoke_grant_option {
487 UserPrivilege::update_many()
488 .col_expr(
489 user_privilege::Column::WithGrantOption,
490 SimpleExpr::Value(Value::Bool(Some(false))),
491 )
492 .filter(
493 user_privilege::Column::Id
494 .is_in(all_privilege_ids)
495 .and(user_privilege::Column::WithGrantOption.eq(true)),
496 )
497 .exec(&txn)
498 .await?;
499 } else {
500 UserPrivilege::delete_many()
502 .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
503 .exec(&txn)
504 .await?;
505 }
506
507 let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
508
509 txn.commit().await?;
510
511 let version = self.notify_users_update(user_infos).await;
512 Ok(version)
513 }
514
515 pub async fn grant_default_privileges(
516 &self,
517 user_ids: Vec<UserId>,
518 database_id: DatabaseId,
519 schema_ids: Vec<SchemaId>,
520 grantor: UserId,
521 actions: Vec<PbAction>,
522 object_type: PbObjectType,
523 grantees: Vec<UserId>,
524 with_grant_option: bool,
525 ) -> MetaResult<()> {
526 tracing::debug!(
527 ?user_ids,
528 database_id,
529 ?schema_ids,
530 ?actions,
531 ?object_type,
532 ?grantees,
533 with_grant_option,
534 "grant default privileges",
535 );
536 let inner = self.inner.write().await;
537 let txn = inner.db.begin().await?;
538 for user_id in &user_ids {
539 ensure_user_id(*user_id, &txn).await?;
540 }
541 ensure_object_id(ObjectType::Database, database_id, &txn).await?;
542 for schema_id in &schema_ids {
543 ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
544 }
545 for grantee in &grantees {
546 ensure_user_id(*grantee, &txn).await?;
547 }
548 if object_type == PbObjectType::Schema {
549 assert!(
550 schema_ids.is_empty(),
551 "schema_ids should be empty when object_type is Schema"
552 );
553
554 let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
557 let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
558 .select_only()
559 .columns([
560 user_default_privilege::Column::Id,
561 user_default_privilege::Column::UserId,
562 user_default_privilege::Column::Grantee,
563 user_default_privilege::Column::Action,
564 ])
565 .filter(
566 user_default_privilege::Column::DatabaseId
567 .eq(database_id)
568 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
569 .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
570 .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
571 .and(user_default_privilege::Column::Action.is_in(actions.clone())),
572 )
573 .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
574 .all(&txn)
575 .await?
576 .into_iter()
577 .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
578 .collect();
579
580 for user_id in user_ids {
581 for grantee in &grantees {
582 for action in &actions {
583 if let Some(existing_id) =
584 existing_default_privileges.get(&(user_id, *grantee, *action))
585 && with_grant_option
586 {
587 UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
589 id: Set(*existing_id),
590 with_grant_option: Set(true),
591 granted_by: Set(grantor as _),
592 ..Default::default()
593 })
594 .exec(&txn)
595 .await?;
596 } else {
597 UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
598 id: Default::default(),
599 database_id: Set(database_id),
600 schema_id: Set(None),
601 object_type: Set(ObjectType::Schema),
602 for_materialized_view: Set(false),
603 user_id: Set(user_id),
604 grantee: Set(*grantee),
605 granted_by: Set(grantor as _),
606 action: Set(*action),
607 with_grant_option: Set(with_grant_option),
608 })
609 .exec(&txn)
610 .await?;
611 }
612 }
613 }
614 }
615 } else {
616 let mut default_privileges = vec![];
617 for user_id in user_ids {
618 for grantee in &grantees {
619 for action in &actions {
620 if schema_ids.is_empty() {
621 default_privileges.push(user_default_privilege::ActiveModel {
622 id: Default::default(),
623 database_id: Set(database_id),
624 schema_id: Set(None),
625 object_type: Set(object_type.into()),
626 for_materialized_view: Set(object_type == PbObjectType::Mview),
627 user_id: Set(user_id),
628 grantee: Set(*grantee),
629 granted_by: Set(grantor as _),
630 action: Set((*action).into()),
631 with_grant_option: Set(with_grant_option),
632 });
633 continue;
634 }
635 for schema_id in &schema_ids {
636 default_privileges.push(user_default_privilege::ActiveModel {
637 id: Default::default(),
638 database_id: Set(database_id),
639 schema_id: Set(Some(*schema_id)),
640 object_type: Set(object_type.into()),
641 for_materialized_view: Set(object_type == PbObjectType::Mview),
642 user_id: Set(user_id),
643 grantee: Set(*grantee),
644 granted_by: Set(grantor as _),
645 action: Set((*action).into()),
646 with_grant_option: Set(with_grant_option),
647 });
648 }
649 }
650 }
651 }
652
653 let mut on_conflict = OnConflict::columns([
654 user_default_privilege::Column::UserId,
655 user_default_privilege::Column::DatabaseId,
656 user_default_privilege::Column::SchemaId,
657 user_default_privilege::Column::ObjectType,
658 user_default_privilege::Column::ForMaterializedView,
659 user_default_privilege::Column::Grantee,
660 user_default_privilege::Column::Action,
661 ]);
662 if with_grant_option {
663 on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
664 } else {
665 on_conflict.update_column(user_default_privilege::Column::UserId);
667 }
668 UserDefaultPrivilege::insert_many(default_privileges)
669 .on_conflict(on_conflict)
670 .do_nothing()
671 .exec(&txn)
672 .await?;
673 }
674
675 txn.commit().await?;
676 Ok(())
677 }
678
679 pub async fn revoke_default_privileges(
680 &self,
681 user_ids: Vec<UserId>,
682 database_id: DatabaseId,
683 schema_ids: Vec<SchemaId>,
684 actions: Vec<PbAction>,
685 object_type: PbObjectType,
686 grantees: Vec<UserId>,
687 revoke_grant_option: bool,
688 ) -> MetaResult<()> {
689 let inner = self.inner.write().await;
690 let txn = inner.db.begin().await?;
691 for user_id in &user_ids {
692 ensure_user_id(*user_id, &txn).await?;
693 }
694
695 let schema_filter = if schema_ids.is_empty() {
696 user_default_privilege::Column::SchemaId.is_null()
697 } else {
698 user_default_privilege::Column::SchemaId.is_in(schema_ids)
699 };
700 let filter = user_default_privilege::Column::DatabaseId
701 .eq(database_id)
702 .and(schema_filter)
703 .and(user_default_privilege::Column::UserId.is_in(user_ids))
704 .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
705 .and(user_default_privilege::Column::Grantee.is_in(grantees))
706 .and(
707 user_default_privilege::Column::Action
708 .is_in(actions.iter().map(|&a| Action::from(a))),
709 );
710
711 if revoke_grant_option {
712 let res = UserDefaultPrivilege::update_many()
714 .col_expr(
715 user_default_privilege::Column::WithGrantOption,
716 SimpleExpr::Value(Value::Bool(Some(false))),
717 )
718 .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
719 .exec(&txn)
720 .await?;
721 tracing::info!(
722 "revoke {count} grant option for default privileges",
723 count = res.rows_affected
724 );
725 } else {
726 let res = UserDefaultPrivilege::delete_many()
727 .filter(filter)
728 .exec(&txn)
729 .await?;
730 tracing::info!(
731 "revoke {count} default privileges",
732 count = res.rows_affected
733 );
734 }
735
736 txn.commit().await?;
737 Ok(())
738 }
739}
740
741#[cfg(test)]
742mod tests {
743 use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
744
745 use super::*;
746 use crate::manager::MetaSrvEnv;
747
748 const TEST_DATABASE_ID: DatabaseId = 1;
749 const TEST_ROOT_USER_ID: UserId = 1;
750
751 fn make_test_user(name: &str) -> PbUserInfo {
752 PbUserInfo {
753 name: name.to_owned(),
754 ..Default::default()
755 }
756 }
757
758 fn make_privilege(
759 object: PbObject,
760 actions: &[PbAction],
761 with_grant_option: bool,
762 ) -> PbGrantPrivilege {
763 PbGrantPrivilege {
764 object: Some(object),
765 action_with_opts: actions
766 .iter()
767 .map(|&action| PbActionWithGrantOption {
768 action: action as _,
769 with_grant_option,
770 ..Default::default()
771 })
772 .collect(),
773 }
774 }
775
776 #[tokio::test]
777 async fn test_user_and_privilege() -> MetaResult<()> {
778 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
779 mgr.create_user(make_test_user("test_user_1")).await?;
780 mgr.create_user(make_test_user("test_user_2")).await?;
781 let user_1 = mgr.get_user_by_name("test_user_1").await?;
782 let user_2 = mgr.get_user_by_name("test_user_2").await?;
783
784 assert!(
785 mgr.create_user(make_test_user("test_user_1"))
786 .await
787 .is_err(),
788 "user_1 already exists"
789 );
790 mgr.update_user(
791 PbUserInfo {
792 id: user_1.user_id as _,
793 name: "test_user_1_new".to_owned(),
794 ..Default::default()
795 },
796 &[PbUpdateField::Rename],
797 )
798 .await?;
799 let user_1 = mgr.get_user(user_1.user_id).await?;
800 assert_eq!(user_1.name, "test_user_1_new".to_owned());
801
802 let conn_with_option = make_privilege(
803 PbObject::DatabaseId(TEST_DATABASE_ID as _),
804 &[PbAction::Connect],
805 true,
806 );
807 let create_without_option = make_privilege(
808 PbObject::DatabaseId(TEST_DATABASE_ID as _),
809 &[PbAction::Create],
810 false,
811 );
812 mgr.grant_privilege(
814 vec![user_1.user_id],
815 &[conn_with_option.clone()],
816 TEST_ROOT_USER_ID,
817 true,
818 )
819 .await?;
820 mgr.grant_privilege(
822 vec![user_1.user_id],
823 &[create_without_option.clone()],
824 TEST_ROOT_USER_ID,
825 false,
826 )
827 .await?;
828 mgr.grant_privilege(
830 vec![user_2.user_id],
831 &[conn_with_option.clone()],
832 user_1.user_id,
833 true,
834 )
835 .await?;
836 mgr.grant_privilege(
837 vec![user_2.user_id],
838 &[create_without_option.clone()],
839 user_1.user_id,
840 false,
841 )
842 .await?;
843
844 assert!(
845 mgr.drop_user(user_1.user_id).await.is_err(),
846 "user_1 can't be dropped"
847 );
848
849 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
850 assert_eq!(privilege_1.len(), 2);
851 assert!(privilege_1.iter().all(|gp| gp.object
852 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
853 && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32));
854
855 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
856 assert_eq!(privilege_2.len(), 1);
857 assert!(privilege_2.iter().all(|gp| gp.object
858 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
859 && gp.action_with_opts[0].granted_by == user_1.user_id as u32
860 && gp.action_with_opts[0].with_grant_option));
861
862 assert!(
864 mgr.revoke_privilege(
865 vec![user_1.user_id],
866 &[conn_with_option.clone()],
867 TEST_ROOT_USER_ID,
868 user_2.user_id,
869 false,
870 false
871 )
872 .await
873 .is_err(),
874 "user_2 can't revoke for user_1"
875 );
876
877 assert!(
879 mgr.revoke_privilege(
880 vec![user_2.user_id],
881 &[create_without_option.clone()],
882 user_1.user_id,
883 user_1.user_id,
884 false,
885 false
886 )
887 .await
888 .is_err(),
889 "user_2 don't have grant option for CREATE"
890 );
891
892 assert!(
894 mgr.revoke_privilege(
895 vec![user_1.user_id],
896 &[conn_with_option.clone()],
897 TEST_ROOT_USER_ID,
898 TEST_ROOT_USER_ID,
899 false,
900 false
901 )
902 .await
903 .is_err(),
904 "permission deny in restrict mode, CONN granted to user_2"
905 );
906
907 mgr.revoke_privilege(
909 vec![user_1.user_id],
910 &[create_without_option.clone()],
911 TEST_ROOT_USER_ID,
912 TEST_ROOT_USER_ID,
913 false,
914 false,
915 )
916 .await?;
917
918 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
919 assert_eq!(privilege_1.len(), 1);
920 assert!(privilege_1.iter().all(|gp| gp.object
921 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
922 && gp.action_with_opts[0].action == PbAction::Connect as i32));
923
924 mgr.revoke_privilege(
926 vec![user_1.user_id],
927 &[conn_with_option.clone()],
928 TEST_ROOT_USER_ID,
929 TEST_ROOT_USER_ID,
930 true,
931 true,
932 )
933 .await?;
934 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
935 assert_eq!(privilege_1.len(), 1);
936 assert!(privilege_1.iter().all(|gp| gp.object
937 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
938 && gp.action_with_opts[0].action == PbAction::Connect as i32
939 && !gp.action_with_opts[0].with_grant_option));
940 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
941 assert_eq!(privilege_2.len(), 1);
942 assert!(privilege_2.iter().all(|gp| gp.object
943 == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
944 && gp.action_with_opts[0].action == PbAction::Connect as i32
945 && !gp.action_with_opts[0].with_grant_option));
946
947 mgr.revoke_privilege(
949 vec![user_1.user_id],
950 &[conn_with_option.clone()],
951 TEST_ROOT_USER_ID,
952 TEST_ROOT_USER_ID,
953 false,
954 true,
955 )
956 .await?;
957 let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
958 assert!(privilege_1.is_empty());
959 let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
960 assert!(privilege_2.is_empty());
961
962 mgr.drop_user(user_1.user_id).await?;
963 mgr.drop_user(user_2.user_id).await?;
964 Ok(())
965 }
966}