risingwave_frontend/handler/
handle_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::acl;
20use risingwave_common::catalog::is_system_schema;
21use risingwave_pb::common::PbObjectType;
22use risingwave_pb::user::alter_default_privilege_request::{
23    Operation as AlterDefaultPrivilegeOperation, PbGrantPrivilege as OpGrantPrivilege,
24    PbRevokePrivilege as OpRevokePrivilege,
25};
26use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, PbObject};
27use risingwave_pb::user::{PbAction, PbGrantPrivilege};
28use risingwave_sqlparser::ast::{
29    DefaultPrivilegeOperation, GrantObjects, Ident, PrivilegeObjectType, Privileges, Statement,
30};
31
32use super::RwPgResponse;
33use crate::bind_data_type;
34use crate::binder::Binder;
35use crate::catalog::CatalogError;
36use crate::catalog::root_catalog::SchemaPath;
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::session::SessionImpl;
41use crate::user::UserId;
42use crate::user::user_privilege::{
43    available_privilege_actions, check_privilege_type, get_prost_action,
44};
45
46fn make_prost_privilege(
47    session: &SessionImpl,
48    privileges: Privileges,
49    objects: GrantObjects,
50) -> Result<Vec<PbGrantPrivilege>> {
51    check_privilege_type(&privileges, &objects)?;
52
53    let catalog_reader = session.env().catalog_reader();
54    let reader = catalog_reader.read_guard();
55    let actions = match privileges {
56        Privileges::All { .. } => available_privilege_actions(&objects)?,
57        Privileges::Actions(actions) => actions
58            .into_iter()
59            .map(|action| get_prost_action(&action))
60            .collect(),
61    };
62    let mut grant_objs = vec![];
63    match objects {
64        GrantObjects::Databases(databases) => {
65            for db in databases {
66                let database_name = Binder::resolve_database_name(db)?;
67                let database = reader.get_database_by_name(&database_name)?;
68                grant_objs.push(PbObject::DatabaseId(database.id()));
69            }
70        }
71        GrantObjects::Schemas(schemas) => {
72            for schema in schemas {
73                let schema_name = Binder::resolve_schema_name(schema)?;
74                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
75                grant_objs.push(PbObject::SchemaId(schema.id()));
76            }
77        }
78        GrantObjects::Mviews(tables) => {
79            let db_name = &session.database();
80            let search_path = session.config().search_path();
81            let user_name = &session.user_name();
82
83            for name in tables {
84                let (schema_name, table_name) =
85                    Binder::resolve_schema_qualified_name(db_name, &name)?;
86                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
87
88                let (table, _) =
89                    reader.get_created_table_by_name(db_name, schema_path, &table_name)?;
90                match table.table_type() {
91                    TableType::MaterializedView => {}
92                    _ => {
93                        return Err(ErrorCode::InvalidInputSyntax(format!(
94                            "{table_name} is not a materialized view",
95                        ))
96                        .into());
97                    }
98                }
99                grant_objs.push(PbObject::TableId(table.id().table_id));
100            }
101        }
102        GrantObjects::Tables(tables) => {
103            let db_name = &session.database();
104            let search_path = session.config().search_path();
105            let user_name = &session.user_name();
106
107            for name in tables {
108                let (schema_name, table_name) =
109                    Binder::resolve_schema_qualified_name(db_name, &name)?;
110                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
111
112                match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
113                    Ok((table, _)) => {
114                        match table.table_type() {
115                            TableType::Table => {
116                                grant_objs.push(PbObject::TableId(table.id().table_id));
117                                continue;
118                            }
119                            _ => {
120                                return Err(ErrorCode::InvalidInputSyntax(format!(
121                                    "{table_name} is not a table",
122                                ))
123                                .into());
124                            }
125                        };
126                    }
127                    Err(CatalogError::NotFound("table", _)) => {
128                        let (view, _) = reader
129                            .get_view_by_name(db_name, schema_path, &table_name)
130                            .map_err(|_| CatalogError::NotFound("table", table_name))?;
131                        grant_objs.push(PbObject::ViewId(view.id));
132                    }
133                    Err(e) => {
134                        return Err(e.into());
135                    }
136                }
137            }
138        }
139        GrantObjects::Sources(sources) => {
140            let db_name = &session.database();
141            let search_path = session.config().search_path();
142            let user_name = &session.user_name();
143
144            for name in sources {
145                let (schema_name, source_name) =
146                    Binder::resolve_schema_qualified_name(db_name, &name)?;
147                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
148
149                let (source, _) = reader.get_source_by_name(db_name, schema_path, &source_name)?;
150                grant_objs.push(PbObject::SourceId(source.id));
151            }
152        }
153        GrantObjects::Sinks(sinks) => {
154            let db_name = &session.database();
155            let search_path = session.config().search_path();
156            let user_name = &session.user_name();
157
158            for name in sinks {
159                let (schema_name, sink_name) =
160                    Binder::resolve_schema_qualified_name(db_name, &name)?;
161                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
162
163                let (sink, _) =
164                    reader.get_created_sink_by_name(db_name, schema_path, &sink_name)?;
165                grant_objs.push(PbObject::SinkId(sink.id.sink_id));
166            }
167        }
168        GrantObjects::Views(views) => {
169            let db_name = &session.database();
170            let search_path = session.config().search_path();
171            let user_name = &session.user_name();
172
173            for name in views {
174                let (schema_name, view_name) =
175                    Binder::resolve_schema_qualified_name(db_name, &name)?;
176                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
177
178                let (view, _) = reader.get_view_by_name(db_name, schema_path, &view_name)?;
179                grant_objs.push(PbObject::ViewId(view.id));
180            }
181        }
182        GrantObjects::Connections(conns) => {
183            let db_name = &session.database();
184            let search_path = session.config().search_path();
185            let user_name = &session.user_name();
186
187            for name in conns {
188                let (schema_name, conn_name) =
189                    Binder::resolve_schema_qualified_name(db_name, &name)?;
190                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
191
192                let (conn, _) = reader.get_connection_by_name(db_name, schema_path, &conn_name)?;
193                grant_objs.push(PbObject::ConnectionId(conn.id));
194            }
195        }
196        GrantObjects::Subscriptions(subscriptions) => {
197            let db_name = &session.database();
198            let search_path = session.config().search_path();
199            let user_name = &session.user_name();
200
201            for name in subscriptions {
202                let (schema_name, sub_name) =
203                    Binder::resolve_schema_qualified_name(db_name, &name)?;
204                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
205
206                let (sub, _) = reader.get_subscription_by_name(db_name, schema_path, &sub_name)?;
207                grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
208            }
209        }
210        GrantObjects::Functions(func_descs) => {
211            let db_name = &session.database();
212            let search_path = session.config().search_path();
213            let user_name = &session.user_name();
214
215            for func_desc in func_descs {
216                let (schema_name, func_name) =
217                    Binder::resolve_schema_qualified_name(db_name, &func_desc.name)?;
218                let arg_types = match func_desc.args {
219                    Some(args) => {
220                        let mut arg_types = vec![];
221                        for arg in args {
222                            arg_types.push(bind_data_type(&arg.data_type)?);
223                        }
224                        Some(arg_types)
225                    }
226                    None => None,
227                };
228                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
229
230                let (func, _) = match arg_types {
231                    Some(arg_types) => reader.get_function_by_name_args(
232                        db_name,
233                        schema_path,
234                        &func_name,
235                        &arg_types,
236                    )?,
237                    None => {
238                        let (functions, schema_name) =
239                            reader.get_functions_by_name(db_name, schema_path, &func_name)?;
240                        if functions.len() > 1 {
241                            return Err(ErrorCode::CatalogError(format!(
242                                "function name {func_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously."
243                            ).into()).into());
244                        }
245                        (
246                            functions.into_iter().next().expect("no functions"),
247                            schema_name,
248                        )
249                    }
250                };
251                grant_objs.push(PbObject::FunctionId(func.id.function_id()));
252            }
253        }
254        GrantObjects::Secrets(secrets) => {
255            let db_name = &session.database();
256            let search_path = session.config().search_path();
257            let user_name = &session.user_name();
258
259            for name in secrets {
260                let (schema_name, secret_name) =
261                    Binder::resolve_schema_qualified_name(db_name, &name)?;
262                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
263
264                let (secret, _) = reader.get_secret_by_name(db_name, schema_path, &secret_name)?;
265                grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
266            }
267        }
268        GrantObjects::AllSourcesInSchema { schemas } => {
269            for schema in schemas {
270                let schema_name = Binder::resolve_schema_name(schema)?;
271                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
272                schema.iter_source().for_each(|source| {
273                    grant_objs.push(PbObject::SourceId(source.id));
274                });
275            }
276        }
277        GrantObjects::AllMviewsInSchema { schemas } => {
278            for schema in schemas {
279                let schema_name = Binder::resolve_schema_name(schema)?;
280                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
281                schema.iter_all_mvs().for_each(|mview| {
282                    grant_objs.push(PbObject::TableId(mview.id().table_id));
283                });
284            }
285        }
286        GrantObjects::AllTablesInSchema { schemas } => {
287            for schema in schemas {
288                let schema_name = Binder::resolve_schema_name(schema)?;
289                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
290                schema.iter_user_table().for_each(|table| {
291                    grant_objs.push(PbObject::TableId(table.id().table_id));
292                });
293            }
294        }
295        GrantObjects::AllSinksInSchema { schemas } => {
296            for schema in schemas {
297                let schema_name = Binder::resolve_schema_name(schema)?;
298                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
299                schema.iter_sink().for_each(|sink| {
300                    grant_objs.push(PbObject::SinkId(sink.id.sink_id));
301                });
302            }
303        }
304        GrantObjects::AllViewsInSchema { schemas } => {
305            for schema in schemas {
306                let schema_name = Binder::resolve_schema_name(schema)?;
307                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
308                schema.iter_view().for_each(|view| {
309                    grant_objs.push(PbObject::ViewId(view.id));
310                });
311            }
312        }
313        GrantObjects::AllFunctionsInSchema { schemas } => {
314            for schema in schemas {
315                let schema_name = Binder::resolve_schema_name(schema)?;
316                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
317                schema.iter_function().for_each(|func| {
318                    grant_objs.push(PbObject::FunctionId(func.id.function_id()));
319                });
320            }
321        }
322        GrantObjects::AllSecretsInSchema { schemas } => {
323            for schema in schemas {
324                let schema_name = Binder::resolve_schema_name(schema)?;
325                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
326                schema.iter_secret().for_each(|secret| {
327                    grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
328                });
329            }
330        }
331        GrantObjects::AllSubscriptionsInSchema { schemas } => {
332            for schema in schemas {
333                let schema_name = Binder::resolve_schema_name(schema)?;
334                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
335                schema.iter_subscription().for_each(|sub| {
336                    grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
337                });
338            }
339        }
340        GrantObjects::AllConnectionsInSchema { schemas } => {
341            for schema in schemas {
342                let schema_name = Binder::resolve_schema_name(schema)?;
343                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
344                schema.iter_connections().for_each(|conn| {
345                    grant_objs.push(PbObject::ConnectionId(conn.id));
346                });
347            }
348        }
349        o => {
350            return Err(ErrorCode::InvalidInputSyntax(format!(
351                "GRANT statement does not support object type: {:?}",
352                o
353            ))
354            .into());
355        }
356    };
357    let action_with_opts = actions
358        .into_iter()
359        .map(|action| ActionWithGrantOption {
360            action: action as i32,
361            granted_by: session.user_id(),
362            ..Default::default()
363        })
364        .collect::<Vec<_>>();
365
366    let mut prost_privileges = vec![];
367    for objs in grant_objs {
368        prost_privileges.push(PbGrantPrivilege {
369            action_with_opts: action_with_opts.clone(),
370            object: Some(objs),
371        });
372    }
373    Ok(prost_privileges)
374}
375
376/// Bind user from idents to user ids.
377fn bind_user_from_idents(session: &SessionImpl, names: Vec<Ident>) -> Result<Vec<UserId>> {
378    let user_reader = session.env().user_info_reader();
379    let reader = user_reader.read_guard();
380    let mut users = HashSet::new();
381    for name in &names {
382        if let Some(user) = reader.get_user_by_name(&name.real_value()) {
383            users.insert(user.id);
384        } else {
385            return Err(ErrorCode::InvalidInputSyntax(
386                format!("User \"{name}\" does not exist").to_owned(),
387            )
388            .into());
389        }
390    }
391    Ok(users.into_iter().collect())
392}
393
394fn derive_object_type(object_type: &PrivilegeObjectType) -> PbObjectType {
395    match object_type {
396        PrivilegeObjectType::Schemas => PbObjectType::Schema,
397        PrivilegeObjectType::Tables => PbObjectType::Table,
398        PrivilegeObjectType::Views => PbObjectType::View,
399        PrivilegeObjectType::Mviews => PbObjectType::Mview,
400        PrivilegeObjectType::Sources => PbObjectType::Source,
401        PrivilegeObjectType::Sinks => PbObjectType::Sink,
402        PrivilegeObjectType::Functions => PbObjectType::Function,
403        PrivilegeObjectType::Secrets => PbObjectType::Secret,
404        PrivilegeObjectType::Subscriptions => PbObjectType::Subscription,
405        PrivilegeObjectType::Connections => PbObjectType::Connection,
406    }
407}
408
409fn make_prost_actions(
410    privileges: Privileges,
411    object_type: &PrivilegeObjectType,
412) -> Result<Vec<PbAction>> {
413    let all_acls = match object_type {
414        PrivilegeObjectType::Tables => &acl::ALL_AVAILABLE_TABLE_MODES,
415        PrivilegeObjectType::Sources => &acl::ALL_AVAILABLE_SOURCE_MODES,
416        PrivilegeObjectType::Sinks => &acl::ALL_AVAILABLE_SINK_MODES,
417        PrivilegeObjectType::Mviews => &acl::ALL_AVAILABLE_MVIEW_MODES,
418        PrivilegeObjectType::Views => &acl::ALL_AVAILABLE_VIEW_MODES,
419        PrivilegeObjectType::Functions => &acl::ALL_AVAILABLE_FUNCTION_MODES,
420        PrivilegeObjectType::Connections => &acl::ALL_AVAILABLE_CONNECTION_MODES,
421        PrivilegeObjectType::Secrets => &acl::ALL_AVAILABLE_SECRET_MODES,
422        PrivilegeObjectType::Subscriptions => &acl::ALL_AVAILABLE_SUBSCRIPTION_MODES,
423        PrivilegeObjectType::Schemas => &acl::ALL_AVAILABLE_SCHEMA_MODES,
424    };
425
426    match privileges {
427        Privileges::All { .. } => Ok(all_acls.iter().map(Into::into).collect()),
428        Privileges::Actions(actions) => {
429            let actions = actions
430                .into_iter()
431                .map(|action| get_prost_action(&action))
432                .collect::<Vec<_>>();
433            for action in &actions {
434                if !all_acls.has_mode((*action).into()) {
435                    return Err(ErrorCode::InvalidInputSyntax(format!(
436                        "Invalid privilege type for the given object: {action:?}"
437                    ))
438                    .into());
439                }
440            }
441            Ok(actions)
442        }
443    }
444}
445
446pub async fn handle_grant_privilege(
447    handler_args: HandlerArgs,
448    stmt: Statement,
449) -> Result<RwPgResponse> {
450    let session = handler_args.session;
451    let Statement::Grant {
452        privileges,
453        objects,
454        grantees,
455        with_grant_option,
456        granted_by,
457    } = stmt
458    else {
459        return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into());
460    };
461    let users = bind_user_from_idents(&session, grantees)?;
462    if let Some(granted_by) = &granted_by {
463        let user_reader = session.env().user_info_reader();
464        let reader = user_reader.read_guard();
465
466        // We remark that the user name is always case-sensitive.
467        if reader.get_user_by_name(&granted_by.real_value()).is_none() {
468            return Err(ErrorCode::InvalidInputSyntax(
469                format!("Grantor \"{granted_by}\" does not exist").to_owned(),
470            )
471            .into());
472        }
473    }
474
475    let privileges = make_prost_privilege(&session, privileges, objects)?;
476    let user_info_writer = session.user_info_writer()?;
477    user_info_writer
478        .grant_privilege(users, privileges, with_grant_option, session.user_id())
479        .await?;
480    Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE))
481}
482
483pub async fn handle_revoke_privilege(
484    handler_args: HandlerArgs,
485    stmt: Statement,
486) -> Result<RwPgResponse> {
487    let session = handler_args.session;
488    let Statement::Revoke {
489        privileges,
490        objects,
491        grantees,
492        granted_by,
493        revoke_grant_option,
494        cascade,
495    } = stmt
496    else {
497        return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into());
498    };
499    let users = bind_user_from_idents(&session, grantees)?;
500    let mut granted_by_id = None;
501    if let Some(granted_by) = &granted_by {
502        let user_reader = session.env().user_info_reader();
503        let reader = user_reader.read_guard();
504
505        if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) {
506            granted_by_id = Some(user.id);
507        } else {
508            return Err(ErrorCode::InvalidInputSyntax(
509                format!("Grantor \"{granted_by}\" does not exist").to_owned(),
510            )
511            .into());
512        }
513    }
514    let privileges = make_prost_privilege(&session, privileges, objects)?;
515    let user_info_writer = session.user_info_writer()?;
516    user_info_writer
517        .revoke_privilege(
518            users,
519            privileges,
520            granted_by_id.unwrap_or(session.user_id()),
521            session.user_id(),
522            revoke_grant_option,
523            cascade,
524        )
525        .await?;
526
527    Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE))
528}
529
530pub async fn handle_alter_default_privileges(
531    handler_args: HandlerArgs,
532    stmt: Statement,
533) -> Result<RwPgResponse> {
534    let session = handler_args.session;
535    let Statement::AlterDefaultPrivileges {
536        target_users,
537        schema_names,
538        operation,
539    } = stmt
540    else {
541        return Err(
542            ErrorCode::BindError("Invalid alter default privileges statement".to_owned()).into(),
543        );
544    };
545
546    // If target users are not specified, use the current user.
547    let users = match target_users {
548        None => vec![session.user_id()],
549        Some(users) => {
550            let users = bind_user_from_idents(&session, users)?;
551            if !session.is_super_user() && users.len() > 1 {
552                return Err(ErrorCode::PermissionDenied(
553                    "Only superuser can alter default privileges for multiple users".to_owned(),
554                )
555                .into());
556            } else if !session.is_super_user()
557                && users.iter().any(|user| *user != session.user_id())
558            {
559                return Err(ErrorCode::PermissionDenied(
560                    "Only superuser can alter default privileges for other users".to_owned(),
561                )
562                .into());
563            }
564            users
565        }
566    };
567
568    // If schema names are not specified,
569    // users will be grant/revoke privileges on all schemas in the current database.
570    let schemas = match schema_names {
571        None => {
572            if !operation.for_schemas() {
573                let catalog_reader = session.env().catalog_reader();
574                let reader = catalog_reader.read_guard();
575                let schemas = reader
576                    .get_database_by_id(&session.database_id())?
577                    .iter_schemas()
578                    .filter(|schema| !is_system_schema(&schema.name))
579                    .map(|schema| schema.id())
580                    .collect_vec();
581                if schemas.is_empty() {
582                    return Ok(PgResponse::empty_result(
583                        StatementType::ALTER_DEFAULT_PRIVILEGES,
584                    ));
585                }
586                schemas
587            } else {
588                vec![]
589            }
590        }
591        Some(names) => {
592            let catalog_reader = session.env().catalog_reader();
593            let reader = catalog_reader.read_guard();
594            let mut schemas = vec![];
595            for name in names {
596                let schema_name = Binder::resolve_schema_name(name)?;
597                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
598                schemas.push(schema.id());
599            }
600            schemas
601        }
602    };
603
604    let alter_operation = match operation {
605        DefaultPrivilegeOperation::Grant {
606            privileges,
607            object_type,
608            grantees,
609            with_grant_option,
610        } => {
611            let grantees = bind_user_from_idents(&session, grantees)?;
612            AlterDefaultPrivilegeOperation::GrantPrivilege(OpGrantPrivilege {
613                actions: make_prost_actions(privileges, &object_type)?
614                    .into_iter()
615                    .map(|a| a as i32)
616                    .collect(),
617                object_type: derive_object_type(&object_type) as i32,
618                grantees,
619                with_grant_option,
620            })
621        }
622        DefaultPrivilegeOperation::Revoke {
623            privileges,
624            object_type,
625            grantees,
626            revoke_grant_option,
627            ..
628        } => {
629            let grantees = bind_user_from_idents(&session, grantees)?;
630            AlterDefaultPrivilegeOperation::RevokePrivilege(OpRevokePrivilege {
631                actions: make_prost_actions(privileges, &object_type)?
632                    .into_iter()
633                    .map(|a| a as i32)
634                    .collect(),
635                object_type: derive_object_type(&object_type) as i32,
636                grantees,
637                revoke_grant_option,
638            })
639        }
640    };
641
642    let user_info_writer = session.user_info_writer()?;
643    user_info_writer
644        .alter_default_privilege(
645            users,
646            session.database_id(),
647            schemas,
648            alter_operation,
649            session.user_id(),
650        )
651        .await?;
652
653    Ok(PgResponse::empty_result(
654        StatementType::ALTER_DEFAULT_PRIVILEGES,
655    ))
656}
657
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
    use risingwave_pb::user::Action;

    use super::*;
    use crate::test_utils::LocalFrontend;

    /// Returns a copy of the privileges currently recorded for `user1`.
    fn user1_privileges(session: &SessionImpl) -> Vec<PbGrantPrivilege> {
        let user_reader = session.env().user_info_reader();
        let reader = user_reader.read_guard();
        let privileges = reader
            .get_user_by_name("user1")
            .unwrap()
            .grant_privileges
            .to_vec();
        privileges
    }

    /// Grants, downgrades and revokes database privileges, checking the privileges
    /// recorded for the grantee after each step.
    #[tokio::test]
    async fn test_grant_privilege() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        for sql in [
            "CREATE USER user WITH SUPERUSER PASSWORD 'password'",
            "CREATE USER user1 WITH PASSWORD 'password1'",
            "CREATE DATABASE db1",
            "GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user",
        ] {
            frontend.run_sql(sql).await.unwrap();
        }

        let (session_database_id, database_id) = {
            let catalog_reader = session.env().catalog_reader();
            let reader = catalog_reader.read_guard();
            let session_db = reader
                .get_database_by_name(&session.database())
                .unwrap()
                .id();
            let db1 = reader.get_database_by_name("db1").unwrap().id();
            (session_db, db1)
        };

        // `CONNECT` on the session database: expected before the grant on `db1` is
        // touched and still expected after it has been revoked.
        let connect_on_session_db = PbGrantPrivilege {
            action_with_opts: vec![ActionWithGrantOption {
                action: Action::Connect as i32,
                with_grant_option: true,
                granted_by: session.user_id(),
            }],
            object: Some(PbObject::DatabaseId(session_database_id)),
        };

        // After `GRANT ALL ... WITH GRANT OPTION`, both actions on `db1` are grantable.
        assert_eq!(
            user1_privileges(&session),
            vec![
                connect_on_session_db.clone(),
                PbGrantPrivilege {
                    action_with_opts: vec![
                        ActionWithGrantOption {
                            action: Action::Create as i32,
                            with_grant_option: true,
                            granted_by: DEFAULT_SUPER_USER_ID,
                        },
                        ActionWithGrantOption {
                            action: Action::Connect as i32,
                            with_grant_option: true,
                            granted_by: DEFAULT_SUPER_USER_ID,
                        }
                    ],
                    object: Some(PbObject::DatabaseId(database_id)),
                }
            ]
        );

        // Revoking only the grant option leaves no grantable action on `db1`.
        frontend
            .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        let after_option_revoke = user1_privileges(&session);
        assert!(
            after_option_revoke
                .iter()
                .filter(|gp| gp.object == Some(PbObject::DatabaseId(database_id)))
                .all(|p| p.action_with_opts.iter().all(|ao| !ao.with_grant_option))
        );

        // Revoking everything on `db1` leaves only the session-database privilege.
        frontend
            .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        assert_eq!(user1_privileges(&session), vec![connect_on_session_db]);

        frontend.run_sql("DROP USER user1").await.unwrap();
    }
}