risingwave_meta/controller/
user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG};
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{Object, User, UserDefaultPrivilege, UserPrivilege};
21use risingwave_meta_model::user_privilege::Action;
22use risingwave_meta_model::{
23    AuthInfo, DatabaseId, DefaultPrivilegeId, PrivilegeId, SchemaId, UserId, object, user,
24    user_default_privilege, user_privilege,
25};
26use risingwave_pb::common::PbObjectType;
27use risingwave_pb::meta::subscribe_response::{
28    Info as NotificationInfo, Operation as NotificationOperation,
29};
30use risingwave_pb::user::update_user_request::PbUpdateField;
31use risingwave_pb::user::{PbAction, PbGrantPrivilege, PbUserInfo};
32use sea_orm::ActiveValue::Set;
33use sea_orm::sea_query::{OnConflict, SimpleExpr, Value};
34use sea_orm::{
35    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
36    QuerySelect, TransactionTrait,
37};
38
39use crate::controller::catalog::CatalogController;
40use crate::controller::utils::{
41    PartialUserPrivilege, check_user_name_duplicate, ensure_object_id,
42    ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id,
43    get_iceberg_related_object_ids, get_index_state_tables_by_table_id, get_internal_tables_by_id,
44    get_object_owner, get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
45};
46use crate::manager::{IGNORED_NOTIFICATION_VERSION, NotificationVersion};
47use crate::{MetaError, MetaResult};
48
49impl CatalogController {
50    pub(crate) async fn notify_users_update(
51        &self,
52        user_infos: Vec<PbUserInfo>,
53    ) -> NotificationVersion {
54        let mut version = 0;
55        for info in user_infos {
56            version = self
57                .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info))
58                .await;
59        }
60        version
61    }
62
63    pub async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult<NotificationVersion> {
64        let inner = self.inner.write().await;
65        let txn = inner.db.begin().await?;
66        check_user_name_duplicate(&pb_user.name, &txn).await?;
67
68        let grant_privileges = pb_user.grant_privileges.clone();
69        let user: user::ActiveModel = pb_user.into();
70        let user = user.insert(&txn).await?;
71
72        if !grant_privileges.is_empty() {
73            let mut privileges = vec![];
74            for gp in &grant_privileges {
75                let id = extract_grant_obj_id(gp.get_object()?);
76                for action_with_opt in &gp.action_with_opts {
77                    privileges.push(user_privilege::ActiveModel {
78                        user_id: Set(user.user_id),
79                        oid: Set(id),
80                        granted_by: Set(action_with_opt.granted_by as _),
81                        action: Set(action_with_opt.get_action()?.into()),
82                        with_grant_option: Set(action_with_opt.with_grant_option),
83                        ..Default::default()
84                    });
85                }
86            }
87            UserPrivilege::insert_many(privileges).exec(&txn).await?;
88        }
89        txn.commit().await?;
90
91        let mut user_info: PbUserInfo = user.into();
92        user_info.grant_privileges = grant_privileges;
93        let version = self
94            .notify_frontend(
95                NotificationOperation::Add,
96                NotificationInfo::User(user_info),
97            )
98            .await;
99
100        Ok(version)
101    }
102
103    pub async fn update_user(
104        &self,
105        update_user: PbUserInfo,
106        update_fields: &[PbUpdateField],
107    ) -> MetaResult<NotificationVersion> {
108        let inner = self.inner.write().await;
109        let rename_flag = update_fields.contains(&PbUpdateField::Rename);
110        if rename_flag {
111            check_user_name_duplicate(&update_user.name, &inner.db).await?;
112        }
113
114        let user = User::find_by_id(update_user.id as UserId)
115            .one(&inner.db)
116            .await?
117            .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?;
118        let mut user = user.into_active_model();
119        update_fields.iter().for_each(|&field| match field {
120            PbUpdateField::Unspecified => unreachable!(),
121            PbUpdateField::Super => user.is_super = Set(update_user.is_super),
122            PbUpdateField::Login => user.can_login = Set(update_user.can_login),
123            PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db),
124            PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user),
125            PbUpdateField::AuthInfo => {
126                user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from))
127            }
128            PbUpdateField::Rename => user.name = Set(update_user.name.clone()),
129            PbUpdateField::Admin => user.is_admin = Set(update_user.is_admin),
130        });
131
132        let user = user.update(&inner.db).await?;
133        let mut user_info: PbUserInfo = user.into();
134        user_info.grant_privileges = get_user_privilege(user_info.id as _, &inner.db).await?;
135        let version = self
136            .notify_frontend(
137                NotificationOperation::Update,
138                NotificationInfo::User(user_info),
139            )
140            .await;
141
142        Ok(version)
143    }
144
145    #[cfg(test)]
146    pub async fn get_user(&self, id: UserId) -> MetaResult<user::Model> {
147        let inner = self.inner.read().await;
148        let user = User::find_by_id(id)
149            .one(&inner.db)
150            .await?
151            .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?;
152        Ok(user)
153    }
154
155    #[cfg(test)]
156    pub async fn get_user_by_name(&self, name: &str) -> MetaResult<user::Model> {
157        let inner = self.inner.read().await;
158        let user = User::find()
159            .filter(user::Column::Name.eq(name))
160            .one(&inner.db)
161            .await?
162            .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?;
163        Ok(user)
164    }
165
166    pub async fn drop_user(&self, user_id: UserId) -> MetaResult<NotificationVersion> {
167        let inner = self.inner.write().await;
168        let txn = inner.db.begin().await?;
169        let user = User::find_by_id(user_id)
170            .one(&txn)
171            .await?
172            .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?;
173        if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG {
174            return Err(MetaError::permission_denied(format!(
175                "drop default super user {} is not allowed",
176                user.name
177            )));
178        }
179
180        // check if the user is the owner of any objects.
181        let count = Object::find()
182            .filter(object::Column::OwnerId.eq(user_id))
183            .count(&txn)
184            .await?;
185        if count != 0 {
186            return Err(MetaError::permission_denied(format!(
187                "drop user {} is not allowed, because it owns {} objects",
188                user.name, count
189            )));
190        }
191
192        // check if the user granted any privileges to other users.
193        let count = UserPrivilege::find()
194            .filter(user_privilege::Column::GrantedBy.eq(user_id))
195            .count(&txn)
196            .await?;
197        if count != 0 {
198            return Err(MetaError::permission_denied(format!(
199                "drop user {} is not allowed, because it granted {} privileges to others",
200                user.name, count
201            )));
202        }
203
204        let res = User::delete_by_id(user_id).exec(&txn).await?;
205        if res.rows_affected != 1 {
206            return Err(MetaError::catalog_id_not_found("user", user_id));
207        }
208        txn.commit().await?;
209
210        let version = self
211            .notify_frontend(
212                NotificationOperation::Delete,
213                NotificationInfo::User(PbUserInfo {
214                    id: user_id as _,
215                    ..Default::default()
216                }),
217            )
218            .await;
219
220        Ok(version)
221    }
222
223    pub async fn grant_privilege(
224        &self,
225        user_ids: Vec<UserId>,
226        new_grant_privileges: &[PbGrantPrivilege],
227        grantor: UserId,
228        with_grant_option: bool,
229    ) -> MetaResult<NotificationVersion> {
230        let inner = self.inner.write().await;
231        let txn = inner.db.begin().await?;
232        for user_id in &user_ids {
233            ensure_user_id(*user_id, &txn).await?;
234        }
235
236        let mut privileges = vec![];
237        for gp in new_grant_privileges {
238            let id = extract_grant_obj_id(gp.get_object()?);
239            let internal_table_ids = get_internal_tables_by_id(id.as_job_id(), &txn).await?;
240            let index_state_table_ids =
241                get_index_state_tables_by_table_id(id.as_table_id(), &txn).await?;
242            for action_with_opt in &gp.action_with_opts {
243                let action = action_with_opt.get_action()?.into();
244                privileges.push(user_privilege::ActiveModel {
245                    oid: Set(id),
246                    granted_by: Set(grantor),
247                    action: Set(action),
248                    with_grant_option: Set(with_grant_option),
249                    ..Default::default()
250                });
251                if action == Action::Select {
252                    privileges.extend(
253                        internal_table_ids
254                            .iter()
255                            .chain(index_state_table_ids.iter())
256                            .map(|&tid| user_privilege::ActiveModel {
257                                oid: Set(tid.as_object_id()),
258                                granted_by: Set(grantor),
259                                action: Set(Action::Select),
260                                with_grant_option: Set(with_grant_option),
261                                ..Default::default()
262                            }),
263                    );
264                    let iceberg_privilege_object_ids =
265                        get_iceberg_related_object_ids(id, &txn).await?;
266                    privileges.extend(iceberg_privilege_object_ids.iter().map(
267                        |&iceberg_object_id| user_privilege::ActiveModel {
268                            oid: Set(iceberg_object_id),
269                            granted_by: Set(grantor),
270                            action: Set(action),
271                            with_grant_option: Set(with_grant_option),
272                            ..Default::default()
273                        },
274                    ));
275                }
276            }
277        }
278
279        // check whether grantor has the privilege to grant the privilege.
280        let user = User::find_by_id(grantor)
281            .one(&txn)
282            .await?
283            .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?;
284        let mut filtered_privileges = vec![];
285        if !user.is_super {
286            for mut privilege in privileges {
287                if grantor == get_object_owner(*privilege.oid.as_ref(), &txn).await? {
288                    filtered_privileges.push(privilege);
289                    continue;
290                }
291                let filter = user_privilege::Column::UserId
292                    .eq(grantor)
293                    .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
294                    .and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
295                    .and(user_privilege::Column::WithGrantOption.eq(true));
296                let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
297                    .select_only()
298                    .column(user_privilege::Column::Id)
299                    .filter(filter)
300                    .into_tuple()
301                    .one(&txn)
302                    .await?;
303                let Some(privilege_id) = privilege_id else {
304                    tracing::warn!(
305                        "user {} don't have privilege {:?} or grant option",
306                        grantor,
307                        privilege.action
308                    );
309                    continue;
310                };
311                privilege.dependent_id = Set(Some(privilege_id));
312                filtered_privileges.push(privilege);
313            }
314        } else {
315            filtered_privileges = privileges;
316        }
317
318        // insert privileges
319        let user_privileges = user_ids
320            .iter()
321            .flat_map(|user_id| {
322                filtered_privileges.iter().map(|p| {
323                    let mut p = p.clone();
324                    p.user_id = Set(*user_id);
325                    p
326                })
327            })
328            .collect_vec();
329        for privilege in user_privileges {
330            let mut on_conflict = OnConflict::columns([
331                user_privilege::Column::UserId,
332                user_privilege::Column::Oid,
333                user_privilege::Column::Action,
334                user_privilege::Column::GrantedBy,
335            ]);
336            if *privilege.with_grant_option.as_ref() {
337                on_conflict.update_column(user_privilege::Column::WithGrantOption);
338            } else {
339                // Workaround to support MYSQL for `DO NOTHING`.
340                on_conflict.update_column(user_privilege::Column::UserId);
341            }
342
343            UserPrivilege::insert(privilege)
344                .on_conflict(on_conflict)
345                .do_nothing()
346                .exec(&txn)
347                .await?;
348        }
349
350        let user_infos = list_user_info_by_ids(user_ids, &txn).await?;
351
352        txn.commit().await?;
353
354        let version = self.notify_users_update(user_infos).await;
355        Ok(version)
356    }
357
358    pub async fn revoke_privilege(
359        &self,
360        user_ids: Vec<UserId>,
361        revoke_grant_privileges: &[PbGrantPrivilege],
362        granted_by: UserId,
363        revoke_by: UserId,
364        revoke_grant_option: bool,
365        cascade: bool,
366    ) -> MetaResult<NotificationVersion> {
367        let inner = self.inner.write().await;
368        let txn = inner.db.begin().await?;
369        for user_id in &user_ids {
370            ensure_user_id(*user_id, &txn).await?;
371        }
372        // check whether revoke has the privilege to grant the privilege.
373        let revoke_user = User::find_by_id(revoke_by)
374            .one(&txn)
375            .await?
376            .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?;
377
378        // check whether user can revoke the privilege.
379        if !revoke_user.is_super && granted_by != revoke_by {
380            let granted_user_name: String = User::find_by_id(granted_by)
381                .select_only()
382                .column(user::Column::Name)
383                .into_tuple()
384                .one(&txn)
385                .await?
386                .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?;
387            return Err(MetaError::permission_denied(format!(
388                "user {} is not super, can't revoke privileges for {}",
389                revoke_user.name, granted_user_name
390            )));
391        }
392
393        let mut revoke_items = HashMap::new();
394        for privilege in revoke_grant_privileges {
395            let obj = extract_grant_obj_id(privilege.get_object()?);
396            let internal_table_ids = get_internal_tables_by_id(obj.as_job_id(), &txn).await?;
397            let index_state_table_ids =
398                get_index_state_tables_by_table_id(obj.as_table_id(), &txn).await?;
399            let mut include_select = false;
400            let actions = privilege
401                .action_with_opts
402                .iter()
403                .map(|ao| {
404                    let action = Action::from(ao.get_action().unwrap());
405                    if action == Action::Select {
406                        include_select = true;
407                    }
408                    action
409                })
410                .collect_vec();
411            revoke_items.insert(obj, actions);
412            if include_select {
413                revoke_items.extend(
414                    internal_table_ids
415                        .iter()
416                        .chain(index_state_table_ids.iter())
417                        .map(|&tid| (tid.as_object_id(), vec![Action::Select])),
418                );
419                let iceberg_privilege_object_ids =
420                    get_iceberg_related_object_ids(obj, &txn).await?;
421                if !iceberg_privilege_object_ids.is_empty() {
422                    revoke_items.extend(
423                        iceberg_privilege_object_ids
424                            .into_iter()
425                            .map(|iceberg_object_id| (iceberg_object_id, vec![Action::Select])),
426                    );
427                }
428            }
429        }
430
431        let filter = if !revoke_user.is_super {
432            // ensure user have grant options or is owner of the object.
433            for (obj, actions) in &revoke_items {
434                if revoke_by == get_object_owner(*obj, &txn).await? {
435                    continue;
436                }
437                let owned_actions: HashSet<Action> = UserPrivilege::find()
438                    .select_only()
439                    .column(user_privilege::Column::Action)
440                    .filter(
441                        user_privilege::Column::UserId
442                            .eq(granted_by)
443                            .and(user_privilege::Column::Oid.eq(*obj))
444                            .and(user_privilege::Column::WithGrantOption.eq(true)),
445                    )
446                    .into_tuple::<Action>()
447                    .all(&txn)
448                    .await?
449                    .into_iter()
450                    .collect();
451                if actions.iter().any(|ac| !owned_actions.contains(ac)) {
452                    return Err(MetaError::permission_denied(format!(
453                        "user {} don't have privileges {:?} or grant option",
454                        revoke_user.name, actions,
455                    )));
456                }
457            }
458
459            user_privilege::Column::GrantedBy
460                .eq(granted_by)
461                .and(user_privilege::Column::UserId.is_in(user_ids.clone()))
462        } else {
463            user_privilege::Column::UserId.is_in(user_ids.clone())
464        };
465        let mut root_user_privileges: Vec<PartialUserPrivilege> = vec![];
466        for (obj, actions) in revoke_items {
467            let filter = filter
468                .clone()
469                .and(user_privilege::Column::Oid.eq(obj))
470                .and(user_privilege::Column::Action.is_in(actions));
471            root_user_privileges.extend(
472                UserPrivilege::find()
473                    .select_only()
474                    .columns([user_privilege::Column::Id, user_privilege::Column::UserId])
475                    .filter(filter)
476                    .into_partial_model()
477                    .all(&txn)
478                    .await?,
479            );
480        }
481        if root_user_privileges.is_empty() {
482            tracing::warn!("no privilege to revoke, ignore it");
483            return Ok(IGNORED_NOTIFICATION_VERSION);
484        }
485
486        // check if the user granted any privileges to other users.
487        let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec();
488        let (all_privilege_ids, to_update_user_ids) = if !cascade {
489            ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?;
490            (
491                root_privilege_ids.clone(),
492                root_user_privileges
493                    .iter()
494                    .map(|ur| ur.user_id)
495                    .collect_vec(),
496            )
497        } else {
498            let all_user_privileges =
499                get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?;
500            (
501                all_user_privileges.iter().map(|ur| ur.id).collect_vec(),
502                all_user_privileges
503                    .iter()
504                    .map(|ur| ur.user_id)
505                    .collect_vec(),
506            )
507        };
508
509        if revoke_grant_option {
510            UserPrivilege::update_many()
511                .col_expr(
512                    user_privilege::Column::WithGrantOption,
513                    SimpleExpr::Value(Value::Bool(Some(false))),
514                )
515                .filter(
516                    user_privilege::Column::Id
517                        .is_in(all_privilege_ids)
518                        .and(user_privilege::Column::WithGrantOption.eq(true)),
519                )
520                .exec(&txn)
521                .await?;
522        } else {
523            // The dependent privileges will be deleted cascade.
524            UserPrivilege::delete_many()
525                .filter(user_privilege::Column::Id.is_in(root_privilege_ids))
526                .exec(&txn)
527                .await?;
528        }
529
530        let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
531
532        txn.commit().await?;
533
534        let version = self.notify_users_update(user_infos).await;
535        Ok(version)
536    }
537
538    pub async fn grant_default_privileges(
539        &self,
540        user_ids: Vec<UserId>,
541        database_id: DatabaseId,
542        schema_ids: Vec<SchemaId>,
543        grantor: UserId,
544        actions: Vec<PbAction>,
545        object_type: PbObjectType,
546        grantees: Vec<UserId>,
547        with_grant_option: bool,
548    ) -> MetaResult<()> {
549        tracing::debug!(
550            ?user_ids,
551            %database_id,
552            ?schema_ids,
553            ?actions,
554            ?object_type,
555            ?grantees,
556            with_grant_option,
557            "grant default privileges",
558        );
559        let inner = self.inner.write().await;
560        let txn = inner.db.begin().await?;
561        for user_id in &user_ids {
562            ensure_user_id(*user_id, &txn).await?;
563        }
564        ensure_object_id(ObjectType::Database, database_id, &txn).await?;
565        for schema_id in &schema_ids {
566            ensure_object_id(ObjectType::Schema, *schema_id, &txn).await?;
567        }
568        for grantee in &grantees {
569            ensure_user_id(*grantee, &txn).await?;
570        }
571        if object_type == PbObjectType::Schema {
572            assert!(
573                schema_ids.is_empty(),
574                "schema_ids should be empty when object_type is Schema"
575            );
576
577            // Note that the UNIQUE constraint does not treat NULL values as equal, we cannot rely on conflict check
578            // to update it and have to check existing default privileges manually.
579            let actions = actions.iter().map(|&a| Action::from(a)).collect_vec();
580            let existing_default_privileges: HashMap<_, _> = UserDefaultPrivilege::find()
581                .select_only()
582                .columns([
583                    user_default_privilege::Column::Id,
584                    user_default_privilege::Column::UserId,
585                    user_default_privilege::Column::Grantee,
586                    user_default_privilege::Column::Action,
587                ])
588                .filter(
589                    user_default_privilege::Column::DatabaseId
590                        .eq(database_id)
591                        .and(user_default_privilege::Column::ObjectType.eq(ObjectType::Schema))
592                        .and(user_default_privilege::Column::UserId.is_in(user_ids.clone()))
593                        .and(user_default_privilege::Column::Grantee.is_in(grantees.clone()))
594                        .and(user_default_privilege::Column::Action.is_in(actions.clone())),
595                )
596                .into_tuple::<(DefaultPrivilegeId, UserId, UserId, Action)>()
597                .all(&txn)
598                .await?
599                .into_iter()
600                .map(|(id, user_id, grantee, action)| ((user_id, grantee, action), id))
601                .collect();
602
603            for user_id in user_ids {
604                for grantee in &grantees {
605                    for action in &actions {
606                        if let Some(existing_id) =
607                            existing_default_privileges.get(&(user_id, *grantee, *action))
608                            && with_grant_option
609                        {
610                            // If the default privilege already exists, we should update the grant option.
611                            UserDefaultPrivilege::update(user_default_privilege::ActiveModel {
612                                id: Set(*existing_id),
613                                with_grant_option: Set(true),
614                                granted_by: Set(grantor as _),
615                                ..Default::default()
616                            })
617                            .exec(&txn)
618                            .await?;
619                        } else {
620                            UserDefaultPrivilege::insert(user_default_privilege::ActiveModel {
621                                id: Default::default(),
622                                database_id: Set(database_id),
623                                schema_id: Set(None),
624                                object_type: Set(ObjectType::Schema),
625                                for_materialized_view: Set(false),
626                                user_id: Set(user_id),
627                                grantee: Set(*grantee),
628                                granted_by: Set(grantor as _),
629                                action: Set(*action),
630                                with_grant_option: Set(with_grant_option),
631                            })
632                            .exec(&txn)
633                            .await?;
634                        }
635                    }
636                }
637            }
638        } else {
639            let mut default_privileges = vec![];
640            for user_id in user_ids {
641                for grantee in &grantees {
642                    for action in &actions {
643                        if schema_ids.is_empty() {
644                            default_privileges.push(user_default_privilege::ActiveModel {
645                                id: Default::default(),
646                                database_id: Set(database_id),
647                                schema_id: Set(None),
648                                object_type: Set(object_type.into()),
649                                for_materialized_view: Set(object_type == PbObjectType::Mview),
650                                user_id: Set(user_id),
651                                grantee: Set(*grantee),
652                                granted_by: Set(grantor as _),
653                                action: Set((*action).into()),
654                                with_grant_option: Set(with_grant_option),
655                            });
656                            continue;
657                        }
658                        for schema_id in &schema_ids {
659                            default_privileges.push(user_default_privilege::ActiveModel {
660                                id: Default::default(),
661                                database_id: Set(database_id),
662                                schema_id: Set(Some(*schema_id)),
663                                object_type: Set(object_type.into()),
664                                for_materialized_view: Set(object_type == PbObjectType::Mview),
665                                user_id: Set(user_id),
666                                grantee: Set(*grantee),
667                                granted_by: Set(grantor as _),
668                                action: Set((*action).into()),
669                                with_grant_option: Set(with_grant_option),
670                            });
671                        }
672                    }
673                }
674            }
675
676            let mut on_conflict = OnConflict::columns([
677                user_default_privilege::Column::UserId,
678                user_default_privilege::Column::DatabaseId,
679                user_default_privilege::Column::SchemaId,
680                user_default_privilege::Column::ObjectType,
681                user_default_privilege::Column::ForMaterializedView,
682                user_default_privilege::Column::Grantee,
683                user_default_privilege::Column::Action,
684            ]);
685            if with_grant_option {
686                on_conflict.update_column(user_default_privilege::Column::WithGrantOption);
687            } else {
688                // Workaround to support MYSQL for `DO NOTHING`.
689                on_conflict.update_column(user_default_privilege::Column::UserId);
690            }
691            UserDefaultPrivilege::insert_many(default_privileges)
692                .on_conflict(on_conflict)
693                .do_nothing()
694                .exec(&txn)
695                .await?;
696        }
697
698        txn.commit().await?;
699        Ok(())
700    }
701
702    pub async fn revoke_default_privileges(
703        &self,
704        user_ids: Vec<UserId>,
705        database_id: DatabaseId,
706        schema_ids: Vec<SchemaId>,
707        actions: Vec<PbAction>,
708        object_type: PbObjectType,
709        grantees: Vec<UserId>,
710        revoke_grant_option: bool,
711    ) -> MetaResult<()> {
712        let inner = self.inner.write().await;
713        let txn = inner.db.begin().await?;
714        for user_id in &user_ids {
715            ensure_user_id(*user_id, &txn).await?;
716        }
717
718        let schema_filter = if schema_ids.is_empty() {
719            user_default_privilege::Column::SchemaId.is_null()
720        } else {
721            user_default_privilege::Column::SchemaId.is_in(schema_ids)
722        };
723        let filter = user_default_privilege::Column::DatabaseId
724            .eq(database_id)
725            .and(schema_filter)
726            .and(user_default_privilege::Column::UserId.is_in(user_ids))
727            .and(user_default_privilege::Column::ObjectType.eq(ObjectType::from(object_type)))
728            .and(user_default_privilege::Column::Grantee.is_in(grantees))
729            .and(
730                user_default_privilege::Column::Action
731                    .is_in(actions.iter().map(|&a| Action::from(a))),
732            );
733
734        if revoke_grant_option {
735            // update the `with_grant_option` field to false
736            let res = UserDefaultPrivilege::update_many()
737                .col_expr(
738                    user_default_privilege::Column::WithGrantOption,
739                    SimpleExpr::Value(Value::Bool(Some(false))),
740                )
741                .filter(filter.and(user_default_privilege::Column::WithGrantOption.eq(true)))
742                .exec(&txn)
743                .await?;
744            tracing::info!(
745                "revoke {count} grant option for default privileges",
746                count = res.rows_affected
747            );
748        } else {
749            let res = UserDefaultPrivilege::delete_many()
750                .filter(filter)
751                .exec(&txn)
752                .await?;
753            tracing::info!(
754                "revoke {count} default privileges",
755                count = res.rows_affected
756            );
757        }
758
759        txn.commit().await?;
760        Ok(())
761    }
762}
763
764#[cfg(test)]
765mod tests {
766    use risingwave_pb::user::grant_privilege::{PbActionWithGrantOption, PbObject};
767
768    use super::*;
769    use crate::manager::MetaSrvEnv;
770
771    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
772    const TEST_ROOT_USER_ID: UserId = 1;
773
774    fn make_test_user(name: &str) -> PbUserInfo {
775        PbUserInfo {
776            name: name.to_owned(),
777            ..Default::default()
778        }
779    }
780
781    fn make_privilege(
782        object: PbObject,
783        actions: &[PbAction],
784        with_grant_option: bool,
785    ) -> PbGrantPrivilege {
786        PbGrantPrivilege {
787            object: Some(object),
788            action_with_opts: actions
789                .iter()
790                .map(|&action| PbActionWithGrantOption {
791                    action: action as _,
792                    with_grant_option,
793                    ..Default::default()
794                })
795                .collect(),
796        }
797    }
798
799    #[tokio::test]
800    async fn test_user_and_privilege() -> MetaResult<()> {
801        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
802        mgr.create_user(make_test_user("test_user_1")).await?;
803        mgr.create_user(make_test_user("test_user_2")).await?;
804        let user_1 = mgr.get_user_by_name("test_user_1").await?;
805        let user_2 = mgr.get_user_by_name("test_user_2").await?;
806
807        assert!(
808            mgr.create_user(make_test_user("test_user_1"))
809                .await
810                .is_err(),
811            "user_1 already exists"
812        );
813        mgr.update_user(
814            PbUserInfo {
815                id: user_1.user_id as _,
816                name: "test_user_1_new".to_owned(),
817                ..Default::default()
818            },
819            &[PbUpdateField::Rename],
820        )
821        .await?;
822        let user_1 = mgr.get_user(user_1.user_id).await?;
823        assert_eq!(user_1.name, "test_user_1_new".to_owned());
824
825        let conn_with_option = make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Connect], true);
826        let create_without_option =
827            make_privilege(TEST_DATABASE_ID.into(), &[PbAction::Create], false);
828        // ROOT grant CONN with grant option to user_1.
829        mgr.grant_privilege(
830            vec![user_1.user_id],
831            std::slice::from_ref(&conn_with_option),
832            TEST_ROOT_USER_ID,
833            true,
834        )
835        .await?;
836        // ROOT grant CREATE without grant option to user_1.
837        mgr.grant_privilege(
838            vec![user_1.user_id],
839            std::slice::from_ref(&create_without_option),
840            TEST_ROOT_USER_ID,
841            false,
842        )
843        .await?;
844        // user_1 grant CONN with grant option to user_2.
845        mgr.grant_privilege(
846            vec![user_2.user_id],
847            std::slice::from_ref(&conn_with_option),
848            user_1.user_id,
849            true,
850        )
851        .await?;
852        mgr.grant_privilege(
853            vec![user_2.user_id],
854            std::slice::from_ref(&create_without_option),
855            user_1.user_id,
856            false,
857        )
858        .await?;
859
860        assert!(
861            mgr.drop_user(user_1.user_id).await.is_err(),
862            "user_1 can't be dropped"
863        );
864
865        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
866        assert_eq!(privilege_1.len(), 2);
867        assert!(
868            privilege_1
869                .iter()
870                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
871                    && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID as u32)
872        );
873
874        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
875        assert_eq!(privilege_2.len(), 1);
876        assert!(
877            privilege_2
878                .iter()
879                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
880                    && gp.action_with_opts[0].granted_by == user_1.user_id as u32
881                    && gp.action_with_opts[0].with_grant_option)
882        );
883
884        // revoke privilege for others by non-super user.
885        assert!(
886            mgr.revoke_privilege(
887                vec![user_1.user_id],
888                std::slice::from_ref(&conn_with_option),
889                TEST_ROOT_USER_ID,
890                user_2.user_id,
891                false,
892                false
893            )
894            .await
895            .is_err(),
896            "user_2 can't revoke for user_1"
897        );
898
899        // revoke privilege without grant option.
900        assert!(
901            mgr.revoke_privilege(
902                vec![user_2.user_id],
903                std::slice::from_ref(&create_without_option),
904                user_1.user_id,
905                user_1.user_id,
906                false,
907                false
908            )
909            .await
910            .is_err(),
911            "user_2 don't have grant option for CREATE"
912        );
913
914        // revoke referred privilege in restrict mode.
915        assert!(
916            mgr.revoke_privilege(
917                vec![user_1.user_id],
918                std::slice::from_ref(&conn_with_option),
919                TEST_ROOT_USER_ID,
920                TEST_ROOT_USER_ID,
921                false,
922                false
923            )
924            .await
925            .is_err(),
926            "permission deny in restrict mode, CONN granted to user_2"
927        );
928
929        // revoke non-referred privilege in restrict mode.
930        mgr.revoke_privilege(
931            vec![user_1.user_id],
932            std::slice::from_ref(&create_without_option),
933            TEST_ROOT_USER_ID,
934            TEST_ROOT_USER_ID,
935            false,
936            false,
937        )
938        .await?;
939
940        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
941        assert_eq!(privilege_1.len(), 1);
942        assert!(
943            privilege_1
944                .iter()
945                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
946                    && gp.action_with_opts[0].action == PbAction::Connect as i32)
947        );
948
949        // revoke grant option for referred privilege in cascade mode.
950        mgr.revoke_privilege(
951            vec![user_1.user_id],
952            std::slice::from_ref(&conn_with_option),
953            TEST_ROOT_USER_ID,
954            TEST_ROOT_USER_ID,
955            true,
956            true,
957        )
958        .await?;
959        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
960        assert_eq!(privilege_1.len(), 1);
961        assert!(
962            privilege_1
963                .iter()
964                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
965                    && gp.action_with_opts[0].action == PbAction::Connect as i32
966                    && !gp.action_with_opts[0].with_grant_option)
967        );
968        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
969        assert_eq!(privilege_2.len(), 1);
970        assert!(
971            privilege_2
972                .iter()
973                .all(|gp| gp.object == Some(TEST_DATABASE_ID.into())
974                    && gp.action_with_opts[0].action == PbAction::Connect as i32
975                    && !gp.action_with_opts[0].with_grant_option)
976        );
977
978        // revoke referred privilege in cascade mode.
979        mgr.revoke_privilege(
980            vec![user_1.user_id],
981            std::slice::from_ref(&conn_with_option),
982            TEST_ROOT_USER_ID,
983            TEST_ROOT_USER_ID,
984            false,
985            true,
986        )
987        .await?;
988        let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?;
989        assert!(privilege_1.is_empty());
990        let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?;
991        assert!(privilege_2.is_empty());
992
993        mgr.drop_user(user_1.user_id).await?;
994        mgr.drop_user(user_2.user_id).await?;
995        Ok(())
996    }
997}