risingwave_meta/controller/
user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::prelude::{Object, User, UserPrivilege};
20use risingwave_meta_model::user_privilege::Action;
21use risingwave_meta_model::{AuthInfo, PrivilegeId, UserId, object, user, user_privilege};
22use risingwave_pb::meta::subscribe_response::{
23    Info as NotificationInfo, Operation as NotificationOperation,
24};
25use risingwave_pb::user::update_user_request::PbUpdateField;
26use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
27use sea_orm::ActiveValue::Set;
28use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
29use sea_orm::{
30    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
31    QuerySelect, TransactionTrait,
32};
33
34use crate::controller::catalog::CatalogController;
35use crate::controller::utils::{
36    PartialUserPrivilege, check_user_name_duplicate, ensure_privileges_not_referred,
37    ensure_user_id, extract_grant_obj_id, get_index_state_tables_by_table_id,
38    get_internal_tables_by_id, get_object_owner, get_referring_privileges_cascade,
39    get_user_privilege, list_user_info_by_ids,
40};
41use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
42use crate::{MetaError, MetaResult};
43
44impl CatalogController {
45    pub(crate) async fn notify_users_update(
46        &self,
47        user_infos: Vec<PbUserInfo>,
48    ) -> NotificationVersion {
49        let mut version = 0;
50        for info in user_infos {
51            version = self
52                .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
53                .await;
54        }
55        version
56    }
57
58    pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
59        let inner = self.inner.write().await;
60        let txn = inner.db.begin().await?;
61        check_user_name_duplicate(&pb_user.name, &txn).await?;
62
63        let grant_privileges = pb_user.grant_privileges.clone();
64        let user: user::ActiveModel = pb_user.into();
65        let user = user.insert(&txn).await?;
66
67        if !grant_privileges.is_empty() {
68            let mut privileges = vec![];
69            for gp in &grant_privileges {
70                let id = extract_grant_obj_id(gp.get_object()?);
71                for action_with_opt in &gp.action_with_opts {
72                    privileges.push(user_privilege::ActiveModel {
73                        user_id: Set(user.user_id),
74                        oid: Set(id),
75                        granted_by: Set(action_with_opt.granted_by as _),
76                        action: Set(action_with_opt.get_action()?.into()),
77                        with_grant_option: Set(action_with_opt.with_grant_option),
78                        ..Default::default()
79                    });
80                }
81            }
82            UserPrivilege::insert_many(privileges).exec(&txn).await?;
83        }
84        txn.commit().await?;
85
86        let mut user_info: PbUserInfo = user.into();
87        user_info.grant_privileges = grant_privileges;
88        let version = self
89            .notify_frontend(
90                NotificationOperation::Add,
91                NotificationInfo::User(user_info),
92            )
93            .await;
94
95        Ok(version)
96    }
97
98    pub async fn update_user(
99        &self,
100        update_user: PbUserInfo,
101        update_fields: &[PbUpdateField],
102    ) -> MetaResult<NotificationVersion> {
103        let inner = self.inner.write().await;
104        let rename_flag = update_fields.contains(&PbUpdateField::Rename);
105        if rename_flag {
106            check_user_name_duplicate(&update_user.name, &inner.db).await?;
107        }
108
109        let user = User::find_by_id(update_user.id as UserId)
110            .one(&inner.db)
111            .await?
112            .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
113        let mut user = user.into_active_model();
114        update_fields.iter().for_each(|&field| match field {
115            PbUpdateField::Unspecified => unreachable!(),
116            PbUpdateField::Super => user.is_super = Set(update_user.is_super),
117            PbUpdateField::Login => user.can_login = Set(update_user.can_login),
118            PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
119            PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
120            PbUpdateField::AuthInfo => {
121                user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
122            }
123            PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
124        });
125
126        let user = user.update(&inner.db).await?;
127        let mut user_info: PbUserInfo = user.into();
128        user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
129        let version = self
130            .notify_frontend(
131                NotificationOperation::Update,
132                NotificationInfo::User(user_info),
133            )
134            .await;
135
136        Ok(version)
137    }
138
139    #[cfg(test)]
140    pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
141        let inner = self.inner.read().await;
142        let user = User::find_by_id(id)
143            .one(&inner.db)
144            .await?
145            .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
146        Ok(user)
147    }
148
149    #[cfg(test)]
150    pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
151        let inner = self.inner.read().await;
152        let user = User::find()
153            .filter(user::Column::Name.eq(name))
154            .one(&inner.db)
155            .await?
156            .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
157        Ok(user)
158    }
159
160    pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
161        let inner = self.inner.write().await;
162        let txn = inner.db.begin().await?;
163        let user = User::find_by_id(user_id)
164            .one(&txn)
165            .await?
166            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
167        if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
168            return Err(MetaError::permission_denied(format!(
169                "drop default super user {} is not allowed",
170                user.name
171            )));
172        }
173
174        // check if the user is the owner of any objects.
175        let count = Object::find()
176            .filter(object::Column::OwnerId.eq(user_id))
177            .count(&txn)
178            .await?;
179        if count != 0 {
180            return Err(MetaError::permission_denied(format!(
181                "drop user {} is not allowed, because it owns {} objects",
182                user.name, count
183            )));
184        }
185
186        // check if the user granted any privileges to other users.
187        let count = UserPrivilege::find()
188            .filter(user_privilege::Column::GrantedBy.eq(user_id))
189            .count(&txn)
190            .await?;
191        if count != 0 {
192            return Err(MetaError::permission_denied(format!(
193                "drop user {} is not allowed, because it granted {} privileges to others",
194                user.name, count
195            )));
196        }
197
198        let res = User::delete_by_id(user_id).exec(&txn).await?;
199        if res.rows_affected != 1 {
200            return Err(MetaError::catalog_id_not_found("user", user_id));
201        }
202        txn.commit().await?;
203
204        let version = self
205            .notify_frontend(
206                NotificationOperation::Delete,
207                NotificationInfo::User(PbUserInfo {
208                    id: user_id as _,
209                    ..Default::default()
210                }),
211            )
212            .await;
213
214        Ok(version)
215    }
216
217    pub async fn grant_privilege(
218        &self,
219        user_ids: Vec<UserId>,
220        new_grant_privileges: &[PbGrantPrivilege],
221        grantor: UserId,
222        with_grant_option: bool,
223    ) -> MetaResult<NotificationVersion> {
224        let inner = self.inner.write().await;
225        let txn = inner.db.begin().await?;
226        for user_id in &user_ids {
227            ensure_user_id(*user_id, &txn).await?;
228        }
229
230        let mut privileges = vec![];
231        for gp in new_grant_privileges {
232            let id = extract_grant_obj_id(gp.get_object()?);
233            let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
234            let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
235            for action_with_opt in &gp.action_with_opts {
236                let action = action_with_opt.get_action()?.into();
237                privileges.push(user_privilege::ActiveModel {
238                    oid: Set(id),
239                    granted_by: Set(grantor),
240                    action: Set(action),
241                    with_grant_option: Set(with_grant_option),
242                    ..Default::default()
243                });
244                if action == Action::Select {
245                    privileges.extend(
246                        internal_table_ids
247                            .iter()
248                            .chain(index_state_table_ids.iter())
249                            .map(|&tid| user_privilege::ActiveModel {
250                                oid: Set(tid),
251                                granted_by: Set(grantor),
252                                action: Set(Action::Select),
253                                with_grant_option: Set(with_grant_option),
254                                ..Default::default()
255                            }),
256                    );
257                }
258            }
259        }
260
261        // check whether grantor has the privilege to grant the privilege.
262        let user = User::find_by_id(grantor)
263            .one(&txn)
264            .await?
265            .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
266        let mut filtered_privileges = vec![];
267        if !user.is_super {
268            for mut privilege in privileges {
269                if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
270                    filtered_privileges.push(privilege);
271                    continue;
272                }
273                let filter = user_privilege::Column::UserId
274                    .eq(grantor)
275                    .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
276                    .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
277                    .and(user_privilege::Column::WithGrantOption.eq(true));
278                let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
279                    .select_only()
280                    .column(user_privilege::Column::Id)
281                    .filter(filter)
282                    .into_tuple()
283                    .one(&txn)
284                    .await?;
285                let Some(privilege_id) = privilege_id else {
286                    tracing::warn!(
287                        "user {} don't have privilege {:?} or grant option",
288                        grantor,
289                        privilege.action
290                    );
291                    continue;
292                };
293                privilege.dependent_id = Set(Some(privilege_id));
294                filtered_privileges.push(privilege);
295            }
296        } else {
297            filtered_privileges = privileges;
298        }
299
300        // insert privileges
301        let user_privileges = user_ids
302            .iter()
303            .flat_map(|user_id| {
304                filtered_privileges.iter().map(|p| {
305                    let mut p = p.clone();
306                    p.user_id = Set(*user_id);
307                    p
308                })
309            })
310            .collect_vec();
311        for privilege in user_privileges {
312            let mut on_conflict = OnConflict::columns([
313                user_privilege::Column::UserId,
314                user_privilege::Column::Oid,
315                user_privilege::Column::Action,
316                user_privilege::Column::GrantedBy,
317            ]);
318            if *privilege.with_grant_option.as_ref() {
319                on_conflict.update_columns([user_privilege::Column::WithGrantOption]);
320            } else {
321                // Workaround to support MYSQL for `DO NOTHING`.
322                on_conflict.update_column(user_privilege::Column::UserId);
323            }
324
325            UserPrivilege::insert(privilege)
326                .on_conflict(on_conflict)
327                .do_nothing()
328                .exec(&txn)
329                .await?;
330        }
331
332        let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
333
334        txn.commit().await?;
335
336        let version = self.notify_users_update(user_infos).await;
337        Ok(version)
338    }
339
340    pub async fn revoke_privilege(
341        &self,
342        user_ids: Vec<UserId>,
343        revoke_grant_privileges: &[PbGrantPrivilege],
344        granted_by: UserId,
345        revoke_by: UserId,
346        revoke_grant_option: bool,
347        cascade: bool,
348    ) -> MetaResult<NotificationVersion> {
349        let inner = self.inner.write().await;
350        let txn = inner.db.begin().await?;
351        for user_id in &user_ids {
352            ensure_user_id(*user_id, &txn).await?;
353        }
354        // check whether revoke has the privilege to grant the privilege.
355        let revoke_user = User::find_by_id(revoke_by)
356            .one(&txn)
357            .await?
358            .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
359
360        // check whether user can revoke the privilege.
361        if !revoke_user.is_super && granted_by != revoke_by {
362            let granted_user_name: String = User::find_by_id(granted_by)
363                .select_only()
364                .column(user::Column::Name)
365                .into_tuple()
366                .one(&txn)
367                .await?
368                .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
369            return Err(MetaError::permission_denied(format!(
370                "user {} is not super, can't revoke privileges for {}",
371                revoke_user.name, granted_user_name
372            )));
373        }
374
375        let mut revoke_items = HashMap::new();
376        for privilege in revoke_grant_privileges {
377            let obj = extract_grant_obj_id(privilege.get_object()?);
378            let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
379            let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
380            let mut include_select = false;
381            let actions = privilege
382                .action_with_opts
383                .iter()
384                .map(|ao| {
385                    let action = Action::from(ao.get_action().unwrap());
386                    if action == Action::Select {
387                        include_select = true;
388                    }
389                    action
390                })
391                .collect_vec();
392            revoke_items.insert(obj, actions);
393            if include_select {
394                revoke_items.extend(
395                    internal_table_ids
396                        .iter()
397                        .chain(index_state_table_ids.iter())
398                        .map(|&tid| (tid, vec![Action::Select])),
399                );
400            }
401        }
402
403        let filter = if !revoke_user.is_super {
404            // ensure user have grant options or is owner of the object.
405            for (obj, actions) in &revoke_items {
406                if revoke_by == get_object_owner(*obj, &txn).await? {
407                    continue;
408                }
409                let owned_actions: HashSet<Action> = UserPrivilege::find()
410                    .select_only()
411                    .column(user_privilege::Column::Action)
412                    .filter(
413                        user_privilege::Column::UserId
414                            .eq(granted_by)
415                            .and(user_privilege::Column::Oid.eq(*obj))
416                            .and(user_privilege::Column::WithGrantOption.eq(true)),
417                    )
418                    .into_tuple::<Action>()
419                    .all(&txn)
420                    .await?
421                    .into_iter()
422                    .collect();
423                if actions.iter().any(|ac| !owned_actions.contains(ac)) {
424                    return Err(MetaError::permission_denied(format!(
425                        "user {} don't have privileges {:?} or grant option",
426                        revoke_user.name, actions,
427                    )));
428                }
429            }
430
431            user_privilege::Column::GrantedBy
432                .eq(granted_by)
433                .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
434        } else {
435            user_privilege::Column::UserId.is_in(user_ids.clone())
436        };
437        let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
438        for (obj, actions) in revoke_items {
439            let filter = filter
440                .clone()
441                .and(user_privilege::Column::Oid.eq(obj))
442                .and(user_privilege::Column::Action.is_in(actions));
443            root_user_privileges.extend(
444                UserPrivilege::find()
445                    .select_only()
446                    .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
447                    .filter(filter)
448                    .into_partial_model()
449                    .all(&txn)
450                    .await?,
451            );
452        }
453        if root_user_privileges.is_empty() {
454            tracing::warn!("no privilege to revoke, ignore it");
455            return Ok(IGNORED_NOTIFICATION_VERSION);
456        }
457
458        // check if the user granted any privileges to other users.
459        let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
460        let (all_privilege_ids, to_update_user_ids) = if !cascade {
461            ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
462            (
463                root_privilege_ids.clone(),
464                root_user_privileges
465                    .iter()
466                    .map(|ur| ur.user_id)
467                    .collect_vec(),
468            )
469        } else {
470            let all_user_privileges =
471                get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
472            (
473                all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
474                all_user_privileges
475                    .iter()
476                    .map(|ur| ur.user_id)
477                    .collect_vec(),
478            )
479        };
480
481        if revoke_grant_option {
482            UserPrivilege::update_many()
483                .col_expr(
484                    user_privilege::Column::WithGrantOption,
485                    SimpleExpr::Value(Value::Bool(Some(false))),
486                )
487                .filter(
488                    user_privilege::Column::Id
489                        .is_in(all_privilege_ids)
490                        .and(user_privilege::Column::WithGrantOption.eq(true)),
491                )
492                .exec(&txn)
493                .await?;
494        } else {
495            // The dependent privileges will be deleted cascade.
496            UserPrivilege::delete_many()
497                .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
498                .exec(&txn)
499                .await?;
500        }
501
502        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
503
504        txn.commit().await?;
505
506        let version = self.notify_users_update(user_infos).await;
507        Ok(version)
508    }
509}
510
#[cfg(test)]
mod tests {
    use risingwave_meta_model::DatabaseId;
    use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Database used as the grant target throughout the test.
    /// NOTE(review): presumably created by `MetaSrvEnv::for_test` — confirm.
    const TEST_DATABASE_ID: DatabaseId = 1;
    /// User acting as "ROOT" grantor in the test.
    /// NOTE(review): presumably the built-in super user — confirm.
    const TEST_ROOT_USER_ID: UserId = 1;

