risingwave_meta/controller/
user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23    AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24    user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28    Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36    QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41    PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42    ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43    get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
44    get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50    pub(crate) async fn notify_users_update(
51        &self,
52        user_infos: Vec<PbUserInfo>,
53    ) -> NotificationVersion {
54        let mut version = 0;
55        for info in user_infos {
56            version = self
57                .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58                .await;
59        }
60        version
61    }
62
63    pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64        let inner = self.inner.write().await;
65        let txn = inner.db.begin().await?;
66        check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68        let grant_privileges = pb_user.grant_privileges.clone();
69        let user: user::ActiveModel = pb_user.into();
70        let user = user.insert(&txn).await?;
71
72        if !grant_privileges.is_empty() {
73            let mut privileges = vec![];
74            for gp in &grant_privileges {
75                let id = extract_grant_obj_id(gp.get_object()?);
76                for action_with_opt in &gp.action_with_opts {
77                    privileges.push(user_privilege::ActiveModel {
78                        user_id: Set(user.user_id),
79                        oid: Set(id),
80                        granted_by: Set(action_with_opt.granted_by as _),
81                        action: Set(action_with_opt.get_action()?.into()),
82                        with_grant_option: Set(action_with_opt.with_grant_option),
83                        ..Default::default()
84                    });
85                }
86            }
87            UserPrivilege::insert_many(privileges).exec(&txn).await?;
88        }
89        txn.commit().await?;
90
91        let mut user_info: PbUserInfo = user.into();
92        user_info.grant_privileges = grant_privileges;
93        let version = self
94            .notify_frontend(
95                NotificationOperation::Add,
96                NotificationInfo::User(user_info),
97            )
98            .await;
99
100        Ok(version)
101    }
102
103    pub async fn update_user(
104        &self,
105        update_user: PbUserInfo,
106        update_fields: &[PbUpdateField],
107    ) -> MetaResult<NotificationVersion> {
108        let inner = self.inner.write().await;
109        let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110        if rename_flag {
111            check_user_name_duplicate(&update_user.name, &inner.db).await?;
112        }
113
114        let user = User::find_by_id(update_user.id as UserId)
115            .one(&inner.db)
116            .await?
117            .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118        let mut user = user.into_active_model();
119        update_fields.iter().for_each(|&field| match field {
120            PbUpdateField::Unspecified => unreachable!(),
121            PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122            PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123            PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124            PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125            PbUpdateField::AuthInfo => {
126                user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127            }
128            PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129            PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
130        });
131
132        let user = user.update(&inner.db).await?;
133        let mut user_info: PbUserInfo = user.into();
134        user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
135        let version = self
136            .notify_frontend(
137                NotificationOperation::Update,
138                NotificationInfo::User(user_info),
139            )
140            .await;
141
142        Ok(version)
143    }
144
145    #[cfg(test)]
146    pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
147        let inner = self.inner.read().await;
148        let user = User::find_by_id(id)
149            .one(&inner.db)
150            .await?
151            .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
152        Ok(user)
153    }
154
155    #[cfg(test)]
156    pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
157        let inner = self.inner.read().await;
158        let user = User::find()
159            .filter(user::Column::Name.eq(name))
160            .one(&inner.db)
161            .await?
162            .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
163        Ok(user)
164    }
165
166    pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
167        let inner = self.inner.write().await;
168        let txn = inner.db.begin().await?;
169        let user = User::find_by_id(user_id)
170            .one(&txn)
171            .await?
172            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
173        if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
174            return Err(MetaError::permission_denied(format!(
175                "drop default super user {} is not allowed",
176                user.name
177            )));
178        }
179
180        // check if the user is the owner of any objects.
181        let count = Object::find()
182            .filter(object::Column::OwnerId.eq(user_id))
183            .count(&txn)
184            .await?;
185        if count != 0 {
186            return Err(MetaError::permission_denied(format!(
187                "drop user {} is not allowed, because it owns {} objects",
188                user.name, count
189            )));
190        }
191
192        // check if the user granted any privileges to other users.
193        let count = UserPrivilege::find()
194            .filter(user_privilege::Column::GrantedBy.eq(user_id))
195            .count(&txn)
196            .await?;
197        if count != 0 {
198            return Err(MetaError::permission_denied(format!(
199                "drop user {} is not allowed, because it granted {} privileges to others",
200                user.name, count
201            )));
202        }
203
204        let res = User::delete_by_id(user_id).exec(&txn).await?;
205        if res.rows_affected != 1 {
206            return Err(MetaError::catalog_id_not_found("user", user_id));
207        }
208        txn.commit().await?;
209
210        let version = self
211            .notify_frontend(
212                NotificationOperation::Delete,
213                NotificationInfo::User(PbUserInfo {
214                    id: user_id as _,
215                    ..Default::default()
216                }),
217            )
218            .await;
219
220        Ok(version)
221    }
222
223    pub async fn grant_privilege(
224        &self,
225        user_ids: Vec<UserId>,
226        new_grant_privileges: &[PbGrantPrivilege],
227        grantor: UserId,
228        with_grant_option: bool,
229    ) -> MetaResult<NotificationVersion> {
230        let inner = self.inner.write().await;
231        let txn = inner.db.begin().await?;
232        for user_id in &user_ids {
233            ensure_user_id(*user_id, &txn).await?;
234        }
235
236        let mut privileges = vec![];
237        for gp in new_grant_privileges {
238            let id = extract_grant_obj_id(gp.get_object()?);
239            let internal_table_ids = get_internal_tables_by_id(id.as_job_id(), &txn).await?;
240            let index_state_table_ids =
241                get_index_state_tables_by_table_id(id.as_table_id(), &txn).await?;
242            for action_with_opt in &gp.action_with_opts {
243                let action = action_with_opt.get_action()?.into();
244                privileges.push(user_privilege::ActiveModel {
245                    oid: Set(id),
246                    granted_by: Set(grantor),
247                    action: Set(action),
248                    with_grant_option: Set(with_grant_option),
249                    ..Default::default()
250                });
251                if action == Action::Select {
252                    privileges.extend(
253                        internal_table_ids
254                            .iter()
255                            .chain(index_state_table_ids.iter())
256                            .map(|&tid| user_privilege::ActiveModel {
257                                oid: Set(tid.as_object_id()),
258                                granted_by: Set(grantor),
259                                action: Set(Action::Select),
260                                with_grant_option: Set(with_grant_option),
261                                ..Default::default()
262                            }),
263                    );
264                }
265            }
266        }
267
268        // check whether grantor has the privilege to grant the privilege.
269        let user = User::find_by_id(grantor)
270            .one(&txn)
271            .await?
272            .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
273        let mut filtered_privileges = vec![];
274        if !user.is_super {
275            for mut privilege in privileges {
276                if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
277                    filtered_privileges.push(privilege);
278                    continue;
279                }
280                let filter = user_privilege::Column::UserId
281                    .eq(grantor)
282                    .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
283                    .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
284                    .and(user_privilege::Column::WithGrantOption.eq(true));
285                let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
286                    .select_only()
287                    .column(user_privilege::Column::Id)
288                    .filter(filter)
289                    .into_tuple()
290                    .one(&txn)
291                    .await?;
292                let Some(privilege_id) = privilege_id else {
293                    tracing::warn!(
294                        "user {} don't have privilege {:?} or grant option",
295                        grantor,
296                        privilege.action
297                    );
298                    continue;
299                };
300                privilege.dependent_id = Set(Some(privilege_id));
301                filtered_privileges.push(privilege);
302            }
303        } else {
304            filtered_privileges = privileges;
305        }
306
307        // insert privileges
308        let user_privileges = user_ids
309            .iter()
310            .flat_map(|user_id| {
311                filtered_privileges.iter().map(|p| {
312                    let mut p = p.clone();
313                    p.user_id = Set(*user_id);
314                    p
315                })
316            })
317            .collect_vec();
318        for privilege in user_privileges {
319            let mut on_conflict = OnConflict::columns([
320                user_privilege::Column::UserId,
321                user_privilege::Column::Oid,
322                user_privilege::Column::Action,
323                user_privilege::Column::GrantedBy,
324            ]);
325            if *privilege.with_grant_option.as_ref() {
326                on_conflict.update_column(user_privilege::Column::WithGrantOption);
327            } else {
328                // Workaround to support MYSQL for `DO NOTHING`.
329                on_conflict.update_column(user_privilege::Column::UserId);
330            }
331
332            UserPrivilege::insert(privilege)
333                .on_conflict(on_conflict)
334                .do_nothing()
335                .exec(&txn)
336                .await?;
337        }
338
339        let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
340
341        txn.commit().await?;
342
343        let version = self.notify_users_update(user_infos).await;
344        Ok(version)
345    }
346
347    pub async fn revoke_privilege(
348        &self,
349        user_ids: Vec<UserId>,
350        revoke_grant_privileges: &[PbGrantPrivilege],
351        granted_by: UserId,
352        revoke_by: UserId,
353        revoke_grant_option: bool,
354        cascade: bool,
355    ) -> MetaResult<NotificationVersion> {
356        let inner = self.inner.write().await;
357        let txn = inner.db.begin().await?;
358        for user_id in &user_ids {
359            ensure_user_id(*user_id, &txn).await?;
360        }
361        // check whether revoke has the privilege to grant the privilege.
362        let revoke_user = User::find_by_id(revoke_by)
363            .one(&txn)
364            .await?
365            .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
366
367        // check whether user can revoke the privilege.
368        if !revoke_user.is_super && granted_by != revoke_by {
369            let granted_user_name: String = User::find_by_id(granted_by)
370                .select_only()
371                .column(user::Column::Name)
372                .into_tuple()
373                .one(&txn)
374                .await?
375                .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
376            return Err(MetaError::permission_denied(format!(
377                "user {} is not super, can't revoke privileges for {}",
378                revoke_user.name, granted_user_name
379            )));
380        }
381
382        let mut revoke_items = HashMap::new();
383        for privilege in revoke_grant_privileges {
384            let obj = extract_grant_obj_id(privilege.get_object()?);
385            let internal_table_ids = get_internal_tables_by_id(obj.as_job_id(), &txn).await?;
386            let index_state_table_ids =
387                get_index_state_tables_by_table_id(obj.as_table_id(), &txn).await?;
388            let mut include_select = false;
389            let actions = privilege
390                .action_with_opts
391                .iter()
392                .map(|ao| {
393                    let action = Action::from(ao.get_action().unwrap());
394                    if action == Action::Select {
395                        include_select = true;
396                    }
397                    action
398                })
399                .collect_vec();
400            revoke_items.insert(obj, actions);
401            if include_select {
402                revoke_items.extend(
403                    internal_table_ids
404                        .iter()
405                        .chain(index_state_table_ids.iter())
406                        .map(|&tid| (tid.as_object_id(), vec![Action::Select])),
407                );
408            }
409        }
410
411        let filter = if !revoke_user.is_super {
412            // ensure user have grant options or is owner of the object.
413            for (obj, actions) in &revoke_items {
414                if revoke_by == get_object_owner(*obj, &txn).await? {
415                    continue;
416                }
417                let owned_actions: HashSet<Action> = UserPrivilege::find()
418                    .select_only()
419                    .column(user_privilege::Column::Action)
420                    .filter(
421                        user_privilege::Column::UserId
422                            .eq(granted_by)
423                            .and(user_privilege::Column::Oid.eq(*obj))
424                            .and(user_privilege::Column::WithGrantOption.eq(true)),
425                    )
426                    .into_tuple::<Action>()
427                    .all(&txn)
428                    .await?
429                    .into_iter()
430                    .collect();
431                if actions.iter().any(|ac| !owned_actions.contains(ac)) {
432                    return Err(MetaError::permission_denied(format!(
433                        "user {} don't have privileges {:?} or grant option",
434                        revoke_user.name, actions,
435                    )));
436                }
437            }
438
439            user_privilege::Column::GrantedBy
440                .eq(granted_by)
441                .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
442        } else {
443            user_privilege::Column::UserId.is_in(user_ids.clone())
444        };
445        let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
446        for (obj, actions) in revoke_items {
447            let filter = filter
448                .clone()
449                .and(user_privilege::Column::Oid.eq(obj))
450                .and(user_privilege::Column::Action.is_in(actions));
451            root_user_privileges.extend(
452                UserPrivilege::find()
453                    .select_only()
454                    .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
455                    .filter(filter)
456                    .into_partial_model()
457                    .all(&txn)
458                    .await?,
459            );
460        }
461        if root_user_privileges.is_empty() {
462            tracing::warn!("no privilege to revoke, ignore it");
463            return Ok(IGNORED_NOTIFICATION_VERSION);
464        }
465
466        // check if the user granted any privileges to other users.
467        let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
468        let (all_privilege_ids, to_update_user_ids) = if !cascade {
469            ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
470            (
471                root_privilege_ids.clone(),
472                root_user_privileges
473                    .iter()
474                    .map(|ur| ur.user_id)
475                    .collect_vec(),
476            )
477        } else {
478            let all_user_privileges =
479                get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
480            (
481                all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
482                all_user_privileges
483                    .iter()
484                    .map(|ur| ur.user_id)
485                    .collect_vec(),
486            )
487        };
488
489        if revoke_grant_option {
490            UserPrivilege::update_many()
491                .col_expr(
492                    user_privilege::Column::WithGrantOption,
493                    SimpleExpr::Value(Value::Bool(Some(false))),
494                )
495                .filter(
496                    user_privilege::Column::Id
497                        .is_in(all_privilege_ids)
498                        .and(user_privilege::Column::WithGrantOption.eq(true)),
499                )
500                .exec(&txn)
501                .await?;
502        } else {
503            // The dependent privileges will be deleted cascade.
504            UserPrivilege::delete_many()
505                .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
506                .exec(&txn)
507                .await?;
508        }
509
510        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
511
512        txn.commit().await?;
513
514        let version = self.notify_users_update(user_infos).await;
515        Ok(version)
516    }
517
518    pub async fn grant_default_privileges(
519        &self,
520        user_ids: Vec<UserId>,
521        database_id: DatabaseId,
522        schema_ids: Vec<SchemaId>,
523        grantor: UserId,
524        actions: Vec<PbAction>,
525        object_type: PbObjectType,
526        grantees: Vec<UserId>,
527        with_grant_option: bool,
528    ) -> MetaResult<()> {
529        tracing::debug!(
530            ?user_ids,
531            %database_id,
532            ?schema_ids,
533            ?actions,
534            ?object_type,
535            ?grantees,
536            with_grant_option,
537            "grant default privileges",
538        );
539        let inner = self.inner.write().await;
540        let txn = inner.db.begin().await?;
541        for user_id in &user_ids {
542            ensure_user_id(*user_id, &txn).await?;
543        }
544        ensure_object_id(ObjectType::Database, database_id, &txn).await?;
545        for schema_id in &schema_ids {
546            ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
547        }
548        for grantee in &grantees {
549            ensure_user_id(*grantee, &txn).await?;
550        }
551        if object_type == PbObjectType::Schema {
552            assert!(
553                schema_ids.is_empty(),
554                "schema_ids should be empty when object_type is Schema"
555            );
556
557            // Note that the UNIQUE constraint does not treat NULL values as equal, we cannot rely on conflict check
558            // to update it and have to check existing default privileges manually.
559            let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
560            let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
561                .select_only()
562                .columns([
563                    user_default_privilege::Column::Id,
564                    user_default_privilege::Column::UserId,
565                    user_default_privilege::Column::Grantee,
566                    user_default_privilege::Column::Action,
567                ])
568                .filter(
569                    user_default_privilege::Column::DatabaseId
570                        .eq(database_id)
571                        .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
572                        .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
573                        .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
574                        .and(user_default_privilege::Column::Action.is_in(actions.clone())),
575                )
576                .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
577                .all(&txn)
578                .await?
579                .into_iter()
580                .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
581                .collect();
582
583            for user_id in user_ids {
584                for grantee in &grantees {
585                    for action in &actions {
586                        if let Some(existing_id) =
587                            existing_default_privileges.get(&(user_id, *grantee, *action))
588                            && with_grant_option
589                        {
590                            // If the default privilege already exists, we should update the grant option.
591                            UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
592                                id: Set(*existing_id),
593                                with_grant_option: Set(true),
594                                granted_by: Set(grantor as _),
595                                ..Default::default()
596                            })
597                            .exec(&txn)
598                            .await?;
599                        } else {
600                            UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
601                                id: Default::default(),
602                                database_id: Set(database_id),
603                                schema_id: Set(None),
604                                object_type: Set(ObjectType::Schema),
605                                for_materialized_view: Set(false),
606                                user_id: Set(user_id),
607                                grantee: Set(*grantee),
608                                granted_by: Set(grantor as _),
609                                action: Set(*action),
610                                with_grant_option: Set(with_grant_option),
611                            })
612                            .exec(&txn)
613                            .await?;
614                        }
615                    }
616                }
617            }
618        } else {
619            let mut default_privileges = vec![];
620            for user_id in user_ids {
621                for grantee in &grantees {
622                    for action in &actions {
623                        if schema_ids.is_empty() {
624                            default_privileges.push(user_default_privilege::ActiveModel {
625                                id: Default::default(),
626                                database_id: Set(database_id),
627                                schema_id: Set(None),
628                                object_type: Set(object_type.into()),
629                                for_materialized_view: Set(object_type == PbObjectType::Mview),
630                                user_id: Set(user_id),
631                                grantee: Set(*grantee),
632                                granted_by: Set(grantor as _),
633                                action: Set((*action).into()),
634                                with_grant_option: Set(with_grant_option),
635                            });
636                            continue;
637                        }
638                        for schema_id in &schema_ids {
639                            default_privileges.push(user_default_privilege::ActiveModel {
640                                id: Default::default(),
641                                database_id: Set(database_id),
642                                schema_id: Set(Some(*schema_id)),
643                                object_type: Set(object_type.into()),
644                                for_materialized_view: Set(object_type == PbObjectType::Mview),
645                                user_id: Set(user_id),
646                                grantee: Set(*grantee),
647                                granted_by: Set(grantor as _),
648                                action: Set((*action).into()),
649                                with_grant_option: Set(with_grant_option),
650                            });
651                        }
652                    }
653                }
654            }
655
656            let mut on_conflict = OnConflict::columns([
657                user_default_privilege::Column::UserId,
658                user_default_privilege::Column::DatabaseId,
659                user_default_privilege::Column::SchemaId,
660                user_default_privilege::Column::ObjectType,
661                user_default_privilege::Column::ForMaterializedView,
662                user_default_privilege::Column::Grantee,
663                user_default_privilege::Column::Action,
664            ]);
665            if with_grant_option {
666                on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
667            } else {
668                // Workaround to support MYSQL for `DO NOTHING`.
669                on_conflict.update_column(user_default_privilege::Column::UserId);
670            }
671            UserDefaultPrivilege::insert_many(default_privileges)
672                .on_conflict(on_conflict)
673                .do_nothing()
674                .exec(&txn)
675                .await?;
676        }
677
678        txn.commit().await?;
679        Ok(())
680    }
681
682    pub async fn revoke_default_privileges(
683        &self,
684        user_ids: Vec<UserId>,
685        database_id: DatabaseId,
686        schema_ids: Vec<SchemaId>,
687        actions: Vec<PbAction>,
688        object_type: PbObjectType,
689        grantees: Vec<UserId>,
690        revoke_grant_option: bool,
691    ) -> MetaResult<()> {
692        let inner = self.inner.write().await;
693        let txn = inner.db.begin().await?;
694        for user_id in &user_ids {
695            ensure_user_id(*user_id, &txn).await?;
696        }
697
698        let schema_filter = if schema_ids.is_empty() {
699            user_default_privilege::Column::SchemaId.is_null()
700        } else {
701            user_default_privilege::Column::SchemaId.is_in(schema_ids)
702        };
703        let filter = user_default_privilege::Column::DatabaseId
704            .eq(database_id)
705            .and(schema_filter)
706            .and(user_default_privilege::Column::UserId.is_in(user_ids))
707            .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
708            .and(user_default_privilege::Column::Grantee.is_in(grantees))
709            .and(
710                user_default_privilege::Column::Action
711                    .is_in(actions.iter().map(|&a| Action::from(a))),
712            );
713
714        if revoke_grant_option {
715            // update the `with_grant_option` field to false
716            let res = UserDefaultPrivilege::update_many()
717                .col_expr(
718                    user_default_privilege::Column::WithGrantOption,
719                    SimpleExpr::Value(Value::Bool(Some(false))),
720                )
721                .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
722                .exec(&txn)
723                .await?;
724            tracing::info!(
725                "revoke {count} grant option for default privileges",
726                count = res.rows_affected
727            );
728        } else {
729            let res = UserDefaultPrivilege::delete_many()
730                .filter(filter)
731                .exec(&txn)
732                .await?;
733            tracing::info!(
734                "revoke {count} default privileges",
735                count = res.rows_affected
736            );
737        }
738
739        txn.commit().await?;
740        Ok(())
741    }
742}
743
744#[cfg(test)]
745mod tests {
746    use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
747
748    use super::*;
749    use crate::manager::MetaSrvEnv;
750
751    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
752    const TEST_ROOT_USER_ID: UserId = 1;
753
754    fn make_test_user(name: &str) -> PbUserInfo {
755        PbUserInfo {
756            name: name.to_owned(),
757            ..Default::default()
758        }
759    }
760
761    fn make_privilege(
762        object: PbObject,
763        actions: &[PbAction],
764        with_grant_option: bool,
765    ) -> PbGrantPrivilege {
766        PbGrantPrivilege {
767            object: Some(object),
768            action_with_opts: actions
769                .iter()
770                .map(|&action| PbActionWithGrantOption {
771                    action: action as _,
772                    with_grant_option,
773                    ..Default::default()
774                })
775                .collect(),
776        }
777    }
778
779    #[tokio::test]
780    async fn test_user_and_privilege() -> MetaResult<()> {
781        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
782        mgr.create_user(make_test_user("test_user_1")).await?;
783        mgr.create_user(make_test_user("test_user_2")).await?;
784        let user_1 = mgr.get_user_by_name("test_user_1").await?;
785        let user_2 = mgr.get_user_by_name("test_user_2").await?;
786
787        assert!(
788            mgr.create_user(make_test_user("test_user_1"))
789                .await
790                .is_err(),
791            "user_1 already exists"
792        );
793        mgr.update_user(
794            PbUserInfo {
795                id: user_1.user_id as _,
796                name: "test_user_1_new".to_owned(),
797                ..Default::default()
798            },
799            &[PbUpdateField::Rename],
800        )
801        .await?;
802        let user_1 = mgr.get_user(user_1.user_id).await?;
803        assert_eq!(user_1.name, "test_user_1_new".to_owned());
804
805        let conn_with_option = make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Connect], true);
806        let create_without_option =
807            make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Create], false);
808        // ROOT grant CONN with grant option to user_1.
809        mgr.grant_privilege(
810            vec![user_1.user_id],
811            std::slice::from_ref(&conn_with_option),
812            TEST_ROOT_USER_ID,
813            true,
814        )
815        .await?;
816        // ROOT grant CREATE without grant option to user_1.
817        mgr.grant_privilege(
818            vec![user_1.user_id],
819            std::slice::from_ref(&create_without_option),
820            TEST_ROOT_USER_ID,
821            false,
822        )
823        .await?;
824        // user_1 grant CONN with grant option to user_2.
825        mgr.grant_privilege(
826            vec![user_2.user_id],
827            std::slice::from_ref(&conn_with_option),
828            user_1.user_id,
829            true,
830        )
831        .await?;
832        mgr.grant_privilege(
833            vec![user_2.user_id],
834            std::slice::from_ref(&create_without_option),
835            user_1.user_id,
836            false,
837        )
838        .await?;
839
840        assert!(
841            mgr.drop_user(user_1.user_id).await.is_err(),
842            "user_1 can't be dropped"
843        );
844
845        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
846        assert_eq!(privilege_1.len(), 2);
847        assert!(
848            privilege_1
849                .iter()
850                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
851                    && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32)
852        );
853
854        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
855        assert_eq!(privilege_2.len(), 1);
856        assert!(
857            privilege_2
858                .iter()
859                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
860                    && gp.action_with_opts[0].granted_by == user_1.user_id as u32
861                    && gp.action_with_opts[0].with_grant_option)
862        );
863
864        // revoke privilege for others by non-super user.
865        assert!(
866            mgr.revoke_privilege(
867                vec![user_1.user_id],
868                std::slice::from_ref(&conn_with_option),
869                TEST_ROOT_USER_ID,
870                user_2.user_id,
871                false,
872                false
873            )
874            .await
875            .is_err(),
876            "user_2 can't revoke for user_1"
877        );
878
879        // revoke privilege without grant option.
880        assert!(
881            mgr.revoke_privilege(
882                vec![user_2.user_id],
883                std::slice::from_ref(&create_without_option),
884                user_1.user_id,
885                user_1.user_id,
886                false,
887                false
888            )
889            .await
890            .is_err(),
891            "user_2 don't have grant option for CREATE"
892        );
893
894        // revoke referred privilege in restrict mode.
895        assert!(
896            mgr.revoke_privilege(
897                vec![user_1.user_id],
898                std::slice::from_ref(&conn_with_option),
899                TEST_ROOT_USER_ID,
900                TEST_ROOT_USER_ID,
901                false,
902                false
903            )
904            .await
905            .is_err(),
906            "permission deny in restrict mode, CONN granted to user_2"
907        );
908
909        // revoke non-referred privilege in restrict mode.
910        mgr.revoke_privilege(
911            vec![user_1.user_id],
912            std::slice::from_ref(&create_without_option),
913            TEST_ROOT_USER_ID,
914            TEST_ROOT_USER_ID,
915            false,
916            false,
917        )
918        .await?;
919
920        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
921        assert_eq!(privilege_1.len(), 1);
922        assert!(
923            privilege_1
924                .iter()
925                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
926                    && gp.action_with_opts[0].action == PbAction::Connect as i32)
927        );
928
929        // revoke grant option for referred privilege in cascade mode.
930        mgr.revoke_privilege(
931            vec![user_1.user_id],
932            std::slice::from_ref(&conn_with_option),
933            TEST_ROOT_USER_ID,
934            TEST_ROOT_USER_ID,
935            true,
936            true,
937        )
938        .await?;
939        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
940        assert_eq!(privilege_1.len(), 1);
941        assert!(
942            privilege_1
943                .iter()
944                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
945                    && gp.action_with_opts[0].action == PbAction::Connect as i32
946                    && !gp.action_with_opts[0].with_grant_option)
947        );
948        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
949        assert_eq!(privilege_2.len(), 1);
950        assert!(
951            privilege_2
952                .iter()
953                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
954                    && gp.action_with_opts[0].action == PbAction::Connect as i32
955                    && !gp.action_with_opts[0].with_grant_option)
956        );
957
958        // revoke referred privilege in cascade mode.
959        mgr.revoke_privilege(
960            vec![user_1.user_id],
961            std::slice::from_ref(&conn_with_option),
962            TEST_ROOT_USER_ID,
963            TEST_ROOT_USER_ID,
964            false,
965            true,
966        )
967        .await?;
968        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
969        assert!(privilege_1.is_empty());
970        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
971        assert!(privilege_2.is_empty());
972
973        mgr.drop_user(user_1.user_id).await?;
974        mgr.drop_user(user_2.user_id).await?;
975        Ok(())
976    }
977}