risingwave_meta/controller/
user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23    AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24    user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28    Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36    QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41    PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42    ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43    get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
44    get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50    pub(crate) async fn notify_users_update(
51        &self,
52        user_infos: Vec<PbUserInfo>,
53    ) -> NotificationVersion {
54        let mut version = 0;
55        for info in user_infos {
56            version = self
57                .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58                .await;
59        }
60        version
61    }
62
63    pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64        let inner = self.inner.write().await;
65        let txn = inner.db.begin().await?;
66        check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68        let grant_privileges = pb_user.grant_privileges.clone();
69        let user: user::ActiveModel = pb_user.into();
70        let user = user.insert(&txn).await?;
71
72        if !grant_privileges.is_empty() {
73            let mut privileges = vec![];
74            for gp in &grant_privileges {
75                let id = extract_grant_obj_id(gp.get_object()?);
76                for action_with_opt in &gp.action_with_opts {
77                    privileges.push(user_privilege::ActiveModel {
78                        user_id: Set(user.user_id),
79                        oid: Set(id),
80                        granted_by: Set(action_with_opt.granted_by as _),
81                        action: Set(action_with_opt.get_action()?.into()),
82                        with_grant_option: Set(action_with_opt.with_grant_option),
83                        ..Default::default()
84                    });
85                }
86            }
87            UserPrivilege::insert_many(privileges).exec(&txn).await?;
88        }
89        txn.commit().await?;
90
91        let mut user_info: PbUserInfo = user.into();
92        user_info.grant_privileges = grant_privileges;
93        let version = self
94            .notify_frontend(
95                NotificationOperation::Add,
96                NotificationInfo::User(user_info),
97            )
98            .await;
99
100        Ok(version)
101    }
102
103    pub async fn update_user(
104        &self,
105        update_user: PbUserInfo,
106        update_fields: &[PbUpdateField],
107    ) -> MetaResult<NotificationVersion> {
108        let inner = self.inner.write().await;
109        let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110        if rename_flag {
111            check_user_name_duplicate(&update_user.name, &inner.db).await?;
112        }
113
114        let user = User::find_by_id(update_user.id as UserId)
115            .one(&inner.db)
116            .await?
117            .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118        let mut user = user.into_active_model();
119        update_fields.iter().for_each(|&field| match field {
120            PbUpdateField::Unspecified => unreachable!(),
121            PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122            PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123            PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124            PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125            PbUpdateField::AuthInfo => {
126                user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127            }
128            PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129        });
130
131        let user = user.update(&inner.db).await?;
132        let mut user_info: PbUserInfo = user.into();
133        user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
134        let version = self
135            .notify_frontend(
136                NotificationOperation::Update,
137                NotificationInfo::User(user_info),
138            )
139            .await;
140
141        Ok(version)
142    }
143
144    #[cfg(test)]
145    pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
146        let inner = self.inner.read().await;
147        let user = User::find_by_id(id)
148            .one(&inner.db)
149            .await?
150            .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
151        Ok(user)
152    }
153
154    #[cfg(test)]
155    pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
156        let inner = self.inner.read().await;
157        let user = User::find()
158            .filter(user::Column::Name.eq(name))
159            .one(&inner.db)
160            .await?
161            .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
162        Ok(user)
163    }
164
165    pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
166        let inner = self.inner.write().await;
167        let txn = inner.db.begin().await?;
168        let user = User::find_by_id(user_id)
169            .one(&txn)
170            .await?
171            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
172        if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
173            return Err(MetaError::permission_denied(format!(
174                "drop default super user {} is not allowed",
175                user.name
176            )));
177        }
178
179        // check if the user is the owner of any objects.
180        let count = Object::find()
181            .filter(object::Column::OwnerId.eq(user_id))
182            .count(&txn)
183            .await?;
184        if count != 0 {
185            return Err(MetaError::permission_denied(format!(
186                "drop user {} is not allowed, because it owns {} objects",
187                user.name, count
188            )));
189        }
190
191        // check if the user granted any privileges to other users.
192        let count = UserPrivilege::find()
193            .filter(user_privilege::Column::GrantedBy.eq(user_id))
194            .count(&txn)
195            .await?;
196        if count != 0 {
197            return Err(MetaError::permission_denied(format!(
198                "drop user {} is not allowed, because it granted {} privileges to others",
199                user.name, count
200            )));
201        }
202
203        let res = User::delete_by_id(user_id).exec(&txn).await?;
204        if res.rows_affected != 1 {
205            return Err(MetaError::catalog_id_not_found("user", user_id));
206        }
207        txn.commit().await?;
208
209        let version = self
210            .notify_frontend(
211                NotificationOperation::Delete,
212                NotificationInfo::User(PbUserInfo {
213                    id: user_id as _,
214                    ..Default::default()
215                }),
216            )
217            .await;
218
219        Ok(version)
220    }
221
222    pub async fn grant_privilege(
223        &self,
224        user_ids: Vec<UserId>,
225        new_grant_privileges: &[PbGrantPrivilege],
226        grantor: UserId,
227        with_grant_option: bool,
228    ) -> MetaResult<NotificationVersion> {
229        let inner = self.inner.write().await;
230        let txn = inner.db.begin().await?;
231        for user_id in &user_ids {
232            ensure_user_id(*user_id, &txn).await?;
233        }
234
235        let mut privileges = vec![];
236        for gp in new_grant_privileges {
237            let id = extract_grant_obj_id(gp.get_object()?);
238            let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
239            let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
240            for action_with_opt in &gp.action_with_opts {
241                let action = action_with_opt.get_action()?.into();
242                privileges.push(user_privilege::ActiveModel {
243                    oid: Set(id),
244                    granted_by: Set(grantor),
245                    action: Set(action),
246                    with_grant_option: Set(with_grant_option),
247                    ..Default::default()
248                });
249                if action == Action::Select {
250                    privileges.extend(
251                        internal_table_ids
252                            .iter()
253                            .chain(index_state_table_ids.iter())
254                            .map(|&tid| user_privilege::ActiveModel {
255                                oid: Set(tid),
256                                granted_by: Set(grantor),
257                                action: Set(Action::Select),
258                                with_grant_option: Set(with_grant_option),
259                                ..Default::default()
260                            }),
261                    );
262                }
263            }
264        }
265
266        // check whether grantor has the privilege to grant the privilege.
267        let user = User::find_by_id(grantor)
268            .one(&txn)
269            .await?
270            .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
271        let mut filtered_privileges = vec![];
272        if !user.is_super {
273            for mut privilege in privileges {
274                if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
275                    filtered_privileges.push(privilege);
276                    continue;
277                }
278                let filter = user_privilege::Column::UserId
279                    .eq(grantor)
280                    .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
281                    .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
282                    .and(user_privilege::Column::WithGrantOption.eq(true));
283                let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
284                    .select_only()
285                    .column(user_privilege::Column::Id)
286                    .filter(filter)
287                    .into_tuple()
288                    .one(&txn)
289                    .await?;
290                let Some(privilege_id) = privilege_id else {
291                    tracing::warn!(
292                        "user {} don't have privilege {:?} or grant option",
293                        grantor,
294                        privilege.action
295                    );
296                    continue;
297                };
298                privilege.dependent_id = Set(Some(privilege_id));
299                filtered_privileges.push(privilege);
300            }
301        } else {
302            filtered_privileges = privileges;
303        }
304
305        // insert privileges
306        let user_privileges = user_ids
307            .iter()
308            .flat_map(|user_id| {
309                filtered_privileges.iter().map(|p| {
310                    let mut p = p.clone();
311                    p.user_id = Set(*user_id);
312                    p
313                })
314            })
315            .collect_vec();
316        for privilege in user_privileges {
317            let mut on_conflict = OnConflict::columns([
318                user_privilege::Column::UserId,
319                user_privilege::Column::Oid,
320                user_privilege::Column::Action,
321                user_privilege::Column::GrantedBy,
322            ]);
323            if *privilege.with_grant_option.as_ref() {
324                on_conflict.update_column(user_privilege::Column::WithGrantOption);
325            } else {
326                // Workaround to support MYSQL for `DO NOTHING`.
327                on_conflict.update_column(user_privilege::Column::UserId);
328            }
329
330            UserPrivilege::insert(privilege)
331                .on_conflict(on_conflict)
332                .do_nothing()
333                .exec(&txn)
334                .await?;
335        }
336
337        let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
338
339        txn.commit().await?;
340
341        let version = self.notify_users_update(user_infos).await;
342        Ok(version)
343    }
344
345    pub async fn revoke_privilege(
346        &self,
347        user_ids: Vec<UserId>,
348        revoke_grant_privileges: &[PbGrantPrivilege],
349        granted_by: UserId,
350        revoke_by: UserId,
351        revoke_grant_option: bool,
352        cascade: bool,
353    ) -> MetaResult<NotificationVersion> {
354        let inner = self.inner.write().await;
355        let txn = inner.db.begin().await?;
356        for user_id in &user_ids {
357            ensure_user_id(*user_id, &txn).await?;
358        }
359        // check whether revoke has the privilege to grant the privilege.
360        let revoke_user = User::find_by_id(revoke_by)
361            .one(&txn)
362            .await?
363            .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
364
365        // check whether user can revoke the privilege.
366        if !revoke_user.is_super && granted_by != revoke_by {
367            let granted_user_name: String = User::find_by_id(granted_by)
368                .select_only()
369                .column(user::Column::Name)
370                .into_tuple()
371                .one(&txn)
372                .await?
373                .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
374            return Err(MetaError::permission_denied(format!(
375                "user {} is not super, can't revoke privileges for {}",
376                revoke_user.name, granted_user_name
377            )));
378        }
379
380        let mut revoke_items = HashMap::new();
381        for privilege in revoke_grant_privileges {
382            let obj = extract_grant_obj_id(privilege.get_object()?);
383            let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
384            let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
385            let mut include_select = false;
386            let actions = privilege
387                .action_with_opts
388                .iter()
389                .map(|ao| {
390                    let action = Action::from(ao.get_action().unwrap());
391                    if action == Action::Select {
392                        include_select = true;
393                    }
394                    action
395                })
396                .collect_vec();
397            revoke_items.insert(obj, actions);
398            if include_select {
399                revoke_items.extend(
400                    internal_table_ids
401                        .iter()
402                        .chain(index_state_table_ids.iter())
403                        .map(|&tid| (tid, vec![Action::Select])),
404                );
405            }
406        }
407
408        let filter = if !revoke_user.is_super {
409            // ensure user have grant options or is owner of the object.
410            for (obj, actions) in &revoke_items {
411                if revoke_by == get_object_owner(*obj, &txn).await? {
412                    continue;
413                }
414                let owned_actions: HashSet<Action> = UserPrivilege::find()
415                    .select_only()
416                    .column(user_privilege::Column::Action)
417                    .filter(
418                        user_privilege::Column::UserId
419                            .eq(granted_by)
420                            .and(user_privilege::Column::Oid.eq(*obj))
421                            .and(user_privilege::Column::WithGrantOption.eq(true)),
422                    )
423                    .into_tuple::<Action>()
424                    .all(&txn)
425                    .await?
426                    .into_iter()
427                    .collect();
428                if actions.iter().any(|ac| !owned_actions.contains(ac)) {
429                    return Err(MetaError::permission_denied(format!(
430                        "user {} don't have privileges {:?} or grant option",
431                        revoke_user.name, actions,
432                    )));
433                }
434            }
435
436            user_privilege::Column::GrantedBy
437                .eq(granted_by)
438                .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
439        } else {
440            user_privilege::Column::UserId.is_in(user_ids.clone())
441        };
442        let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
443        for (obj, actions) in revoke_items {
444            let filter = filter
445                .clone()
446                .and(user_privilege::Column::Oid.eq(obj))
447                .and(user_privilege::Column::Action.is_in(actions));
448            root_user_privileges.extend(
449                UserPrivilege::find()
450                    .select_only()
451                    .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
452                    .filter(filter)
453                    .into_partial_model()
454                    .all(&txn)
455                    .await?,
456            );
457        }
458        if root_user_privileges.is_empty() {
459            tracing::warn!("no privilege to revoke, ignore it");
460            return Ok(IGNORED_NOTIFICATION_VERSION);
461        }
462
463        // check if the user granted any privileges to other users.
464        let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
465        let (all_privilege_ids, to_update_user_ids) = if !cascade {
466            ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
467            (
468                root_privilege_ids.clone(),
469                root_user_privileges
470                    .iter()
471                    .map(|ur| ur.user_id)
472                    .collect_vec(),
473            )
474        } else {
475            let all_user_privileges =
476                get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
477            (
478                all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
479                all_user_privileges
480                    .iter()
481                    .map(|ur| ur.user_id)
482                    .collect_vec(),
483            )
484        };
485
486        if revoke_grant_option {
487            UserPrivilege::update_many()
488                .col_expr(
489                    user_privilege::Column::WithGrantOption,
490                    SimpleExpr::Value(Value::Bool(Some(false))),
491                )
492                .filter(
493                    user_privilege::Column::Id
494                        .is_in(all_privilege_ids)
495                        .and(user_privilege::Column::WithGrantOption.eq(true)),
496                )
497                .exec(&txn)
498                .await?;
499        } else {
500            // The dependent privileges will be deleted cascade.
501            UserPrivilege::delete_many()
502                .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
503                .exec(&txn)
504                .await?;
505        }
506
507        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
508
509        txn.commit().await?;
510
511        let version = self.notify_users_update(user_infos).await;
512        Ok(version)
513    }
514
515    pub async fn grant_default_privileges(
516        &self,
517        user_ids: Vec<UserId>,
518        database_id: DatabaseId,
519        schema_ids: Vec<SchemaId>,
520        grantor: UserId,
521        actions: Vec<PbAction>,
522        object_type: PbObjectType,
523        grantees: Vec<UserId>,
524        with_grant_option: bool,
525    ) -> MetaResult<()> {
526        tracing::debug!(
527            ?user_ids,
528            database_id,
529            ?schema_ids,
530            ?actions,
531            ?object_type,
532            ?grantees,
533            with_grant_option,
534            "grant default privileges",
535        );
536        let inner = self.inner.write().await;
537        let txn = inner.db.begin().await?;
538        for user_id in &user_ids {
539            ensure_user_id(*user_id, &txn).await?;
540        }
541        ensure_object_id(ObjectType::Database, database_id, &txn).await?;
542        for schema_id in &schema_ids {
543            ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
544        }
545        for grantee in &grantees {
546            ensure_user_id(*grantee, &txn).await?;
547        }
548        if object_type == PbObjectType::Schema {
549            assert!(
550                schema_ids.is_empty(),
551                "schema_ids should be empty when object_type is Schema"
552            );
553
554            // Note that the UNIQUE constraint does not treat NULL values as equal, we cannot rely on conflict check
555            // to update it and have to check existing default privileges manually.
556            let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
557            let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
558                .select_only()
559                .columns([
560                    user_default_privilege::Column::Id,
561                    user_default_privilege::Column::UserId,
562                    user_default_privilege::Column::Grantee,
563                    user_default_privilege::Column::Action,
564                ])
565                .filter(
566                    user_default_privilege::Column::DatabaseId
567                        .eq(database_id)
568                        .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
569                        .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
570                        .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
571                        .and(user_default_privilege::Column::Action.is_in(actions.clone())),
572                )
573                .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
574                .all(&txn)
575                .await?
576                .into_iter()
577                .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
578                .collect();
579
580            for user_id in user_ids {
581                for grantee in &grantees {
582                    for action in &actions {
583                        if let Some(existing_id) =
584                            existing_default_privileges.get(&(user_id, *grantee, *action))
585                            && with_grant_option
586                        {
587                            // If the default privilege already exists, we should update the grant option.
588                            UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
589                                id: Set(*existing_id),
590                                with_grant_option: Set(true),
591                                granted_by: Set(grantor as _),
592                                ..Default::default()
593                            })
594                            .exec(&txn)
595                            .await?;
596                        } else {
597                            UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
598                                id: Default::default(),
599                                database_id: Set(database_id),
600                                schema_id: Set(None),
601                                object_type: Set(ObjectType::Schema),
602                                for_materialized_view: Set(false),
603                                user_id: Set(user_id),
604                                grantee: Set(*grantee),
605                                granted_by: Set(grantor as _),
606                                action: Set(*action),
607                                with_grant_option: Set(with_grant_option),
608                            })
609                            .exec(&txn)
610                            .await?;
611                        }
612                    }
613                }
614            }
615        } else {
616            let mut default_privileges = vec![];
617            for user_id in user_ids {
618                for grantee in &grantees {
619                    for action in &actions {
620                        if schema_ids.is_empty() {
621                            default_privileges.push(user_default_privilege::ActiveModel {
622                                id: Default::default(),
623                                database_id: Set(database_id),
624                                schema_id: Set(None),
625                                object_type: Set(object_type.into()),
626                                for_materialized_view: Set(object_type == PbObjectType::Mview),
627                                user_id: Set(user_id),
628                                grantee: Set(*grantee),
629                                granted_by: Set(grantor as _),
630                                action: Set((*action).into()),
631                                with_grant_option: Set(with_grant_option),
632                            });
633                            continue;
634                        }
635                        for schema_id in &schema_ids {
636                            default_privileges.push(user_default_privilege::ActiveModel {
637                                id: Default::default(),
638                                database_id: Set(database_id),
639                                schema_id: Set(Some(*schema_id)),
640                                object_type: Set(object_type.into()),
641                                for_materialized_view: Set(object_type == PbObjectType::Mview),
642                                user_id: Set(user_id),
643                                grantee: Set(*grantee),
644                                granted_by: Set(grantor as _),
645                                action: Set((*action).into()),
646                                with_grant_option: Set(with_grant_option),
647                            });
648                        }
649                    }
650                }
651            }
652
653            let mut on_conflict = OnConflict::columns([
654                user_default_privilege::Column::UserId,
655                user_default_privilege::Column::DatabaseId,
656                user_default_privilege::Column::SchemaId,
657                user_default_privilege::Column::ObjectType,
658                user_default_privilege::Column::ForMaterializedView,
659                user_default_privilege::Column::Grantee,
660                user_default_privilege::Column::Action,
661            ]);
662            if with_grant_option {
663                on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
664            } else {
665                // Workaround to support MYSQL for `DO NOTHING`.
666                on_conflict.update_column(user_default_privilege::Column::UserId);
667            }
668            UserDefaultPrivilege::insert_many(default_privileges)
669                .on_conflict(on_conflict)
670                .do_nothing()
671                .exec(&txn)
672                .await?;
673        }
674
675        txn.commit().await?;
676        Ok(())
677    }
678
679    pub async fn revoke_default_privileges(
680        &self,
681        user_ids: Vec<UserId>,
682        database_id: DatabaseId,
683        schema_ids: Vec<SchemaId>,
684        actions: Vec<PbAction>,
685        object_type: PbObjectType,
686        grantees: Vec<UserId>,
687        revoke_grant_option: bool,
688    ) -> MetaResult<()> {
689        let inner = self.inner.write().await;
690        let txn = inner.db.begin().await?;
691        for user_id in &user_ids {
692            ensure_user_id(*user_id, &txn).await?;
693        }
694
695        let schema_filter = if schema_ids.is_empty() {
696            user_default_privilege::Column::SchemaId.is_null()
697        } else {
698            user_default_privilege::Column::SchemaId.is_in(schema_ids)
699        };
700        let filter = user_default_privilege::Column::DatabaseId
701            .eq(database_id)
702            .and(schema_filter)
703            .and(user_default_privilege::Column::UserId.is_in(user_ids))
704            .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
705            .and(user_default_privilege::Column::Grantee.is_in(grantees))
706            .and(
707                user_default_privilege::Column::Action
708                    .is_in(actions.iter().map(|&a| Action::from(a))),
709            );
710
711        if revoke_grant_option {
712            // update the `with_grant_option` field to false
713            let res = UserDefaultPrivilege::update_many()
714                .col_expr(
715                    user_default_privilege::Column::WithGrantOption,
716                    SimpleExpr::Value(Value::Bool(Some(false))),
717                )
718                .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
719                .exec(&txn)
720                .await?;
721            tracing::info!(
722                "revoke {count} grant option for default privileges",
723                count = res.rows_affected
724            );
725        } else {
726            let res = UserDefaultPrivilege::delete_many()
727                .filter(filter)
728                .exec(&txn)
729                .await?;
730            tracing::info!(
731                "revoke {count} default privileges",
732                count = res.rows_affected
733            );
734        }
735
736        txn.commit().await?;
737        Ok(())
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
744
745    use super::*;
746    use crate::manager::MetaSrvEnv;
747
748    const TEST_DATABASE_ID: DatabaseId = 1;
749    const TEST_ROOT_USER_ID: UserId = 1;
750
751    fn make_test_user(name: &str) -> PbUserInfo {
752        PbUserInfo {
753            name: name.to_owned(),
754            ..Default::default()
755        }
756    }
757
758    fn make_privilege(
759        object: PbObject,
760        actions: &[PbAction],
761        with_grant_option: bool,
762    ) -> PbGrantPrivilege {
763        PbGrantPrivilege {
764            object: Some(object),
765            action_with_opts: actions
766                .iter()
767                .map(|&action| PbActionWithGrantOption {
768                    action: action as _,
769                    with_grant_option,
770                    ..Default::default()
771                })
772                .collect(),
773        }
774    }
775
776    #[tokio::test]
777    async fn test_user_and_privilege() -> MetaResult<()> {
778        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
779        mgr.create_user(make_test_user("test_user_1")).await?;
780        mgr.create_user(make_test_user("test_user_2")).await?;
781        let user_1 = mgr.get_user_by_name("test_user_1").await?;
782        let user_2 = mgr.get_user_by_name("test_user_2").await?;
783
784        assert!(
785            mgr.create_user(make_test_user("test_user_1"))
786                .await
787                .is_err(),
788            "user_1 already exists"
789        );
790        mgr.update_user(
791            PbUserInfo {
792                id: user_1.user_id as _,
793                name: "test_user_1_new".to_owned(),
794                ..Default::default()
795            },
796            &[PbUpdateField::Rename],
797        )
798        .await?;
799        let user_1 = mgr.get_user(user_1.user_id).await?;
800        assert_eq!(user_1.name, "test_user_1_new".to_owned());
801
802        let conn_with_option = make_privilege(
803            PbObject::DatabaseId(TEST_DATABASE_ID as _),
804            &[PbAction::Connect],
805            true,
806        );
807        let create_without_option = make_privilege(
808            PbObject::DatabaseId(TEST_DATABASE_ID as _),
809            &[PbAction::Create],
810            false,
811        );
812        // ROOT grant CONN with grant option to user_1.
813        mgr.grant_privilege(
814            vec![user_1.user_id],
815            &[conn_with_option.clone()],
816            TEST_ROOT_USER_ID,
817            true,
818        )
819        .await?;
820        // ROOT grant CREATE without grant option to user_1.
821        mgr.grant_privilege(
822            vec![user_1.user_id],
823            &[create_without_option.clone()],
824            TEST_ROOT_USER_ID,
825            false,
826        )
827        .await?;
828        // user_1 grant CONN with grant option to user_2.
829        mgr.grant_privilege(
830            vec![user_2.user_id],
831            &[conn_with_option.clone()],
832            user_1.user_id,
833            true,
834        )
835        .await?;
836        mgr.grant_privilege(
837            vec![user_2.user_id],
838            &[create_without_option.clone()],
839            user_1.user_id,
840            false,
841        )
842        .await?;
843
844        assert!(
845            mgr.drop_user(user_1.user_id).await.is_err(),
846            "user_1 can't be dropped"
847        );
848
849        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
850        assert_eq!(privilege_1.len(), 2);
851        assert!(privilege_1.iter().all(|gp| gp.object
852            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
853            && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32));
854
855        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
856        assert_eq!(privilege_2.len(), 1);
857        assert!(privilege_2.iter().all(|gp| gp.object
858            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
859            && gp.action_with_opts[0].granted_by == user_1.user_id as u32
860            && gp.action_with_opts[0].with_grant_option));
861
862        // revoke privilege for others by non-super user.
863        assert!(
864            mgr.revoke_privilege(
865                vec![user_1.user_id],
866                &[conn_with_option.clone()],
867                TEST_ROOT_USER_ID,
868                user_2.user_id,
869                false,
870                false
871            )
872            .await
873            .is_err(),
874            "user_2 can't revoke for user_1"
875        );
876
877        // revoke privilege without grant option.
878        assert!(
879            mgr.revoke_privilege(
880                vec![user_2.user_id],
881                &[create_without_option.clone()],
882                user_1.user_id,
883                user_1.user_id,
884                false,
885                false
886            )
887            .await
888            .is_err(),
889            "user_2 don't have grant option for CREATE"
890        );
891
892        // revoke referred privilege in restrict mode.
893        assert!(
894            mgr.revoke_privilege(
895                vec![user_1.user_id],
896                &[conn_with_option.clone()],
897                TEST_ROOT_USER_ID,
898                TEST_ROOT_USER_ID,
899                false,
900                false
901            )
902            .await
903            .is_err(),
904            "permission deny in restrict mode, CONN granted to user_2"
905        );
906
907        // revoke non-referred privilege in restrict mode.
908        mgr.revoke_privilege(
909            vec![user_1.user_id],
910            &[create_without_option.clone()],
911            TEST_ROOT_USER_ID,
912            TEST_ROOT_USER_ID,
913            false,
914            false,
915        )
916        .await?;
917
918        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
919        assert_eq!(privilege_1.len(), 1);
920        assert!(privilege_1.iter().all(|gp| gp.object
921            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
922            && gp.action_with_opts[0].action == PbAction::Connect as i32));
923
924        // revoke grant option for referred privilege in cascade mode.
925        mgr.revoke_privilege(
926            vec![user_1.user_id],
927            &[conn_with_option.clone()],
928            TEST_ROOT_USER_ID,
929            TEST_ROOT_USER_ID,
930            true,
931            true,
932        )
933        .await?;
934        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
935        assert_eq!(privilege_1.len(), 1);
936        assert!(privilege_1.iter().all(|gp| gp.object
937            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
938            && gp.action_with_opts[0].action == PbAction::Connect as i32
939            && !gp.action_with_opts[0].with_grant_option));
940        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
941        assert_eq!(privilege_2.len(), 1);
942        assert!(privilege_2.iter().all(|gp| gp.object
943            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
944            && gp.action_with_opts[0].action == PbAction::Connect as i32
945            && !gp.action_with_opts[0].with_grant_option));
946
947        // revoke referred privilege in cascade mode.
948        mgr.revoke_privilege(
949            vec![user_1.user_id],
950            &[conn_with_option.clone()],
951            TEST_ROOT_USER_ID,
952            TEST_ROOT_USER_ID,
953            false,
954            true,
955        )
956        .await?;
957        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
958        assert!(privilege_1.is_empty());
959        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
960        assert!(privilege_2.is_empty());
961
962        mgr.drop_user(user_1.user_id).await?;
963        mgr.drop_user(user_2.user_id).await?;
964        Ok(())
965    }
966}