risingwave_meta/controller/
user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_common::id::{JobId, TableId};
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
22use risingwave_meta_model::user_privilege::Action;
23use risingwave_meta_model::{
24    AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
25    user_default_privilege, user_privilege,
26};
27use risingwave_pb::common::PbObjectType;
28use risingwave_pb::meta::subscribe_response::{
29    Info as NotificationInfo, Operation as NotificationOperation,
30};
31use risingwave_pb::user::update_user_request::PbUpdateField;
32use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
33use sea_orm::ActiveValue::Set;
34use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
35use sea_orm::{
36    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
37    QuerySelect, TransactionTrait,
38};
39
40use crate::controller::catalog::CatalogController;
41use crate::controller::utils::{
42    PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
43    ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
44    get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
45    get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
46};
47use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
48use crate::{MetaError, MetaResult};
49
50impl CatalogController {
51    pub(crate) async fn notify_users_update(
52        &self,
53        user_infos: Vec<PbUserInfo>,
54    ) -> NotificationVersion {
55        let mut version = 0;
56        for info in user_infos {
57            version = self
58                .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
59                .await;
60        }
61        version
62    }
63
64    pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
65        let inner = self.inner.write().await;
66        let txn = inner.db.begin().await?;
67        check_user_name_duplicate(&pb_user.name, &txn).await?;
68
69        let grant_privileges = pb_user.grant_privileges.clone();
70        let user: user::ActiveModel = pb_user.into();
71        let user = user.insert(&txn).await?;
72
73        if !grant_privileges.is_empty() {
74            let mut privileges = vec![];
75            for gp in &grant_privileges {
76                let id = extract_grant_obj_id(gp.get_object()?);
77                for action_with_opt in &gp.action_with_opts {
78                    privileges.push(user_privilege::ActiveModel {
79                        user_id: Set(user.user_id),
80                        oid: Set(id),
81                        granted_by: Set(action_with_opt.granted_by as _),
82                        action: Set(action_with_opt.get_action()?.into()),
83                        with_grant_option: Set(action_with_opt.with_grant_option),
84                        ..Default::default()
85                    });
86                }
87            }
88            UserPrivilege::insert_many(privileges).exec(&txn).await?;
89        }
90        txn.commit().await?;
91
92        let mut user_info: PbUserInfo = user.into();
93        user_info.grant_privileges = grant_privileges;
94        let version = self
95            .notify_frontend(
96                NotificationOperation::Add,
97                NotificationInfo::User(user_info),
98            )
99            .await;
100
101        Ok(version)
102    }
103
104    pub async fn update_user(
105        &self,
106        update_user: PbUserInfo,
107        update_fields: &[PbUpdateField],
108    ) -> MetaResult<NotificationVersion> {
109        let inner = self.inner.write().await;
110        let rename_flag = update_fields.contains(&PbUpdateField::Rename);
111        if rename_flag {
112            check_user_name_duplicate(&update_user.name, &inner.db).await?;
113        }
114
115        let user = User::find_by_id(update_user.id as UserId)
116            .one(&inner.db)
117            .await?
118            .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
119        let mut user = user.into_active_model();
120        update_fields.iter().for_each(|&field| match field {
121            PbUpdateField::Unspecified => unreachable!(),
122            PbUpdateField::Super => user.is_super = Set(update_user.is_super),
123            PbUpdateField::Login => user.can_login = Set(update_user.can_login),
124            PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
125            PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
126            PbUpdateField::AuthInfo => {
127                user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
128            }
129            PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
130            PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
131        });
132
133        let user = user.update(&inner.db).await?;
134        let mut user_info: PbUserInfo = user.into();
135        user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
136        let version = self
137            .notify_frontend(
138                NotificationOperation::Update,
139                NotificationInfo::User(user_info),
140            )
141            .await;
142
143        Ok(version)
144    }
145
146    #[cfg(test)]
147    pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
148        let inner = self.inner.read().await;
149        let user = User::find_by_id(id)
150            .one(&inner.db)
151            .await?
152            .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
153        Ok(user)
154    }
155
156    #[cfg(test)]
157    pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
158        let inner = self.inner.read().await;
159        let user = User::find()
160            .filter(user::Column::Name.eq(name))
161            .one(&inner.db)
162            .await?
163            .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
164        Ok(user)
165    }
166
167    pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
168        let inner = self.inner.write().await;
169        let txn = inner.db.begin().await?;
170        let user = User::find_by_id(user_id)
171            .one(&txn)
172            .await?
173            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
174        if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
175            return Err(MetaError::permission_denied(format!(
176                "drop default super user {} is not allowed",
177                user.name
178            )));
179        }
180
181        // check if the user is the owner of any objects.
182        let count = Object::find()
183            .filter(object::Column::OwnerId.eq(user_id))
184            .count(&txn)
185            .await?;
186        if count != 0 {
187            return Err(MetaError::permission_denied(format!(
188                "drop user {} is not allowed, because it owns {} objects",
189                user.name, count
190            )));
191        }
192
193        // check if the user granted any privileges to other users.
194        let count = UserPrivilege::find()
195            .filter(user_privilege::Column::GrantedBy.eq(user_id))
196            .count(&txn)
197            .await?;
198        if count != 0 {
199            return Err(MetaError::permission_denied(format!(
200                "drop user {} is not allowed, because it granted {} privileges to others",
201                user.name, count
202            )));
203        }
204
205        let res = User::delete_by_id(user_id).exec(&txn).await?;
206        if res.rows_affected != 1 {
207            return Err(MetaError::catalog_id_not_found("user", user_id));
208        }
209        txn.commit().await?;
210
211        let version = self
212            .notify_frontend(
213                NotificationOperation::Delete,
214                NotificationInfo::User(PbUserInfo {
215                    id: user_id as _,
216                    ..Default::default()
217                }),
218            )
219            .await;
220
221        Ok(version)
222    }
223
224    pub async fn grant_privilege(
225        &self,
226        user_ids: Vec<UserId>,
227        new_grant_privileges: &[PbGrantPrivilege],
228        grantor: UserId,
229        with_grant_option: bool,
230    ) -> MetaResult<NotificationVersion> {
231        let inner = self.inner.write().await;
232        let txn = inner.db.begin().await?;
233        for user_id in &user_ids {
234            ensure_user_id(*user_id, &txn).await?;
235        }
236
237        let mut privileges = vec![];
238        for gp in new_grant_privileges {
239            let id = extract_grant_obj_id(gp.get_object()?);
240            let internal_table_ids = get_internal_tables_by_id(JobId::new(id as _), &txn).await?;
241            let index_state_table_ids =
242                get_index_state_tables_by_table_id(TableId::new(id as _), &txn).await?;
243            for action_with_opt in &gp.action_with_opts {
244                let action = action_with_opt.get_action()?.into();
245                privileges.push(user_privilege::ActiveModel {
246                    oid: Set(id),
247                    granted_by: Set(grantor),
248                    action: Set(action),
249                    with_grant_option: Set(with_grant_option),
250                    ..Default::default()
251                });
252                if action == Action::Select {
253                    privileges.extend(
254                        internal_table_ids
255                            .iter()
256                            .chain(index_state_table_ids.iter())
257                            .map(|&tid| user_privilege::ActiveModel {
258                                oid: Set(tid.as_raw_id() as _),
259                                granted_by: Set(grantor),
260                                action: Set(Action::Select),
261                                with_grant_option: Set(with_grant_option),
262                                ..Default::default()
263                            }),
264                    );
265                }
266            }
267        }
268
269        // check whether grantor has the privilege to grant the privilege.
270        let user = User::find_by_id(grantor)
271            .one(&txn)
272            .await?
273            .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
274        let mut filtered_privileges = vec![];
275        if !user.is_super {
276            for mut privilege in privileges {
277                if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
278                    filtered_privileges.push(privilege);
279                    continue;
280                }
281                let filter = user_privilege::Column::UserId
282                    .eq(grantor)
283                    .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
284                    .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
285                    .and(user_privilege::Column::WithGrantOption.eq(true));
286                let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
287                    .select_only()
288                    .column(user_privilege::Column::Id)
289                    .filter(filter)
290                    .into_tuple()
291                    .one(&txn)
292                    .await?;
293                let Some(privilege_id) = privilege_id else {
294                    tracing::warn!(
295                        "user {} don't have privilege {:?} or grant option",
296                        grantor,
297                        privilege.action
298                    );
299                    continue;
300                };
301                privilege.dependent_id = Set(Some(privilege_id));
302                filtered_privileges.push(privilege);
303            }
304        } else {
305            filtered_privileges = privileges;
306        }
307
308        // insert privileges
309        let user_privileges = user_ids
310            .iter()
311            .flat_map(|user_id| {
312                filtered_privileges.iter().map(|p| {
313                    let mut p = p.clone();
314                    p.user_id = Set(*user_id);
315                    p
316                })
317            })
318            .collect_vec();
319        for privilege in user_privileges {
320            let mut on_conflict = OnConflict::columns([
321                user_privilege::Column::UserId,
322                user_privilege::Column::Oid,
323                user_privilege::Column::Action,
324                user_privilege::Column::GrantedBy,
325            ]);
326            if *privilege.with_grant_option.as_ref() {
327                on_conflict.update_column(user_privilege::Column::WithGrantOption);
328            } else {
329                // Workaround to support MYSQL for `DO NOTHING`.
330                on_conflict.update_column(user_privilege::Column::UserId);
331            }
332
333            UserPrivilege::insert(privilege)
334                .on_conflict(on_conflict)
335                .do_nothing()
336                .exec(&txn)
337                .await?;
338        }
339
340        let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
341
342        txn.commit().await?;
343
344        let version = self.notify_users_update(user_infos).await;
345        Ok(version)
346    }
347
348    pub async fn revoke_privilege(
349        &self,
350        user_ids: Vec<UserId>,
351        revoke_grant_privileges: &[PbGrantPrivilege],
352        granted_by: UserId,
353        revoke_by: UserId,
354        revoke_grant_option: bool,
355        cascade: bool,
356    ) -> MetaResult<NotificationVersion> {
357        let inner = self.inner.write().await;
358        let txn = inner.db.begin().await?;
359        for user_id in &user_ids {
360            ensure_user_id(*user_id, &txn).await?;
361        }
362        // check whether revoke has the privilege to grant the privilege.
363        let revoke_user = User::find_by_id(revoke_by)
364            .one(&txn)
365            .await?
366            .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
367
368        // check whether user can revoke the privilege.
369        if !revoke_user.is_super && granted_by != revoke_by {
370            let granted_user_name: String = User::find_by_id(granted_by)
371                .select_only()
372                .column(user::Column::Name)
373                .into_tuple()
374                .one(&txn)
375                .await?
376                .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
377            return Err(MetaError::permission_denied(format!(
378                "user {} is not super, can't revoke privileges for {}",
379                revoke_user.name, granted_user_name
380            )));
381        }
382
383        let mut revoke_items = HashMap::new();
384        for privilege in revoke_grant_privileges {
385            let obj = extract_grant_obj_id(privilege.get_object()?);
386            let internal_table_ids = get_internal_tables_by_id(JobId::new(obj as _), &txn).await?;
387            let index_state_table_ids =
388                get_index_state_tables_by_table_id(TableId::new(obj as _), &txn).await?;
389            let mut include_select = false;
390            let actions = privilege
391                .action_with_opts
392                .iter()
393                .map(|ao| {
394                    let action = Action::from(ao.get_action().unwrap());
395                    if action == Action::Select {
396                        include_select = true;
397                    }
398                    action
399                })
400                .collect_vec();
401            revoke_items.insert(obj, actions);
402            if include_select {
403                revoke_items.extend(
404                    internal_table_ids
405                        .iter()
406                        .chain(index_state_table_ids.iter())
407                        .map(|&tid| (tid.as_raw_id() as _, vec![Action::Select])),
408                );
409            }
410        }
411
412        let filter = if !revoke_user.is_super {
413            // ensure user have grant options or is owner of the object.
414            for (obj, actions) in &revoke_items {
415                if revoke_by == get_object_owner(*obj, &txn).await? {
416                    continue;
417                }
418                let owned_actions: HashSet<Action> = UserPrivilege::find()
419                    .select_only()
420                    .column(user_privilege::Column::Action)
421                    .filter(
422                        user_privilege::Column::UserId
423                            .eq(granted_by)
424                            .and(user_privilege::Column::Oid.eq(*obj))
425                            .and(user_privilege::Column::WithGrantOption.eq(true)),
426                    )
427                    .into_tuple::<Action>()
428                    .all(&txn)
429                    .await?
430                    .into_iter()
431                    .collect();
432                if actions.iter().any(|ac| !owned_actions.contains(ac)) {
433                    return Err(MetaError::permission_denied(format!(
434                        "user {} don't have privileges {:?} or grant option",
435                        revoke_user.name, actions,
436                    )));
437                }
438            }
439
440            user_privilege::Column::GrantedBy
441                .eq(granted_by)
442                .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
443        } else {
444            user_privilege::Column::UserId.is_in(user_ids.clone())
445        };
446        let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
447        for (obj, actions) in revoke_items {
448            let filter = filter
449                .clone()
450                .and(user_privilege::Column::Oid.eq(obj))
451                .and(user_privilege::Column::Action.is_in(actions));
452            root_user_privileges.extend(
453                UserPrivilege::find()
454                    .select_only()
455                    .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
456                    .filter(filter)
457                    .into_partial_model()
458                    .all(&txn)
459                    .await?,
460            );
461        }
462        if root_user_privileges.is_empty() {
463            tracing::warn!("no privilege to revoke, ignore it");
464            return Ok(IGNORED_NOTIFICATION_VERSION);
465        }
466
467        // check if the user granted any privileges to other users.
468        let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
469        let (all_privilege_ids, to_update_user_ids) = if !cascade {
470            ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
471            (
472                root_privilege_ids.clone(),
473                root_user_privileges
474                    .iter()
475                    .map(|ur| ur.user_id)
476                    .collect_vec(),
477            )
478        } else {
479            let all_user_privileges =
480                get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
481            (
482                all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
483                all_user_privileges
484                    .iter()
485                    .map(|ur| ur.user_id)
486                    .collect_vec(),
487            )
488        };
489
490        if revoke_grant_option {
491            UserPrivilege::update_many()
492                .col_expr(
493                    user_privilege::Column::WithGrantOption,
494                    SimpleExpr::Value(Value::Bool(Some(false))),
495                )
496                .filter(
497                    user_privilege::Column::Id
498                        .is_in(all_privilege_ids)
499                        .and(user_privilege::Column::WithGrantOption.eq(true)),
500                )
501                .exec(&txn)
502                .await?;
503        } else {
504            // The dependent privileges will be deleted cascade.
505            UserPrivilege::delete_many()
506                .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
507                .exec(&txn)
508                .await?;
509        }
510
511        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
512
513        txn.commit().await?;
514
515        let version = self.notify_users_update(user_infos).await;
516        Ok(version)
517    }
518
519    pub async fn grant_default_privileges(
520        &self,
521        user_ids: Vec<UserId>,
522        database_id: DatabaseId,
523        schema_ids: Vec<SchemaId>,
524        grantor: UserId,
525        actions: Vec<PbAction>,
526        object_type: PbObjectType,
527        grantees: Vec<UserId>,
528        with_grant_option: bool,
529    ) -> MetaResult<()> {
530        tracing::debug!(
531            ?user_ids,
532            %database_id,
533            ?schema_ids,
534            ?actions,
535            ?object_type,
536            ?grantees,
537            with_grant_option,
538            "grant default privileges",
539        );
540        let inner = self.inner.write().await;
541        let txn = inner.db.begin().await?;
542        for user_id in &user_ids {
543            ensure_user_id(*user_id, &txn).await?;
544        }
545        ensure_object_id(ObjectType::Database, database_id.as_raw_id() as _, &txn).await?;
546        for schema_id in &schema_ids {
547            ensure_object_id(ObjectType::Schema, schema_id.as_raw_id() as _, &txn).await?;
548        }
549        for grantee in &grantees {
550            ensure_user_id(*grantee, &txn).await?;
551        }
552        if object_type == PbObjectType::Schema {
553            assert!(
554                schema_ids.is_empty(),
555                "schema_ids should be empty when object_type is Schema"
556            );
557
558            // Note that the UNIQUE constraint does not treat NULL values as equal, we cannot rely on conflict check
559            // to update it and have to check existing default privileges manually.
560            let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
561            let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
562                .select_only()
563                .columns([
564                    user_default_privilege::Column::Id,
565                    user_default_privilege::Column::UserId,
566                    user_default_privilege::Column::Grantee,
567                    user_default_privilege::Column::Action,
568                ])
569                .filter(
570                    user_default_privilege::Column::DatabaseId
571                        .eq(database_id)
572                        .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
573                        .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
574                        .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
575                        .and(user_default_privilege::Column::Action.is_in(actions.clone())),
576                )
577                .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
578                .all(&txn)
579                .await?
580                .into_iter()
581                .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
582                .collect();
583
584            for user_id in user_ids {
585                for grantee in &grantees {
586                    for action in &actions {
587                        if let Some(existing_id) =
588                            existing_default_privileges.get(&(user_id, *grantee, *action))
589                            && with_grant_option
590                        {
591                            // If the default privilege already exists, we should update the grant option.
592                            UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
593                                id: Set(*existing_id),
594                                with_grant_option: Set(true),
595                                granted_by: Set(grantor as _),
596                                ..Default::default()
597                            })
598                            .exec(&txn)
599                            .await?;
600                        } else {
601                            UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
602                                id: Default::default(),
603                                database_id: Set(database_id),
604                                schema_id: Set(None),
605                                object_type: Set(ObjectType::Schema),
606                                for_materialized_view: Set(false),
607                                user_id: Set(user_id),
608                                grantee: Set(*grantee),
609                                granted_by: Set(grantor as _),
610                                action: Set(*action),
611                                with_grant_option: Set(with_grant_option),
612                            })
613                            .exec(&txn)
614                            .await?;
615                        }
616                    }
617                }
618            }
619        } else {
620            let mut default_privileges = vec![];
621            for user_id in user_ids {
622                for grantee in &grantees {
623                    for action in &actions {
624                        if schema_ids.is_empty() {
625                            default_privileges.push(user_default_privilege::ActiveModel {
626                                id: Default::default(),
627                                database_id: Set(database_id),
628                                schema_id: Set(None),
629                                object_type: Set(object_type.into()),
630                                for_materialized_view: Set(object_type == PbObjectType::Mview),
631                                user_id: Set(user_id),
632                                grantee: Set(*grantee),
633                                granted_by: Set(grantor as _),
634                                action: Set((*action).into()),
635                                with_grant_option: Set(with_grant_option),
636                            });
637                            continue;
638                        }
639                        for schema_id in &schema_ids {
640                            default_privileges.push(user_default_privilege::ActiveModel {
641                                id: Default::default(),
642                                database_id: Set(database_id),
643                                schema_id: Set(Some(*schema_id)),
644                                object_type: Set(object_type.into()),
645                                for_materialized_view: Set(object_type == PbObjectType::Mview),
646                                user_id: Set(user_id),
647                                grantee: Set(*grantee),
648                                granted_by: Set(grantor as _),
649                                action: Set((*action).into()),
650                                with_grant_option: Set(with_grant_option),
651                            });
652                        }
653                    }
654                }
655            }
656
657            let mut on_conflict = OnConflict::columns([
658                user_default_privilege::Column::UserId,
659                user_default_privilege::Column::DatabaseId,
660                user_default_privilege::Column::SchemaId,
661                user_default_privilege::Column::ObjectType,
662                user_default_privilege::Column::ForMaterializedView,
663                user_default_privilege::Column::Grantee,
664                user_default_privilege::Column::Action,
665            ]);
666            if with_grant_option {
667                on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
668            } else {
669                // Workaround to support MYSQL for `DO NOTHING`.
670                on_conflict.update_column(user_default_privilege::Column::UserId);
671            }
672            UserDefaultPrivilege::insert_many(default_privileges)
673                .on_conflict(on_conflict)
674                .do_nothing()
675                .exec(&txn)
676                .await?;
677        }
678
679        txn.commit().await?;
680        Ok(())
681    }
682
683    pub async fn revoke_default_privileges(
684        &self,
685        user_ids: Vec<UserId>,
686        database_id: DatabaseId,
687        schema_ids: Vec<SchemaId>,
688        actions: Vec<PbAction>,
689        object_type: PbObjectType,
690        grantees: Vec<UserId>,
691        revoke_grant_option: bool,
692    ) -> MetaResult<()> {
693        let inner = self.inner.write().await;
694        let txn = inner.db.begin().await?;
695        for user_id in &user_ids {
696            ensure_user_id(*user_id, &txn).await?;
697        }
698
699        let schema_filter = if schema_ids.is_empty() {
700            user_default_privilege::Column::SchemaId.is_null()
701        } else {
702            user_default_privilege::Column::SchemaId.is_in(schema_ids)
703        };
704        let filter = user_default_privilege::Column::DatabaseId
705            .eq(database_id)
706            .and(schema_filter)
707            .and(user_default_privilege::Column::UserId.is_in(user_ids))
708            .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
709            .and(user_default_privilege::Column::Grantee.is_in(grantees))
710            .and(
711                user_default_privilege::Column::Action
712                    .is_in(actions.iter().map(|&a| Action::from(a))),
713            );
714
715        if revoke_grant_option {
716            // update the `with_grant_option` field to false
717            let res = UserDefaultPrivilege::update_many()
718                .col_expr(
719                    user_default_privilege::Column::WithGrantOption,
720                    SimpleExpr::Value(Value::Bool(Some(false))),
721                )
722                .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
723                .exec(&txn)
724                .await?;
725            tracing::info!(
726                "revoke {count} grant option for default privileges",
727                count = res.rows_affected
728            );
729        } else {
730            let res = UserDefaultPrivilege::delete_many()
731                .filter(filter)
732                .exec(&txn)
733                .await?;
734            tracing::info!(
735                "revoke {count} default privileges",
736                count = res.rows_affected
737            );
738        }
739
740        txn.commit().await?;
741        Ok(())
742    }
743}
744
#[cfg(test)]
mod tests {
    use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};

    use super::*;
    use crate::manager::MetaSrvEnv;

    /// Database on which every privilege in these tests is granted.
    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
    /// Id used as the root grantor/revoker (presumably the built-in super user — confirm
    /// against the test environment's bootstrap data).
    const TEST_ROOT_USER_ID: UserId = 1;

    /// Builds a `PbUserInfo` with only `name` set; every other field takes its default.
    fn make_test_user(name: &str) -> PbUserInfo {
        PbUserInfo {
            name: name.to_owned(),
            ..Default::default()
        }
    }

    /// Builds a `PbGrantPrivilege` on `object` with one entry per element of `actions`,
    /// each carrying the same `with_grant_option` flag.
    fn make_privilege(
        object: PbObject,
        actions: &[PbAction],
        with_grant_option: bool,
    ) -> PbGrantPrivilege {
        PbGrantPrivilege {
            object: Some(object),
            action_with_opts: actions
                .iter()
                .map(|&action| PbActionWithGrantOption {
                    action: action as _,
                    with_grant_option,
                    ..Default::default()
                })
                .collect(),
        }
    }

    /// End-to-end scenario covering user creation, rename, grant, revoke (restrict and
    /// cascade) and drop for two users and the root user.
    #[tokio::test]
    async fn test_user_and_privilege() -> MetaResult<()> {
        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
        mgr.create_user(make_test_user("test_user_1")).await?;
        mgr.create_user(make_test_user("test_user_2")).await?;
        let user_1 = mgr.get_user_by_name("test_user_1").await?;
        let user_2 = mgr.get_user_by_name("test_user_2").await?;

        // creating a user with an existing name must fail.
        assert!(
            mgr.create_user(make_test_user("test_user_1"))
                .await
                .is_err(),
            "user_1 already exists"
        );
        // rename user_1 and read it back by id.
        mgr.update_user(
            PbUserInfo {
                id: user_1.user_id as _,
                name: "test_user_1_new".to_owned(),
                ..Default::default()
            },
            &[PbUpdateField::Rename],
        )
        .await?;
        let user_1 = mgr.get_user(user_1.user_id).await?;
        assert_eq!(user_1.name, "test_user_1_new".to_owned());

        let conn_with_option = make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Connect], true);
        let create_without_option =
            make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Create], false);
        // ROOT grant CONN with grant option to user_1.
        mgr.grant_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&conn_with_option),
            TEST_ROOT_USER_ID,
            true,
        )
        .await?;
        // ROOT grant CREATE without grant option to user_1.
        mgr.grant_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&create_without_option),
            TEST_ROOT_USER_ID,
            false,
        )
        .await?;
        // user_1 grant CONN with grant option to user_2.
        mgr.grant_privilege(
            vec![user_2.user_id],
            std::slice::from_ref(&conn_with_option),
            user_1.user_id,
            true,
        )
        .await?;
        // user_1 grant CREATE to user_2 must be rejected: user_1 holds CREATE without grant
        // option. The assertions below rely on user_2 ending up with CONN only.
        assert!(
            mgr.grant_privilege(
                vec![user_2.user_id],
                std::slice::from_ref(&create_without_option),
                user_1.user_id,
                false,
            )
            .await
            .is_err(),
            "user_1 don't have grant option for CREATE"
        );

        // user_1 has granted a privilege to user_2, so dropping user_1 is expected to fail.
        assert!(
            mgr.drop_user(user_1.user_id).await.is_err(),
            "user_1 can't be dropped"
        );

        // user_1 holds CONN and CREATE, both granted by ROOT.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 2);
        assert!(
            privilege_1
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32)
        );

        // user_2 holds only CONN, granted by user_1 with grant option.
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(
            privilege_2
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].granted_by == user_1.user_id as u32
                    && gp.action_with_opts[0].with_grant_option)
        );

        // revoke privilege for others by non-super user.
        assert!(
            mgr.revoke_privilege(
                vec![user_1.user_id],
                std::slice::from_ref(&conn_with_option),
                TEST_ROOT_USER_ID,
                user_2.user_id,
                false,
                false
            )
            .await
            .is_err(),
            "user_2 can't revoke for user_1"
        );

        // revoke privilege without grant option.
        assert!(
            mgr.revoke_privilege(
                vec![user_2.user_id],
                std::slice::from_ref(&create_without_option),
                user_1.user_id,
                user_1.user_id,
                false,
                false
            )
            .await
            .is_err(),
            "user_2 don't have grant option for CREATE"
        );

        // revoke referred privilege in restrict mode.
        assert!(
            mgr.revoke_privilege(
                vec![user_1.user_id],
                std::slice::from_ref(&conn_with_option),
                TEST_ROOT_USER_ID,
                TEST_ROOT_USER_ID,
                false,
                false
            )
            .await
            .is_err(),
            "permission deny in restrict mode, CONN granted to user_2"
        );

        // revoke non-referred privilege in restrict mode.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&create_without_option),
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            false,
        )
        .await?;

        // only CONN is left for user_1.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(
            privilege_1
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].action == PbAction::Connect as i32)
        );

        // revoke grant option for referred privilege in cascade mode.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&conn_with_option),
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            true,
            true,
        )
        .await?;
        // both users keep CONN, but neither keeps the grant option.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_1.len(), 1);
        assert!(
            privilege_1
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].action == PbAction::Connect as i32
                    && !gp.action_with_opts[0].with_grant_option)
        );
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert_eq!(privilege_2.len(), 1);
        assert!(
            privilege_2
                .iter()
                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
                    && gp.action_with_opts[0].action == PbAction::Connect as i32
                    && !gp.action_with_opts[0].with_grant_option)
        );

        // revoke referred privilege in cascade mode.
        mgr.revoke_privilege(
            vec![user_1.user_id],
            std::slice::from_ref(&conn_with_option),
            TEST_ROOT_USER_ID,
            TEST_ROOT_USER_ID,
            false,
            true,
        )
        .await?;
        // the cascade removes CONN from user_1 and from user_2.
        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
        assert!(privilege_1.is_empty());
        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
        assert!(privilege_2.is_empty());

        // with no privileges left, both users can be dropped.
        mgr.drop_user(user_1.user_id).await?;
        mgr.drop_user(user_2.user_id).await?;
        Ok(())
    }
}