risingwave_meta/controller/
user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23    AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24    user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28    Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36    QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41    PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42    ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43    get_index_state_tables_by_table_id, get_internal_tables_by_id, get_object_owner,
44    get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50    pub(crate) async fn notify_users_update(
51        &self,
52        user_infos: Vec<PbUserInfo>,
53    ) -> NotificationVersion {
54        let mut version = 0;
55        for info in user_infos {
56            version = self
57                .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58                .await;
59        }
60        version
61    }
62
63    pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64        let inner = self.inner.write().await;
65        let txn = inner.db.begin().await?;
66        check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68        let grant_privileges = pb_user.grant_privileges.clone();
69        let user: user::ActiveModel = pb_user.into();
70        let user = user.insert(&txn).await?;
71
72        if !grant_privileges.is_empty() {
73            let mut privileges = vec![];
74            for gp in &grant_privileges {
75                let id = extract_grant_obj_id(gp.get_object()?);
76                for action_with_opt in &gp.action_with_opts {
77                    privileges.push(user_privilege::ActiveModel {
78                        user_id: Set(user.user_id),
79                        oid: Set(id),
80                        granted_by: Set(action_with_opt.granted_by as _),
81                        action: Set(action_with_opt.get_action()?.into()),
82                        with_grant_option: Set(action_with_opt.with_grant_option),
83                        ..Default::default()
84                    });
85                }
86            }
87            UserPrivilege::insert_many(privileges).exec(&txn).await?;
88        }
89        txn.commit().await?;
90
91        let mut user_info: PbUserInfo = user.into();
92        user_info.grant_privileges = grant_privileges;
93        let version = self
94            .notify_frontend(
95                NotificationOperation::Add,
96                NotificationInfo::User(user_info),
97            )
98            .await;
99
100        Ok(version)
101    }
102
103    pub async fn update_user(
104        &self,
105        update_user: PbUserInfo,
106        update_fields: &[PbUpdateField],
107    ) -> MetaResult<NotificationVersion> {
108        let inner = self.inner.write().await;
109        let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110        if rename_flag {
111            check_user_name_duplicate(&update_user.name, &inner.db).await?;
112        }
113
114        let user = User::find_by_id(update_user.id as UserId)
115            .one(&inner.db)
116            .await?
117            .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118        let mut user = user.into_active_model();
119        update_fields.iter().for_each(|&field| match field {
120            PbUpdateField::Unspecified => unreachable!(),
121            PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122            PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123            PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124            PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125            PbUpdateField::AuthInfo => {
126                user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127            }
128            PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129            PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
130        });
131
132        let user = user.update(&inner.db).await?;
133        let mut user_info: PbUserInfo = user.into();
134        user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
135        let version = self
136            .notify_frontend(
137                NotificationOperation::Update,
138                NotificationInfo::User(user_info),
139            )
140            .await;
141
142        Ok(version)
143    }
144
145    #[cfg(test)]
146    pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
147        let inner = self.inner.read().await;
148        let user = User::find_by_id(id)
149            .one(&inner.db)
150            .await?
151            .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
152        Ok(user)
153    }
154
155    #[cfg(test)]
156    pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
157        let inner = self.inner.read().await;
158        let user = User::find()
159            .filter(user::Column::Name.eq(name))
160            .one(&inner.db)
161            .await?
162            .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
163        Ok(user)
164    }
165
166    pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
167        let inner = self.inner.write().await;
168        let txn = inner.db.begin().await?;
169        let user = User::find_by_id(user_id)
170            .one(&txn)
171            .await?
172            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
173        if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
174            return Err(MetaError::permission_denied(format!(
175                "drop default super user {} is not allowed",
176                user.name
177            )));
178        }
179
180        // check if the user is the owner of any objects.
181        let count = Object::find()
182            .filter(object::Column::OwnerId.eq(user_id))
183            .count(&txn)
184            .await?;
185        if count != 0 {
186            return Err(MetaError::permission_denied(format!(
187                "drop user {} is not allowed, because it owns {} objects",
188                user.name, count
189            )));
190        }
191
192        // check if the user granted any privileges to other users.
193        let count = UserPrivilege::find()
194            .filter(user_privilege::Column::GrantedBy.eq(user_id))
195            .count(&txn)
196            .await?;
197        if count != 0 {
198            return Err(MetaError::permission_denied(format!(
199                "drop user {} is not allowed, because it granted {} privileges to others",
200                user.name, count
201            )));
202        }
203
204        let res = User::delete_by_id(user_id).exec(&txn).await?;
205        if res.rows_affected != 1 {
206            return Err(MetaError::catalog_id_not_found("user", user_id));
207        }
208        txn.commit().await?;
209
210        let version = self
211            .notify_frontend(
212                NotificationOperation::Delete,
213                NotificationInfo::User(PbUserInfo {
214                    id: user_id as _,
215                    ..Default::default()
216                }),
217            )
218            .await;
219
220        Ok(version)
221    }
222
223    pub async fn grant_privilege(
224        &self,
225        user_ids: Vec<UserId>,
226        new_grant_privileges: &[PbGrantPrivilege],
227        grantor: UserId,
228        with_grant_option: bool,
229    ) -> MetaResult<NotificationVersion> {
230        let inner = self.inner.write().await;
231        let txn = inner.db.begin().await?;
232        for user_id in &user_ids {
233            ensure_user_id(*user_id, &txn).await?;
234        }
235
236        let mut privileges = vec![];
237        for gp in new_grant_privileges {
238            let id = extract_grant_obj_id(gp.get_object()?);
239            let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
240            let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
241            for action_with_opt in &gp.action_with_opts {
242                let action = action_with_opt.get_action()?.into();
243                privileges.push(user_privilege::ActiveModel {
244                    oid: Set(id),
245                    granted_by: Set(grantor),
246                    action: Set(action),
247                    with_grant_option: Set(with_grant_option),
248                    ..Default::default()
249                });
250                if action == Action::Select {
251                    privileges.extend(
252                        internal_table_ids
253                            .iter()
254                            .chain(index_state_table_ids.iter())
255                            .map(|&tid| user_privilege::ActiveModel {
256                                oid: Set(tid),
257                                granted_by: Set(grantor),
258                                action: Set(Action::Select),
259                                with_grant_option: Set(with_grant_option),
260                                ..Default::default()
261                            }),
262                    );
263                }
264            }
265        }
266
267        // check whether grantor has the privilege to grant the privilege.
268        let user = User::find_by_id(grantor)
269            .one(&txn)
270            .await?
271            .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
272        let mut filtered_privileges = vec![];
273        if !user.is_super {
274            for mut privilege in privileges {
275                if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
276                    filtered_privileges.push(privilege);
277                    continue;
278                }
279                let filter = user_privilege::Column::UserId
280                    .eq(grantor)
281                    .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
282                    .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
283                    .and(user_privilege::Column::WithGrantOption.eq(true));
284                let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
285                    .select_only()
286                    .column(user_privilege::Column::Id)
287                    .filter(filter)
288                    .into_tuple()
289                    .one(&txn)
290                    .await?;
291                let Some(privilege_id) = privilege_id else {
292                    tracing::warn!(
293                        "user {} don't have privilege {:?} or grant option",
294                        grantor,
295                        privilege.action
296                    );
297                    continue;
298                };
299                privilege.dependent_id = Set(Some(privilege_id));
300                filtered_privileges.push(privilege);
301            }
302        } else {
303            filtered_privileges = privileges;
304        }
305
306        // insert privileges
307        let user_privileges = user_ids
308            .iter()
309            .flat_map(|user_id| {
310                filtered_privileges.iter().map(|p| {
311                    let mut p = p.clone();
312                    p.user_id = Set(*user_id);
313                    p
314                })
315            })
316            .collect_vec();
317        for privilege in user_privileges {
318            let mut on_conflict = OnConflict::columns([
319                user_privilege::Column::UserId,
320                user_privilege::Column::Oid,
321                user_privilege::Column::Action,
322                user_privilege::Column::GrantedBy,
323            ]);
324            if *privilege.with_grant_option.as_ref() {
325                on_conflict.update_column(user_privilege::Column::WithGrantOption);
326            } else {
327                // Workaround to support MYSQL for `DO NOTHING`.
328                on_conflict.update_column(user_privilege::Column::UserId);
329            }
330
331            UserPrivilege::insert(privilege)
332                .on_conflict(on_conflict)
333                .do_nothing()
334                .exec(&txn)
335                .await?;
336        }
337
338        let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
339
340        txn.commit().await?;
341
342        let version = self.notify_users_update(user_infos).await;
343        Ok(version)
344    }
345
346    pub async fn revoke_privilege(
347        &self,
348        user_ids: Vec<UserId>,
349        revoke_grant_privileges: &[PbGrantPrivilege],
350        granted_by: UserId,
351        revoke_by: UserId,
352        revoke_grant_option: bool,
353        cascade: bool,
354    ) -> MetaResult<NotificationVersion> {
355        let inner = self.inner.write().await;
356        let txn = inner.db.begin().await?;
357        for user_id in &user_ids {
358            ensure_user_id(*user_id, &txn).await?;
359        }
360        // check whether revoke has the privilege to grant the privilege.
361        let revoke_user = User::find_by_id(revoke_by)
362            .one(&txn)
363            .await?
364            .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
365
366        // check whether user can revoke the privilege.
367        if !revoke_user.is_super && granted_by != revoke_by {
368            let granted_user_name: String = User::find_by_id(granted_by)
369                .select_only()
370                .column(user::Column::Name)
371                .into_tuple()
372                .one(&txn)
373                .await?
374                .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
375            return Err(MetaError::permission_denied(format!(
376                "user {} is not super, can't revoke privileges for {}",
377                revoke_user.name, granted_user_name
378            )));
379        }
380
381        let mut revoke_items = HashMap::new();
382        for privilege in revoke_grant_privileges {
383            let obj = extract_grant_obj_id(privilege.get_object()?);
384            let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
385            let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
386            let mut include_select = false;
387            let actions = privilege
388                .action_with_opts
389                .iter()
390                .map(|ao| {
391                    let action = Action::from(ao.get_action().unwrap());
392                    if action == Action::Select {
393                        include_select = true;
394                    }
395                    action
396                })
397                .collect_vec();
398            revoke_items.insert(obj, actions);
399            if include_select {
400                revoke_items.extend(
401                    internal_table_ids
402                        .iter()
403                        .chain(index_state_table_ids.iter())
404                        .map(|&tid| (tid, vec![Action::Select])),
405                );
406            }
407        }
408
409        let filter = if !revoke_user.is_super {
410            // ensure user have grant options or is owner of the object.
411            for (obj, actions) in &revoke_items {
412                if revoke_by == get_object_owner(*obj, &txn).await? {
413                    continue;
414                }
415                let owned_actions: HashSet<Action> = UserPrivilege::find()
416                    .select_only()
417                    .column(user_privilege::Column::Action)
418                    .filter(
419                        user_privilege::Column::UserId
420                            .eq(granted_by)
421                            .and(user_privilege::Column::Oid.eq(*obj))
422                            .and(user_privilege::Column::WithGrantOption.eq(true)),
423                    )
424                    .into_tuple::<Action>()
425                    .all(&txn)
426                    .await?
427                    .into_iter()
428                    .collect();
429                if actions.iter().any(|ac| !owned_actions.contains(ac)) {
430                    return Err(MetaError::permission_denied(format!(
431                        "user {} don't have privileges {:?} or grant option",
432                        revoke_user.name, actions,
433                    )));
434                }
435            }
436
437            user_privilege::Column::GrantedBy
438                .eq(granted_by)
439                .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
440        } else {
441            user_privilege::Column::UserId.is_in(user_ids.clone())
442        };
443        let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
444        for (obj, actions) in revoke_items {
445            let filter = filter
446                .clone()
447                .and(user_privilege::Column::Oid.eq(obj))
448                .and(user_privilege::Column::Action.is_in(actions));
449            root_user_privileges.extend(
450                UserPrivilege::find()
451                    .select_only()
452                    .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
453                    .filter(filter)
454                    .into_partial_model()
455                    .all(&txn)
456                    .await?,
457            );
458        }
459        if root_user_privileges.is_empty() {
460            tracing::warn!("no privilege to revoke, ignore it");
461            return Ok(IGNORED_NOTIFICATION_VERSION);
462        }
463
464        // check if the user granted any privileges to other users.
465        let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
466        let (all_privilege_ids, to_update_user_ids) = if !cascade {
467            ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
468            (
469                root_privilege_ids.clone(),
470                root_user_privileges
471                    .iter()
472                    .map(|ur| ur.user_id)
473                    .collect_vec(),
474            )
475        } else {
476            let all_user_privileges =
477                get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
478            (
479                all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
480                all_user_privileges
481                    .iter()
482                    .map(|ur| ur.user_id)
483                    .collect_vec(),
484            )
485        };
486
487        if revoke_grant_option {
488            UserPrivilege::update_many()
489                .col_expr(
490                    user_privilege::Column::WithGrantOption,
491                    SimpleExpr::Value(Value::Bool(Some(false))),
492                )
493                .filter(
494                    user_privilege::Column::Id
495                        .is_in(all_privilege_ids)
496                        .and(user_privilege::Column::WithGrantOption.eq(true)),
497                )
498                .exec(&txn)
499                .await?;
500        } else {
501            // The dependent privileges will be deleted cascade.
502            UserPrivilege::delete_many()
503                .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
504                .exec(&txn)
505                .await?;
506        }
507
508        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
509
510        txn.commit().await?;
511
512        let version = self.notify_users_update(user_infos).await;
513        Ok(version)
514    }
515
516    pub async fn grant_default_privileges(
517        &self,
518        user_ids: Vec<UserId>,
519        database_id: DatabaseId,
520        schema_ids: Vec<SchemaId>,
521        grantor: UserId,
522        actions: Vec<PbAction>,
523        object_type: PbObjectType,
524        grantees: Vec<UserId>,
525        with_grant_option: bool,
526    ) -> MetaResult<()> {
527        tracing::debug!(
528            ?user_ids,
529            database_id,
530            ?schema_ids,
531            ?actions,
532            ?object_type,
533            ?grantees,
534            with_grant_option,
535            "grant default privileges",
536        );
537        let inner = self.inner.write().await;
538        let txn = inner.db.begin().await?;
539        for user_id in &user_ids {
540            ensure_user_id(*user_id, &txn).await?;
541        }
542        ensure_object_id(ObjectType::Database, database_id, &txn).await?;
543        for schema_id in &schema_ids {
544            ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
545        }
546        for grantee in &grantees {
547            ensure_user_id(*grantee, &txn).await?;
548        }
549        if object_type == PbObjectType::Schema {
550            assert!(
551                schema_ids.is_empty(),
552                "schema_ids should be empty when object_type is Schema"
553            );
554
555            // Note that the UNIQUE constraint does not treat NULL values as equal, we cannot rely on conflict check
556            // to update it and have to check existing default privileges manually.
557            let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
558            let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
559                .select_only()
560                .columns([
561                    user_default_privilege::Column::Id,
562                    user_default_privilege::Column::UserId,
563                    user_default_privilege::Column::Grantee,
564                    user_default_privilege::Column::Action,
565                ])
566                .filter(
567                    user_default_privilege::Column::DatabaseId
568                        .eq(database_id)
569                        .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
570                        .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
571                        .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
572                        .and(user_default_privilege::Column::Action.is_in(actions.clone())),
573                )
574                .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
575                .all(&txn)
576                .await?
577                .into_iter()
578                .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
579                .collect();
580
581            for user_id in user_ids {
582                for grantee in &grantees {
583                    for action in &actions {
584                        if let Some(existing_id) =
585                            existing_default_privileges.get(&(user_id, *grantee, *action))
586                            && with_grant_option
587                        {
588                            // If the default privilege already exists, we should update the grant option.
589                            UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
590                                id: Set(*existing_id),
591                                with_grant_option: Set(true),
592                                granted_by: Set(grantor as _),
593                                ..Default::default()
594                            })
595                            .exec(&txn)
596                            .await?;
597                        } else {
598                            UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
599                                id: Default::default(),
600                                database_id: Set(database_id),
601                                schema_id: Set(None),
602                                object_type: Set(ObjectType::Schema),
603                                for_materialized_view: Set(false),
604                                user_id: Set(user_id),
605                                grantee: Set(*grantee),
606                                granted_by: Set(grantor as _),
607                                action: Set(*action),
608                                with_grant_option: Set(with_grant_option),
609                            })
610                            .exec(&txn)
611                            .await?;
612                        }
613                    }
614                }
615            }
616        } else {
617            let mut default_privileges = vec![];
618            for user_id in user_ids {
619                for grantee in &grantees {
620                    for action in &actions {
621                        if schema_ids.is_empty() {
622                            default_privileges.push(user_default_privilege::ActiveModel {
623                                id: Default::default(),
624                                database_id: Set(database_id),
625                                schema_id: Set(None),
626                                object_type: Set(object_type.into()),
627                                for_materialized_view: Set(object_type == PbObjectType::Mview),
628                                user_id: Set(user_id),
629                                grantee: Set(*grantee),
630                                granted_by: Set(grantor as _),
631                                action: Set((*action).into()),
632                                with_grant_option: Set(with_grant_option),
633                            });
634                            continue;
635                        }
636                        for schema_id in &schema_ids {
637                            default_privileges.push(user_default_privilege::ActiveModel {
638                                id: Default::default(),
639                                database_id: Set(database_id),
640                                schema_id: Set(Some(*schema_id)),
641                                object_type: Set(object_type.into()),
642                                for_materialized_view: Set(object_type == PbObjectType::Mview),
643                                user_id: Set(user_id),
644                                grantee: Set(*grantee),
645                                granted_by: Set(grantor as _),
646                                action: Set((*action).into()),
647                                with_grant_option: Set(with_grant_option),
648                            });
649                        }
650                    }
651                }
652            }
653
654            let mut on_conflict = OnConflict::columns([
655                user_default_privilege::Column::UserId,
656                user_default_privilege::Column::DatabaseId,
657                user_default_privilege::Column::SchemaId,
658                user_default_privilege::Column::ObjectType,
659                user_default_privilege::Column::ForMaterializedView,
660                user_default_privilege::Column::Grantee,
661                user_default_privilege::Column::Action,
662            ]);
663            if with_grant_option {
664                on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
665            } else {
666                // Workaround to support MYSQL for `DO NOTHING`.
667                on_conflict.update_column(user_default_privilege::Column::UserId);
668            }
669            UserDefaultPrivilege::insert_many(default_privileges)
670                .on_conflict(on_conflict)
671                .do_nothing()
672                .exec(&txn)
673                .await?;
674        }
675
676        txn.commit().await?;
677        Ok(())
678    }
679
680    pub async fn revoke_default_privileges(
681        &self,
682        user_ids: Vec<UserId>,
683        database_id: DatabaseId,
684        schema_ids: Vec<SchemaId>,
685        actions: Vec<PbAction>,
686        object_type: PbObjectType,
687        grantees: Vec<UserId>,
688        revoke_grant_option: bool,
689    ) -> MetaResult<()> {
690        let inner = self.inner.write().await;
691        let txn = inner.db.begin().await?;
692        for user_id in &user_ids {
693            ensure_user_id(*user_id, &txn).await?;
694        }
695
696        let schema_filter = if schema_ids.is_empty() {
697            user_default_privilege::Column::SchemaId.is_null()
698        } else {
699            user_default_privilege::Column::SchemaId.is_in(schema_ids)
700        };
701        let filter = user_default_privilege::Column::DatabaseId
702            .eq(database_id)
703            .and(schema_filter)
704            .and(user_default_privilege::Column::UserId.is_in(user_ids))
705            .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
706            .and(user_default_privilege::Column::Grantee.is_in(grantees))
707            .and(
708                user_default_privilege::Column::Action
709                    .is_in(actions.iter().map(|&a| Action::from(a))),
710            );
711
712        if revoke_grant_option {
713            // update the `with_grant_option` field to false
714            let res = UserDefaultPrivilege::update_many()
715                .col_expr(
716                    user_default_privilege::Column::WithGrantOption,
717                    SimpleExpr::Value(Value::Bool(Some(false))),
718                )
719                .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
720                .exec(&txn)
721                .await?;
722            tracing::info!(
723                "revoke {count} grant option for default privileges",
724                count = res.rows_affected
725            );
726        } else {
727            let res = UserDefaultPrivilege::delete_many()
728                .filter(filter)
729                .exec(&txn)
730                .await?;
731            tracing::info!(
732                "revoke {count} default privileges",
733                count = res.rows_affected
734            );
735        }
736
737        txn.commit().await?;
738        Ok(())
739    }
740}
741
742#[cfg(test)]
743mod tests {
744    use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
745
746    use super::*;
747    use crate::manager::MetaSrvEnv;
748
749    const TEST_DATABASE_ID: DatabaseId = 1;
750    const TEST_ROOT_USER_ID: UserId = 1;
751
752    fn make_test_user(name: &str) -> PbUserInfo {
753        PbUserInfo {
754            name: name.to_owned(),
755            ..Default::default()
756        }
757    }
758
759    fn make_privilege(
760        object: PbObject,
761        actions: &[PbAction],
762        with_grant_option: bool,
763    ) -> PbGrantPrivilege {
764        PbGrantPrivilege {
765            object: Some(object),
766            action_with_opts: actions
767                .iter()
768                .map(|&action| PbActionWithGrantOption {
769                    action: action as _,
770                    with_grant_option,
771                    ..Default::default()
772                })
773                .collect(),
774        }
775    }
776
777    #[tokio::test]
778    async fn test_user_and_privilege() -> MetaResult<()> {
779        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
780        mgr.create_user(make_test_user("test_user_1")).await?;
781        mgr.create_user(make_test_user("test_user_2")).await?;
782        let user_1 = mgr.get_user_by_name("test_user_1").await?;
783        let user_2 = mgr.get_user_by_name("test_user_2").await?;
784
785        assert!(
786            mgr.create_user(make_test_user("test_user_1"))
787                .await
788                .is_err(),
789            "user_1 already exists"
790        );
791        mgr.update_user(
792            PbUserInfo {
793                id: user_1.user_id as _,
794                name: "test_user_1_new".to_owned(),
795                ..Default::default()
796            },
797            &[PbUpdateField::Rename],
798        )
799        .await?;
800        let user_1 = mgr.get_user(user_1.user_id).await?;
801        assert_eq!(user_1.name, "test_user_1_new".to_owned());
802
803        let conn_with_option = make_privilege(
804            PbObject::DatabaseId(TEST_DATABASE_ID as _),
805            &[PbAction::Connect],
806            true,
807        );
808        let create_without_option = make_privilege(
809            PbObject::DatabaseId(TEST_DATABASE_ID as _),
810            &[PbAction::Create],
811            false,
812        );
813        // ROOT grant CONN with grant option to user_1.
814        mgr.grant_privilege(
815            vec![user_1.user_id],
816            std::slice::from_ref(&conn_with_option),
817            TEST_ROOT_USER_ID,
818            true,
819        )
820        .await?;
821        // ROOT grant CREATE without grant option to user_1.
822        mgr.grant_privilege(
823            vec![user_1.user_id],
824            std::slice::from_ref(&create_without_option),
825            TEST_ROOT_USER_ID,
826            false,
827        )
828        .await?;
829        // user_1 grant CONN with grant option to user_2.
830        mgr.grant_privilege(
831            vec![user_2.user_id],
832            std::slice::from_ref(&conn_with_option),
833            user_1.user_id,
834            true,
835        )
836        .await?;
837        mgr.grant_privilege(
838            vec![user_2.user_id],
839            std::slice::from_ref(&create_without_option),
840            user_1.user_id,
841            false,
842        )
843        .await?;
844
845        assert!(
846            mgr.drop_user(user_1.user_id).await.is_err(),
847            "user_1 can't be dropped"
848        );
849
850        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
851        assert_eq!(privilege_1.len(), 2);
852        assert!(privilege_1.iter().all(|gp| gp.object
853            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
854            && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32));
855
856        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
857        assert_eq!(privilege_2.len(), 1);
858        assert!(privilege_2.iter().all(|gp| gp.object
859            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
860            && gp.action_with_opts[0].granted_by == user_1.user_id as u32
861            && gp.action_with_opts[0].with_grant_option));
862
863        // revoke privilege for others by non-super user.
864        assert!(
865            mgr.revoke_privilege(
866                vec![user_1.user_id],
867                std::slice::from_ref(&conn_with_option),
868                TEST_ROOT_USER_ID,
869                user_2.user_id,
870                false,
871                false
872            )
873            .await
874            .is_err(),
875            "user_2 can't revoke for user_1"
876        );
877
878        // revoke privilege without grant option.
879        assert!(
880            mgr.revoke_privilege(
881                vec![user_2.user_id],
882                std::slice::from_ref(&create_without_option),
883                user_1.user_id,
884                user_1.user_id,
885                false,
886                false
887            )
888            .await
889            .is_err(),
890            "user_2 don't have grant option for CREATE"
891        );
892
893        // revoke referred privilege in restrict mode.
894        assert!(
895            mgr.revoke_privilege(
896                vec![user_1.user_id],
897                std::slice::from_ref(&conn_with_option),
898                TEST_ROOT_USER_ID,
899                TEST_ROOT_USER_ID,
900                false,
901                false
902            )
903            .await
904            .is_err(),
905            "permission deny in restrict mode, CONN granted to user_2"
906        );
907
908        // revoke non-referred privilege in restrict mode.
909        mgr.revoke_privilege(
910            vec![user_1.user_id],
911            std::slice::from_ref(&create_without_option),
912            TEST_ROOT_USER_ID,
913            TEST_ROOT_USER_ID,
914            false,
915            false,
916        )
917        .await?;
918
919        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
920        assert_eq!(privilege_1.len(), 1);
921        assert!(privilege_1.iter().all(|gp| gp.object
922            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
923            && gp.action_with_opts[0].action == PbAction::Connect as i32));
924
925        // revoke grant option for referred privilege in cascade mode.
926        mgr.revoke_privilege(
927            vec![user_1.user_id],
928            std::slice::from_ref(&conn_with_option),
929            TEST_ROOT_USER_ID,
930            TEST_ROOT_USER_ID,
931            true,
932            true,
933        )
934        .await?;
935        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
936        assert_eq!(privilege_1.len(), 1);
937        assert!(privilege_1.iter().all(|gp| gp.object
938            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
939            && gp.action_with_opts[0].action == PbAction::Connect as i32
940            && !gp.action_with_opts[0].with_grant_option));
941        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
942        assert_eq!(privilege_2.len(), 1);
943        assert!(privilege_2.iter().all(|gp| gp.object
944            == Some(PbObject::DatabaseId(TEST_DATABASE_ID as _))
945            && gp.action_with_opts[0].action == PbAction::Connect as i32
946            && !gp.action_with_opts[0].with_grant_option));
947
948        // revoke referred privilege in cascade mode.
949        mgr.revoke_privilege(
950            vec![user_1.user_id],
951            std::slice::from_ref(&conn_with_option),
952            TEST_ROOT_USER_ID,
953            TEST_ROOT_USER_ID,
954            false,
955            true,
956        )
957        .await?;
958        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
959        assert!(privilege_1.is_empty());
960        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
961        assert!(privilege_2.is_empty());
962
963        mgr.drop_user(user_1.user_id).await?;
964        mgr.drop_user(user_2.user_id).await?;
965        Ok(())
966    }
967}