1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::prelude::{Object, User, UserPrivilege};
20use risingwave_meta_model::user_privilege::Action;
21use risingwave_meta_model::{AuthInfo, PrivilegeId, UserId, object, user, user_privilege};
22use risingwave_pb::meta::subscribe_response::{
23 Info as NotificationInfo, Operation as NotificationOperation,
24};
25use risingwave_pb::user::update_user_request::PbUpdateField;
26use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
27use sea_orm::ActiveValue::Set;
28use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
29use sea_orm::{
30 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
31 QuerySelect, TransactionTrait,
32};
33
34use crate::controller::catalog::CatalogController;
35use crate::controller::utils::{
36 PartialUserPrivilege, check_user_name_duplicate, ensure_privileges_not_referred,
37 ensure_user_id, extract_grant_obj_id, get_index_state_tables_by_table_id,
38 get_internal_tables_by_id, get_object_owner, get_referring_privileges_cascade,
39 get_user_privilege, list_user_info_by_ids,
40};
41use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
42use crate::{MetaError, MetaResult};
43
44impl CatalogController {
45 pub(crate) async fn notify_users_update(
46 &self,
47 user_infos: Vec<PbUserInfo>,
48 ) -> NotificationVersion {
49 let mut version = 0;
50 for info in user_infos {
51 version = self
52 .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
53 .await;
54 }
55 version
56 }
57
58 pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
59 let inner = self.inner.write().await;
60 let txn = inner.db.begin().await?;
61 check_user_name_duplicate(&pb_user.name, &txn).await?;
62
63 let grant_privileges = pb_user.grant_privileges.clone();
64 let user: user::ActiveModel = pb_user.into();
65 let user = user.insert(&txn).await?;
66
67 if !grant_privileges.is_empty() {
68 let mut privileges = vec![];
69 for gp in &grant_privileges {
70 let id = extract_grant_obj_id(gp.get_object()?);
71 for action_with_opt in &gp.action_with_opts {
72 privileges.push(user_privilege::ActiveModel {
73 user_id: Set(user.user_id),
74 oid: Set(id),
75 granted_by: Set(action_with_opt.granted_by as _),
76 action: Set(action_with_opt.get_action()?.into()),
77 with_grant_option: Set(action_with_opt.with_grant_option),
78 ..Default::default()
79 });
80 }
81 }
82 UserPrivilege::insert_many(privileges).exec(&txn).await?;
83 }
84 txn.commit().await?;
85
86 let mut user_info: PbUserInfo = user.into();
87 user_info.grant_privileges = grant_privileges;
88 let version = self
89 .notify_frontend(
90 NotificationOperation::Add,
91 NotificationInfo::User(user_info),
92 )
93 .await;
94
95 Ok(version)
96 }
97
98 pub async fn update_user(
99 &self,
100 update_user: PbUserInfo,
101 update_fields: &[PbUpdateField],
102 ) -> MetaResult<NotificationVersion> {
103 let inner = self.inner.write().await;
104 let rename_flag = update_fields.contains(&PbUpdateField::Rename);
105 if rename_flag {
106 check_user_name_duplicate(&update_user.name, &inner.db).await?;
107 }
108
109 let user = User::find_by_id(update_user.id as UserId)
110 .one(&inner.db)
111 .await?
112 .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
113 let mut user = user.into_active_model();
114 update_fields.iter().for_each(|&field| match field {
115 PbUpdateField::Unspecified => unreachable!(),
116 PbUpdateField::Super => user.is_super = Set(update_user.is_super),
117 PbUpdateField::Login => user.can_login = Set(update_user.can_login),
118 PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
119 PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
120 PbUpdateField::AuthInfo => {
121 user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
122 }
123 PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
124 });
125
126 let user = user.update(&inner.db).await?;
127 let mut user_info: PbUserInfo = user.into();
128 user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
129 let version = self
130 .notify_frontend(
131 NotificationOperation::Update,
132 NotificationInfo::User(user_info),
133 )
134 .await;
135
136 Ok(version)
137 }
138
/// Test-only lookup of a user row by id.
///
/// # Errors
///
/// Returns a "catalog id not found" error when no user has `id`.
#[cfg(test)]
pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
    let guard = self.inner.read().await;
    match User::find_by_id(id).one(&guard.db).await? {
        Some(found) => Ok(found),
        None => Err(MetaError::catalog_id_not_found("user", id)),
    }
}
148
/// Test-only lookup of a user row by its name.
///
/// # Errors
///
/// Returns an error with the message `user {name} not found` when no row matches.
#[cfg(test)]
pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
    let guard = self.inner.read().await;
    let by_name = user::Column::Name.eq(name);
    match User::find().filter(by_name).one(&guard.db).await? {
        Some(found) => Ok(found),
        None => Err(anyhow::anyhow!("user {name} not found").into()),
    }
}
159
160 pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
161 let inner = self.inner.write().await;
162 let txn = inner.db.begin().await?;
163 let user = User::find_by_id(user_id)
164 .one(&txn)
165 .await?
166 .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
167 if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
168 return Err(MetaError::permission_denied(format!(
169 "drop default super user {} is not allowed",
170 user.name
171 )));
172 }
173
174 let count = Object::find()
176 .filter(object::Column::OwnerId.eq(user_id))
177 .count(&txn)
178 .await?;
179 if count != 0 {
180 return Err(MetaError::permission_denied(format!(
181 "drop user {} is not allowed, because it owns {} objects",
182 user.name, count
183 )));
184 }
185
186 let count = UserPrivilege::find()
188 .filter(user_privilege::Column::GrantedBy.eq(user_id))
189 .count(&txn)
190 .await?;
191 if count != 0 {
192 return Err(MetaError::permission_denied(format!(
193 "drop user {} is not allowed, because it granted {} privileges to others",
194 user.name, count
195 )));
196 }
197
198 let res = User::delete_by_id(user_id).exec(&txn).await?;
199 if res.rows_affected != 1 {
200 return Err(MetaError::catalog_id_not_found("user", user_id));
201 }
202 txn.commit().await?;
203
204 let version = self
205 .notify_frontend(
206 NotificationOperation::Delete,
207 NotificationInfo::User(PbUserInfo {
208 id: user_id as _,
209 ..Default::default()
210 }),
211 )
212 .await;
213
214 Ok(version)
215 }
216
217 pub async fn grant_privilege(
218 &self,
219 user_ids: Vec<UserId>,
220 new_grant_privileges: &[PbGrantPrivilege],
221 grantor: UserId,
222 with_grant_option: bool,
223 ) -> MetaResult<NotificationVersion> {
224 let inner = self.inner.write().await;
225 let txn = inner.db.begin().await?;
226 for user_id in &user_ids {
227 ensure_user_id(*user_id, &txn).await?;
228 }
229
230 let mut privileges = vec![];
231 for gp in new_grant_privileges {
232 let id = extract_grant_obj_id(gp.get_object()?);
233 let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
234 let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
235 for action_with_opt in &gp.action_with_opts {
236 let action = action_with_opt.get_action()?.into();
237 privileges.push(user_privilege::ActiveModel {
238 oid: Set(id),
239 granted_by: Set(grantor),
240 action: Set(action),
241 with_grant_option: Set(with_grant_option),
242 ..Default::default()
243 });
244 if action == Action::Select {
245 privileges.extend(
246 internal_table_ids
247 .iter()
248 .chain(index_state_table_ids.iter())
249 .map(|&tid| user_privilege::ActiveModel {
250 oid: Set(tid),
251 granted_by: Set(grantor),
252 action: Set(Action::Select),
253 with_grant_option: Set(with_grant_option),
254 ..Default::default()
255 }),
256 );
257 }
258 }
259 }
260
261 let user = User::find_by_id(grantor)
263 .one(&txn)
264 .await?
265 .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
266 let mut filtered_privileges = vec![];
267 if !user.is_super {
268 for mut privilege in privileges {
269 if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
270 filtered_privileges.push(privilege);
271 continue;
272 }
273 let filter = user_privilege::Column::UserId
274 .eq(grantor)
275 .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
276 .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
277 .and(user_privilege::Column::WithGrantOption.eq(true));
278 let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
279 .select_only()
280 .column(user_privilege::Column::Id)
281 .filter(filter)
282 .into_tuple()
283 .one(&txn)
284 .await?;
285 let Some(privilege_id) = privilege_id else {
286 tracing::warn!(
287 "user {} don't have privilege {:?} or grant option",
288 grantor,
289 privilege.action
290 );
291 continue;
292 };
293 privilege.dependent_id = Set(Some(privilege_id));
294 filtered_privileges.push(privilege);
295 }
296 } else {
297 filtered_privileges = privileges;
298 }
299
300 let user_privileges = user_ids
302 .iter()
303 .flat_map(|user_id| {
304 filtered_privileges.iter().map(|p| {
305 let mut p = p.clone();
306 p.user_id = Set(*user_id);
307 p
308 })
309 })
310 .collect_vec();
311 for privilege in user_privileges {
312 let mut on_conflict = OnConflict::columns([
313 user_privilege::Column::UserId,
314 user_privilege::Column::Oid,
315 user_privilege::Column::Action,
316 user_privilege::Column::GrantedBy,
317 ]);
318 if *privilege.with_grant_option.as_ref() {
319 on_conflict.update_columns([user_privilege::Column::WithGrantOption]);
320 } else {
321 on_conflict.update_column(user_privilege::Column::UserId);
323 }
324
325 UserPrivilege::insert(privilege)
326 .on_conflict(on_conflict)
327 .do_nothing()
328 .exec(&txn)
329 .await?;
330 }
331
332 let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
333
334 txn.commit().await?;
335
336 let version = self.notify_users_update(user_infos).await;
337 Ok(version)
338 }
339
340 pub async fn revoke_privilege(
341 &self,
342 user_ids: Vec<UserId>,
343 revoke_grant_privileges: &[PbGrantPrivilege],
344 granted_by: UserId,
345 revoke_by: UserId,
346 revoke_grant_option: bool,
347 cascade: bool,
348 ) -> MetaResult<NotificationVersion> {
349 let inner = self.inner.write().await;
350 let txn = inner.db.begin().await?;
351 for user_id in &user_ids {
352 ensure_user_id(*user_id, &txn).await?;
353 }
354 let revoke_user = User::find_by_id(revoke_by)
356 .one(&txn)
357 .await?
358 .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
359
360 if !revoke_user.is_super && granted_by != revoke_by {
362 let granted_user_name: String = User::find_by_id(granted_by)
363 .select_only()
364 .column(user::Column::Name)
365 .into_tuple()
366 .one(&txn)
367 .await?
368 .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
369 return Err(MetaError::permission_denied(format!(
370 "user {} is not super, can't revoke privileges for {}",
371 revoke_user.name, granted_user_name
372 )));
373 }
374
375 let mut revoke_items = HashMap::new();
376 for privilege in revoke_grant_privileges {
377 let obj = extract_grant_obj_id(privilege.get_object()?);
378 let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
379 let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
380 let mut include_select = false;
381 let actions = privilege
382 .action_with_opts
383 .iter()
384 .map(|ao| {
385 let action = Action::from(ao.get_action().unwrap());
386 if action == Action::Select {
387 include_select = true;
388 }
389 action
390 })
391 .collect_vec();
392 revoke_items.insert(obj, actions);
393 if include_select {
394 revoke_items.extend(
395 internal_table_ids
396 .iter()
397 .chain(index_state_table_ids.iter())
398 .map(|&tid| (tid, vec![Action::Select])),
399 );
400 }
401 }
402
403 let filter = if !revoke_user.is_super {
404 for (obj, actions) in &revoke_items {
406 if revoke_by == get_object_owner(*obj, &txn).await? {
407 continue;
408 }
409 let owned_actions: HashSet<Action> = UserPrivilege::find()
410 .select_only()
411 .column(user_privilege::Column::Action)
412 .filter(
413 user_privilege::Column::UserId
414 .eq(granted_by)
415 .and(user_privilege::Column::Oid.eq(*obj))
416 .and(user_privilege::Column::WithGrantOption.eq(true)),
417 )
418 .into_tuple::<Action>()
419 .all(&txn)
420 .await?
421 .into_iter()
422 .collect();
423 if actions.iter().any(|ac| !owned_actions.contains(ac)) {
424 return Err(MetaError::permission_denied(format!(
425 "user {} don't have privileges {:?} or grant option",
426 revoke_user.name, actions,
427 )));
428 }
429 }
430
431 user_privilege::Column::GrantedBy
432 .eq(granted_by)
433 .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
434 } else {
435 user_privilege::Column::UserId.is_in(user_ids.clone())
436 };
437 let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
438 for (obj, actions) in revoke_items {
439 let filter = filter
440 .clone()
441 .and(user_privilege::Column::Oid.eq(obj))
442 .and(user_privilege::Column::Action.is_in(actions));
443 root_user_privileges.extend(
444 UserPrivilege::find()
445 .select_only()
446 .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
447 .filter(filter)
448 .into_partial_model()
449 .all(&txn)
450 .await?,
451 );
452 }
453 if root_user_privileges.is_empty() {
454 tracing::warn!("no privilege to revoke, ignore it");
455 return Ok(IGNORED_NOTIFICATION_VERSION);
456 }
457
458 let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
460 let (all_privilege_ids, to_update_user_ids) = if !cascade {
461 ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
462 (
463 root_privilege_ids.clone(),
464 root_user_privileges
465 .iter()
466 .map(|ur| ur.user_id)
467 .collect_vec(),
468 )
469 } else {
470 let all_user_privileges =
471 get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
472 (
473 all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
474 all_user_privileges
475 .iter()
476 .map(|ur| ur.user_id)
477 .collect_vec(),
478 )
479 };
480
481 if revoke_grant_option {
482 UserPrivilege::update_many()
483 .col_expr(
484 user_privilege::Column::WithGrantOption,
485 SimpleExpr::Value(Value::Bool(Some(false))),
486 )
487 .filter(
488 user_privilege::Column::Id
489 .is_in(all_privilege_ids)
490 .and(user_privilege::Column::WithGrantOption.eq(true)),
491 )
492 .exec(&txn)
493 .await?;
494 } else {
495 UserPrivilege::delete_many()
497 .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
498 .exec(&txn)
499 .await?;
500 }
501
502 let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
503
504 txn.commit().await?;
505
506 let version = self.notify_users_update(user_infos).await;
507 Ok(version)
508 }
509}
510
#[cfg(test)]
mod tests {
    use risingwave_meta_model::DatabaseId;
    use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Database on which every privilege in this module is granted.
    const TEST_DATABASE_ID: DatabaseId = 1;
    /// Grantor/revoker used for the "root" operations below.
    /// NOTE(review): presumably the super user pre-created by the test environment.
    const TEST_ROOT_USER_ID: UserId = 1;