    /// Builds a `PbUserInfo` that only has its `name` set.
    fn make_test_user(name: &str) -> PbUserInfo {
        PbUserInfo {
            name: name.to_owned(),
            ..Default::default()
        }
    }

    /// Builds a `PbGrantPrivilege` on `object` with one entry per action in
    /// `actions`, each carrying the same `with_grant_option` flag.
    fn make_privilege(
        object: PbObject,
        actions: &[PbAction],
        with_grant_option: bool,
    ) -> PbGrantPrivilege {
        PbGrantPrivilege {
            object: Some(object),
            action_with_opts: actions
                .iter()
                .map(|&action| PbActionWithGrantOption {
                    action: action as _,
                    with_grant_option,
                    ..Default::default()
                })
                .collect(),
        }
    }

    /// End-to-end scenario covering user creation/rename/drop and the
    /// grant/revoke paths (restrict vs. cascade, grant option only vs. full
    /// revoke).
    #[tokio::test]
    async fn test_user_and_privilege() -> MetaResult<()> {
        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
        mgr.create_user(make_test_user("test_user_1")).await?;
        mgr.create_user(make_test_user("test_user_2")).await?;
        let user_1 = mgr.get_user_by_name("test_user_1").await?;
        let user_2 = mgr.get_user_by_name("test_user_2").await?;

        // Creating a second user with an existing name must fail.
        assert!(
            mgr.create_user(make_test_user("test_user_1"))
                .await
                .is_err(),
            "user_1 already exists"
        );
        // Rename user_1.
        mgr.update_user(
            PbUserInfo {
                id: user_1.user_id as _,
                name: "test_user_1_new".to_owned(),
                ..Default::default()
            },
            &[PbUpdateField::Rename],
        )
        .await?;
        let user_1 = mgr.get_user(user_1.user_id).await?;
        assert_eq!(user_1.name, "test_user_1_new".to_owned());

        let conn_with_option = make_privilege(
            PbObject::DatabaseId(TEST_DATABASE_ID as _),
            &[PbAction::Connect],
            true,
        );
        let create_without_option = make_privilege(
            PbObject::DatabaseId(TEST_DATABASE_ID as _),
            &[PbAction::Create],
            false,
        );
        // ROOT grant CONN with grant option to user_1.
        mgr.grant_privilege(
            vec![user_1.user_id],
            &[conn_with_option.clone()],
            TEST_ROOT_USER_ID,
            true,
        )
        .await?;
        // ROOT grant CREATE without grant option to user_1.
        mgr.grant_privilege(
            vec![user_1.user_id],
            &[create_without_option.clone()],
            TEST_ROOT_USER_ID,
            false,
        )
        .await?;
        // user_1 grant CONN with grant option to user_2.
        mgr.grant_privilege(
            vec![user_2.user_id],
            &[conn_with_option.clone()],
            user_1.user_id,
            true,
        )
        .await?;
        // user_1 tries to grant CREATE to user_2 but holds it without grant option:
        // the call succeeds, yet nothing is granted (see `privilege_2.len()` below).
        mgr.grant_privilege(
            vec![user_2.user_id],
            &[create_without_option.clone()],
            user_1.user_id,
            false,
        )
        .await?;

        // user_1 is recorded as a grantor, so it cannot be dropped.
        assert!(
            mgr.drop_user(user_1.user_id).await.is_err(),
            "user_1 can't be dropped"
        );

        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 2);
        assert!(privilege_1.iter().all(|gp| gp.object
            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
            && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32));

        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(privilege_2.iter().all(|gp| gp.object
            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
            && gp.action_with_opts[0].granted_by == user_1.user_id as u32
            && gp.action_with_opts[0].with_grant_option));