    /// Builds a user request that only carries a name.
    fn make_test_user(name: &str) -> PbUserInfo {
        PbUserInfo {
            name: String::from(name),
            ..Default::default()
        }
    }

    /// Builds a grant on `object` with one entry per action, all sharing
    /// `with_grant_option`.
    fn make_privilege(
        object: PbObject,
        actions: &[PbAction],
        with_grant_option: bool,
    ) -> PbGrantPrivilege {
        let mut action_with_opts = Vec::with_capacity(actions.len());
        for &action in actions {
            action_with_opts.push(PbActionWithGrantOption {
                action: action as i32,
                with_grant_option,
                ..Default::default()
            });
        }
        PbGrantPrivilege {
            object: Some(object),
            action_with_opts,
        }
    }

    /// Reads the privileges currently stored for `user_id`.
    async fn privileges_of(
        mgr: &CatalogController,
        user_id: UserId,
    ) -> MetaResult<Vec<PbGrantPrivilege>> {
        let guard = mgr.inner.read().await;
        let privileges = get_user_privilege(user_id, &guard.db).await?;
        Ok(privileges)
    }

    /// End-to-end scenario: create/rename users, grant, revoke (restrict and cascade),
    /// then drop.
    #[tokio::test]
    async fn test_user_and_privilege() -> MetaResult<()> {
        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
        for name in ["test_user_1", "test_user_2"] {
            mgr.create_user(make_test_user(name)).await?;
        }
        let uid_1 = mgr.get_user_by_name("test_user_1").await?.user_id;
        let uid_2 = mgr.get_user_by_name("test_user_2").await?.user_id;

        // Creating a user with an existing name is rejected.
        let duplicate = mgr.create_user(make_test_user("test_user_1")).await;
        assert!(duplicate.is_err(), "user_1 already exists");

        // Rename user_1 and read it back.
        let rename_request = PbUserInfo {
            id: uid_1 as _,
            name: "test_user_1_new".to_owned(),
            ..Default::default()
        };
        mgr.update_user(rename_request, &[PbUpdateField::Rename])
            .await?;
        let renamed = mgr.get_user(uid_1).await?;
        assert_eq!(renamed.name, "test_user_1_new");

        let db_object = PbObject::DatabaseId(TEST_DATABASE_ID as _);
        let expected_object = Some(db_object.clone());
        let conn_with_option = make_privilege(db_object.clone(), &[PbAction::Connect], true);
        let create_without_option = make_privilege(db_object, &[PbAction::Create], false);
        let conn = std::slice::from_ref(&conn_with_option);
        let create = std::slice::from_ref(&create_without_option);

        // Root grants CONNECT (with grant option) and CREATE (without) to user_1.
        mgr.grant_privilege(vec![uid_1], conn, TEST_ROOT_USER_ID, true)
            .await?;
        mgr.grant_privilege(vec![uid_1], create, TEST_ROOT_USER_ID, false)
            .await?;
        // user_1 passes both on to user_2.
        mgr.grant_privilege(vec![uid_2], conn, uid_1, true).await?;
        mgr.grant_privilege(vec![uid_2], create, uid_1, false)
            .await?;

        let drop_attempt = mgr.drop_user(uid_1).await;
        assert!(drop_attempt.is_err(), "user_1 can't be dropped");

        let privilege_1 = privileges_of(&mgr, uid_1).await?;
        assert_eq!(privilege_1.len(), 2);
        assert!(privilege_1.iter().all(|gp| {
            let first = &gp.action_with_opts[0];
            gp.object == expected_object && first.granted_by == TEST_ROOT_USER_ID as u32
        }));

        let privilege_2 = privileges_of(&mgr, uid_2).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(privilege_2.iter().all(|gp| {
            let first = &gp.action_with_opts[0];
            gp.object == expected_object
                && first.granted_by == uid_1 as u32
                && first.with_grant_option
        }));

        // A non-super user cannot revoke a grant made by somebody else.
        let by_user_2 = mgr
            .revoke_privilege(vec![uid_1], conn, TEST_ROOT_USER_ID, uid_2, false, false)
            .await;
        assert!(by_user_2.is_err(), "user_2 can't revoke for user_1");

        // user_1 holds CREATE without grant option, so it cannot revoke it either.
        let by_user_1 = mgr
            .revoke_privilege(vec![uid_2], create, uid_1, uid_1, false, false)
            .await;
        assert!(
            by_user_1.is_err(),
            "user_2 don't have grant option for CREATE"
        );

        // Without cascade, a privilege that others depend on cannot be revoked.
        let restricted = mgr
            .revoke_privilege(
                vec![uid_1],
                conn,
                TEST_ROOT_USER_ID,
                TEST_ROOT_USER_ID,
                false,
                false,
            )
            .await;
        assert!(
            restricted.is_err(),
            "permission deny in restrict mode, CONN granted to user_2"
        );

        // Root revokes CREATE from user_1; only CONNECT remains.
        mgr.revoke_privilege(
            vec![uid_1],
            create,
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            false,
        )
        .await?;

        let privilege_1 = privileges_of(&mgr, uid_1).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(privilege_1.iter().all(|gp| {
            gp.object == expected_object
                && gp.action_with_opts[0].action == PbAction::Connect as i32
        }));

        // Root revokes only the grant option of CONNECT, cascading to user_2.
        mgr.revoke_privilege(
            vec![uid_1],
            conn,
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            true,
            true,
        )
        .await?;
        for uid in [uid_1, uid_2] {
            let remaining = privileges_of(&mgr, uid).await?;
            assert_eq!(remaining.len(), 1);
            assert!(remaining.iter().all(|gp| {
                let first = &gp.action_with_opts[0];
                gp.object == expected_object
                    && first.action == PbAction::Connect as i32
                    && !first.with_grant_option
            }));
        }

        // Root revokes CONNECT itself with cascade; both users end up empty.
        mgr.revoke_privilege(
            vec![uid_1],
            conn,
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            true,
        )
        .await?;
        let privilege_1 = privileges_of(&mgr, uid_1).await?;
        assert!(privilege_1.is_empty());
        let privilege_2 = privileges_of(&mgr, uid_2).await?;
        assert!(privilege_2.is_empty());

        mgr.drop_user(uid_1).await?;
        mgr.drop_user(uid_2).await?;
        Ok(())
    }
}