        // revoke privilege for others by non-super user.
        assert!(
            mgr.revoke_privilege(
                vec![user_1.user_id],
                &[conn_with_option.clone()],
                TEST_ROOT_USER_ID,
                user_2.user_id,
                false,
                false
            )
            .await
            .is_err(),
            "user_2 can't revoke for user_1"
        );

        // revoke privilege without grant option.
        // The revoker here is user_1, which holds CREATE without grant option.
        assert!(
            mgr.revoke_privilege(
                vec![user_2.user_id],
                &[create_without_option.clone()],
                user_1.user_id,
                user_1.user_id,
                false,
                false
            )
            .await
            .is_err(),
            "user_1 don't have grant option for CREATE"
        );

        // revoke referred privilege in restrict mode.
        assert!(
            mgr.revoke_privilege(
                vec![user_1.user_id],
                &[conn_with_option.clone()],
                TEST_ROOT_USER_ID,
                TEST_ROOT_USER_ID,
                false,
                false
            )
            .await
            .is_err(),
            "permission deny in restrict mode, CONN granted to user_2"
        );

        // revoke non-referred privilege in restrict mode.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            &[create_without_option.clone()],
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            false,
        )
        .await?;

        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(privilege_1.iter().all(|gp| gp.object
            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
            && gp.action_with_opts[0].action == PbAction::Connect as i32));

        // revoke grant option for referred privilege in cascade mode.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            &[conn_with_option.clone()],
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            true,
            true,
        )
        .await?;
        // Both privileges still exist, but neither carries the grant option any more.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(privilege_1.iter().all(|gp| gp.object
            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
            && gp.action_with_opts[0].action == PbAction::Connect as i32
            && !gp.action_with_opts[0].with_grant_option));
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(privilege_2.iter().all(|gp| gp.object
            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
            && gp.action_with_opts[0].action == PbAction::Connect as i32
            && !gp.action_with_opts[0].with_grant_option));

        // revoke referred privilege in cascade mode.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            &[conn_with_option.clone()],
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            true,
        )
        .await?;
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert!(privilege_1.is_empty());
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert!(privilege_2.is_empty());

        // With no grants left, both users can now be dropped.
        mgr.drop_user(user_1.user_id).await?;
        mgr.drop_user(user_2.user_id).await?;
        Ok(())
    }
}