risingwave_frontend/handler/
handle_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::acl;
20use risingwave_common::catalog::is_system_schema;
21use risingwave_pb::common::PbObjectType;
22use risingwave_pb::user::alter_default_privilege_request::{
23    Operation as AlterDefaultPrivilegeOperation, PbGrantPrivilege as OpGrantPrivilege,
24    PbRevokePrivilege as OpRevokePrivilege,
25};
26use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, PbObject};
27use risingwave_pb::user::{PbAction, PbGrantPrivilege};
28use risingwave_sqlparser::ast::{
29    DefaultPrivilegeOperation, GrantObjects, Ident, PrivilegeObjectType, Privileges, Statement,
30};
31
32use super::RwPgResponse;
33use crate::bind_data_type;
34use crate::binder::Binder;
35use crate::catalog::CatalogError;
36use crate::catalog::root_catalog::SchemaPath;
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::session::SessionImpl;
41use crate::user::UserId;
42use crate::user::user_privilege::{
43    available_privilege_actions, check_privilege_type, get_prost_action,
44};
45
46fn make_prost_privilege(
47    session: &SessionImpl,
48    privileges: Privileges,
49    objects: GrantObjects,
50) -> Result<Vec<PbGrantPrivilege>> {
51    check_privilege_type(&privileges, &objects)?;
52
53    let catalog_reader = session.env().catalog_reader();
54    let reader = catalog_reader.read_guard();
55    let actions = match privileges {
56        Privileges::All { .. } => available_privilege_actions(&objects)?,
57        Privileges::Actions(actions) => actions
58            .into_iter()
59            .map(|action| get_prost_action(&action))
60            .collect(),
61    };
62    let mut grant_objs = vec![];
63    match objects {
64        GrantObjects::Databases(databases) => {
65            for db in databases {
66                let database_name = Binder::resolve_database_name(db)?;
67                let database = reader.get_database_by_name(&database_name)?;
68                grant_objs.push(PbObject::DatabaseId(database.id()));
69            }
70        }
71        GrantObjects::Schemas(schemas) => {
72            for schema in schemas {
73                let schema_name = Binder::resolve_schema_name(schema)?;
74                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
75                grant_objs.push(PbObject::SchemaId(schema.id()));
76            }
77        }
78        GrantObjects::Mviews(tables) => {
79            let db_name = &session.database();
80            let search_path = session.config().search_path();
81            let user_name = &session.user_name();
82
83            for name in tables {
84                let (schema_name, table_name) =
85                    Binder::resolve_schema_qualified_name(db_name, &name)?;
86                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
87
88                let (table, _) =
89                    reader.get_created_table_by_name(db_name, schema_path, &table_name)?;
90                match table.table_type() {
91                    TableType::MaterializedView => {}
92                    _ => {
93                        return Err(ErrorCode::InvalidInputSyntax(format!(
94                            "{table_name} is not a materialized view",
95                        ))
96                        .into());
97                    }
98                }
99                grant_objs.push(PbObject::TableId(table.id().table_id));
100            }
101        }
102        GrantObjects::Tables(tables) => {
103            let db_name = &session.database();
104            let search_path = session.config().search_path();
105            let user_name = &session.user_name();
106
107            for name in tables {
108                let (schema_name, table_name) =
109                    Binder::resolve_schema_qualified_name(db_name, &name)?;
110                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
111
112                match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
113                    Ok((table, _)) => {
114                        match table.table_type() {
115                            TableType::Table => {
116                                grant_objs.push(PbObject::TableId(table.id().table_id));
117                                continue;
118                            }
119                            _ => {
120                                return Err(ErrorCode::InvalidInputSyntax(format!(
121                                    "{table_name} is not a table",
122                                ))
123                                .into());
124                            }
125                        };
126                    }
127                    Err(CatalogError::NotFound("table", _)) => {
128                        let (view, _) = reader
129                            .get_view_by_name(db_name, schema_path, &table_name)
130                            .map_err(|_| CatalogError::NotFound("table", table_name))?;
131                        grant_objs.push(PbObject::ViewId(view.id));
132                    }
133                    Err(e) => {
134                        return Err(e.into());
135                    }
136                }
137            }
138        }
139        GrantObjects::Sources(sources) => {
140            let db_name = &session.database();
141            let search_path = session.config().search_path();
142            let user_name = &session.user_name();
143
144            for name in sources {
145                let (schema_name, source_name) =
146                    Binder::resolve_schema_qualified_name(db_name, &name)?;
147                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
148
149                let (source, _) = reader.get_source_by_name(db_name, schema_path, &source_name)?;
150                grant_objs.push(PbObject::SourceId(source.id));
151            }
152        }
153        GrantObjects::Sinks(sinks) => {
154            let db_name = &session.database();
155            let search_path = session.config().search_path();
156            let user_name = &session.user_name();
157
158            for name in sinks {
159                let (schema_name, sink_name) =
160                    Binder::resolve_schema_qualified_name(db_name, &name)?;
161                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
162
163                let (sink, _) =
164                    reader.get_created_sink_by_name(db_name, schema_path, &sink_name)?;
165                grant_objs.push(PbObject::SinkId(sink.id.sink_id));
166            }
167        }
168        GrantObjects::Views(views) => {
169            let db_name = &session.database();
170            let search_path = session.config().search_path();
171            let user_name = &session.user_name();
172
173            for name in views {
174                let (schema_name, view_name) =
175                    Binder::resolve_schema_qualified_name(db_name, &name)?;
176                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
177
178                let (view, _) = reader.get_view_by_name(db_name, schema_path, &view_name)?;
179                grant_objs.push(PbObject::ViewId(view.id));
180            }
181        }
182        GrantObjects::Connections(conns) => {
183            let db_name = &session.database();
184            let search_path = session.config().search_path();
185            let user_name = &session.user_name();
186
187            for name in conns {
188                let (schema_name, conn_name) =
189                    Binder::resolve_schema_qualified_name(db_name, &name)?;
190                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
191
192                let (conn, _) = reader.get_connection_by_name(db_name, schema_path, &conn_name)?;
193                grant_objs.push(PbObject::ConnectionId(conn.id));
194            }
195        }
196        GrantObjects::Subscriptions(subscriptions) => {
197            let db_name = &session.database();
198            let search_path = session.config().search_path();
199            let user_name = &session.user_name();
200
201            for name in subscriptions {
202                let (schema_name, sub_name) =
203                    Binder::resolve_schema_qualified_name(db_name, &name)?;
204                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
205
206                let (sub, _) = reader.get_subscription_by_name(db_name, schema_path, &sub_name)?;
207                grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
208            }
209        }
210        GrantObjects::Functions(func_descs) => {
211            let db_name = &session.database();
212            let search_path = session.config().search_path();
213            let user_name = &session.user_name();
214
215            for func_desc in func_descs {
216                let (schema_name, func_name) =
217                    Binder::resolve_schema_qualified_name(db_name, &func_desc.name)?;
218                let arg_types = match func_desc.args {
219                    Some(args) => {
220                        let mut arg_types = vec![];
221                        for arg in args {
222                            arg_types.push(bind_data_type(&arg.data_type)?);
223                        }
224                        Some(arg_types)
225                    }
226                    None => None,
227                };
228                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
229
230                let (func, _) = match arg_types {
231                    Some(arg_types) => reader.get_function_by_name_args(
232                        db_name,
233                        schema_path,
234                        &func_name,
235                        &arg_types,
236                    )?,
237                    None => {
238                        let (functions, schema_name) =
239                            reader.get_functions_by_name(db_name, schema_path, &func_name)?;
240                        if functions.len() > 1 {
241                            return Err(ErrorCode::CatalogError(format!(
242                                "function name {func_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously."
243                            ).into()).into());
244                        }
245                        (
246                            functions.into_iter().next().expect("no functions"),
247                            schema_name,
248                        )
249                    }
250                };
251                grant_objs.push(PbObject::FunctionId(func.id.function_id()));
252            }
253        }
254        GrantObjects::Secrets(secrets) => {
255            let db_name = &session.database();
256            let search_path = session.config().search_path();
257            let user_name = &session.user_name();
258
259            for name in secrets {
260                let (schema_name, secret_name) =
261                    Binder::resolve_schema_qualified_name(db_name, &name)?;
262                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
263
264                let (secret, _) = reader.get_secret_by_name(db_name, schema_path, &secret_name)?;
265                grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
266            }
267        }
268        GrantObjects::AllSourcesInSchema { schemas } => {
269            for schema in schemas {
270                let schema_name = Binder::resolve_schema_name(schema)?;
271                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
272                schema.iter_source().for_each(|source| {
273                    grant_objs.push(PbObject::SourceId(source.id));
274                });
275            }
276        }
277        GrantObjects::AllMviewsInSchema { schemas } => {
278            for schema in schemas {
279                let schema_name = Binder::resolve_schema_name(schema)?;
280                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
281                schema.iter_all_mvs().for_each(|mview| {
282                    grant_objs.push(PbObject::TableId(mview.id().table_id));
283                });
284            }
285        }
286        GrantObjects::AllTablesInSchema { schemas } => {
287            for schema in schemas {
288                let schema_name = Binder::resolve_schema_name(schema)?;
289                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
290                schema.iter_user_table().for_each(|table| {
291                    grant_objs.push(PbObject::TableId(table.id().table_id));
292                });
293            }
294        }
295        GrantObjects::AllSinksInSchema { schemas } => {
296            for schema in schemas {
297                let schema_name = Binder::resolve_schema_name(schema)?;
298                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
299                schema.iter_sink().for_each(|sink| {
300                    grant_objs.push(PbObject::SinkId(sink.id.sink_id));
301                });
302            }
303        }
304        GrantObjects::AllViewsInSchema { schemas } => {
305            for schema in schemas {
306                let schema_name = Binder::resolve_schema_name(schema)?;
307                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
308                schema.iter_view().for_each(|view| {
309                    grant_objs.push(PbObject::ViewId(view.id));
310                });
311            }
312        }
313        GrantObjects::AllFunctionsInSchema { schemas } => {
314            for schema in schemas {
315                let schema_name = Binder::resolve_schema_name(schema)?;
316                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
317                schema.iter_function().for_each(|func| {
318                    grant_objs.push(PbObject::FunctionId(func.id.function_id()));
319                });
320            }
321        }
322        GrantObjects::AllSecretsInSchema { schemas } => {
323            for schema in schemas {
324                let schema_name = Binder::resolve_schema_name(schema)?;
325                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
326                schema.iter_secret().for_each(|secret| {
327                    grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
328                });
329            }
330        }
331        GrantObjects::AllSubscriptionsInSchema { schemas } => {
332            for schema in schemas {
333                let schema_name = Binder::resolve_schema_name(schema)?;
334                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
335                schema.iter_subscription().for_each(|sub| {
336                    grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
337                });
338            }
339        }
340        GrantObjects::AllConnectionsInSchema { schemas } => {
341            for schema in schemas {
342                let schema_name = Binder::resolve_schema_name(schema)?;
343                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
344                schema.iter_connections().for_each(|conn| {
345                    grant_objs.push(PbObject::ConnectionId(conn.id));
346                });
347            }
348        }
349        o => {
350            return Err(ErrorCode::InvalidInputSyntax(format!(
351                "GRANT statement does not support object type: {:?}",
352                o
353            ))
354            .into());
355        }
356    };
357    let action_with_opts = actions
358        .into_iter()
359        .map(|action| ActionWithGrantOption {
360            action: action as i32,
361            granted_by: session.user_id(),
362            ..Default::default()
363        })
364        .collect::<Vec<_>>();
365
366    let mut prost_privileges = vec![];
367    for objs in grant_objs {
368        prost_privileges.push(PbGrantPrivilege {
369            action_with_opts: action_with_opts.clone(),
370            object: Some(objs),
371        });
372    }
373    Ok(prost_privileges)
374}
375
376/// Bind user from idents to user ids.
377fn bind_user_from_idents(session: &SessionImpl, names: Vec<Ident>) -> Result<Vec<UserId>> {
378    let user_reader = session.env().user_info_reader();
379    let reader = user_reader.read_guard();
380    let mut users = HashSet::new();
381    for name in &names {
382        if let Some(user) = reader.get_user_by_name(&name.real_value()) {
383            users.insert(user.id);
384        } else {
385            return Err(
386                ErrorCode::InvalidInputSyntax(format!("User \"{name}\" does not exist")).into(),
387            );
388        }
389    }
390    Ok(users.into_iter().collect())
391}
392
393fn derive_object_type(object_type: &PrivilegeObjectType) -> PbObjectType {
394    match object_type {
395        PrivilegeObjectType::Schemas => PbObjectType::Schema,
396        PrivilegeObjectType::Tables => PbObjectType::Table,
397        PrivilegeObjectType::Views => PbObjectType::View,
398        PrivilegeObjectType::Mviews => PbObjectType::Mview,
399        PrivilegeObjectType::Sources => PbObjectType::Source,
400        PrivilegeObjectType::Sinks => PbObjectType::Sink,
401        PrivilegeObjectType::Functions => PbObjectType::Function,
402        PrivilegeObjectType::Secrets => PbObjectType::Secret,
403        PrivilegeObjectType::Subscriptions => PbObjectType::Subscription,
404        PrivilegeObjectType::Connections => PbObjectType::Connection,
405    }
406}
407
408fn make_prost_actions(
409    privileges: Privileges,
410    object_type: &PrivilegeObjectType,
411) -> Result<Vec<PbAction>> {
412    let all_acls = match object_type {
413        PrivilegeObjectType::Tables => &acl::ALL_AVAILABLE_TABLE_MODES,
414        PrivilegeObjectType::Sources => &acl::ALL_AVAILABLE_SOURCE_MODES,
415        PrivilegeObjectType::Sinks => &acl::ALL_AVAILABLE_SINK_MODES,
416        PrivilegeObjectType::Mviews => &acl::ALL_AVAILABLE_MVIEW_MODES,
417        PrivilegeObjectType::Views => &acl::ALL_AVAILABLE_VIEW_MODES,
418        PrivilegeObjectType::Functions => &acl::ALL_AVAILABLE_FUNCTION_MODES,
419        PrivilegeObjectType::Connections => &acl::ALL_AVAILABLE_CONNECTION_MODES,
420        PrivilegeObjectType::Secrets => &acl::ALL_AVAILABLE_SECRET_MODES,
421        PrivilegeObjectType::Subscriptions => &acl::ALL_AVAILABLE_SUBSCRIPTION_MODES,
422        PrivilegeObjectType::Schemas => &acl::ALL_AVAILABLE_SCHEMA_MODES,
423    };
424
425    match privileges {
426        Privileges::All { .. } => Ok(all_acls.iter().map(Into::into).collect()),
427        Privileges::Actions(actions) => {
428            let actions = actions
429                .into_iter()
430                .map(|action| get_prost_action(&action))
431                .collect::<Vec<_>>();
432            for action in &actions {
433                if !all_acls.has_mode((*action).into()) {
434                    return Err(ErrorCode::InvalidInputSyntax(format!(
435                        "Invalid privilege type for the given object: {action:?}"
436                    ))
437                    .into());
438                }
439            }
440            Ok(actions)
441        }
442    }
443}
444
445pub async fn handle_grant_privilege(
446    handler_args: HandlerArgs,
447    stmt: Statement,
448) -> Result<RwPgResponse> {
449    let session = handler_args.session;
450    let Statement::Grant {
451        privileges,
452        objects,
453        grantees,
454        with_grant_option,
455        granted_by,
456    } = stmt
457    else {
458        return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into());
459    };
460    let users = bind_user_from_idents(&session, grantees)?;
461    if let Some(granted_by) = &granted_by {
462        let user_reader = session.env().user_info_reader();
463        let reader = user_reader.read_guard();
464
465        // We remark that the user name is always case-sensitive.
466        if reader.get_user_by_name(&granted_by.real_value()).is_none() {
467            return Err(ErrorCode::InvalidInputSyntax(format!(
468                "Grantor \"{granted_by}\" does not exist"
469            ))
470            .into());
471        }
472    }
473
474    let privileges = make_prost_privilege(&session, privileges, objects)?;
475    let user_info_writer = session.user_info_writer()?;
476    user_info_writer
477        .grant_privilege(users, privileges, with_grant_option, session.user_id())
478        .await?;
479    Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE))
480}
481
482pub async fn handle_revoke_privilege(
483    handler_args: HandlerArgs,
484    stmt: Statement,
485) -> Result<RwPgResponse> {
486    let session = handler_args.session;
487    let Statement::Revoke {
488        privileges,
489        objects,
490        grantees,
491        granted_by,
492        revoke_grant_option,
493        cascade,
494    } = stmt
495    else {
496        return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into());
497    };
498    let users = bind_user_from_idents(&session, grantees)?;
499    let mut granted_by_id = None;
500    if let Some(granted_by) = &granted_by {
501        let user_reader = session.env().user_info_reader();
502        let reader = user_reader.read_guard();
503
504        if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) {
505            granted_by_id = Some(user.id);
506        } else {
507            return Err(ErrorCode::InvalidInputSyntax(format!(
508                "Grantor \"{granted_by}\" does not exist"
509            ))
510            .into());
511        }
512    }
513    let privileges = make_prost_privilege(&session, privileges, objects)?;
514    let user_info_writer = session.user_info_writer()?;
515    user_info_writer
516        .revoke_privilege(
517            users,
518            privileges,
519            granted_by_id.unwrap_or(session.user_id()),
520            session.user_id(),
521            revoke_grant_option,
522            cascade,
523        )
524        .await?;
525
526    Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE))
527}
528
529pub async fn handle_alter_default_privileges(
530    handler_args: HandlerArgs,
531    stmt: Statement,
532) -> Result<RwPgResponse> {
533    let session = handler_args.session;
534    let Statement::AlterDefaultPrivileges {
535        target_users,
536        schema_names,
537        operation,
538    } = stmt
539    else {
540        return Err(
541            ErrorCode::BindError("Invalid alter default privileges statement".to_owned()).into(),
542        );
543    };
544
545    // If target users are not specified, use the current user.
546    let users = match target_users {
547        None => vec![session.user_id()],
548        Some(users) => {
549            let users = bind_user_from_idents(&session, users)?;
550            if !session.is_super_user() && users.len() > 1 {
551                return Err(ErrorCode::PermissionDenied(
552                    "Only superuser can alter default privileges for multiple users".to_owned(),
553                )
554                .into());
555            } else if !session.is_super_user()
556                && users.iter().any(|user| *user != session.user_id())
557            {
558                return Err(ErrorCode::PermissionDenied(
559                    "Only superuser can alter default privileges for other users".to_owned(),
560                )
561                .into());
562            }
563            users
564        }
565    };
566
567    // If schema names are not specified,
568    // users will be grant/revoke privileges on all schemas in the current database.
569    let schemas = match schema_names {
570        None => {
571            if !operation.for_schemas() {
572                let catalog_reader = session.env().catalog_reader();
573                let reader = catalog_reader.read_guard();
574                let schemas = reader
575                    .get_database_by_id(&session.database_id())?
576                    .iter_schemas()
577                    .filter(|schema| !is_system_schema(&schema.name))
578                    .map(|schema| schema.id())
579                    .collect_vec();
580                if schemas.is_empty() {
581                    return Ok(PgResponse::empty_result(
582                        StatementType::ALTER_DEFAULT_PRIVILEGES,
583                    ));
584                }
585                schemas
586            } else {
587                vec![]
588            }
589        }
590        Some(names) => {
591            let catalog_reader = session.env().catalog_reader();
592            let reader = catalog_reader.read_guard();
593            let mut schemas = vec![];
594            for name in names {
595                let schema_name = Binder::resolve_schema_name(name)?;
596                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
597                schemas.push(schema.id());
598            }
599            schemas
600        }
601    };
602
603    let alter_operation = match operation {
604        DefaultPrivilegeOperation::Grant {
605            privileges,
606            object_type,
607            grantees,
608            with_grant_option,
609        } => {
610            let grantees = bind_user_from_idents(&session, grantees)?;
611            AlterDefaultPrivilegeOperation::GrantPrivilege(OpGrantPrivilege {
612                actions: make_prost_actions(privileges, &object_type)?
613                    .into_iter()
614                    .map(|a| a as i32)
615                    .collect(),
616                object_type: derive_object_type(&object_type) as i32,
617                grantees,
618                with_grant_option,
619            })
620        }
621        DefaultPrivilegeOperation::Revoke {
622            privileges,
623            object_type,
624            grantees,
625            revoke_grant_option,
626            ..
627        } => {
628            let grantees = bind_user_from_idents(&session, grantees)?;
629            AlterDefaultPrivilegeOperation::RevokePrivilege(OpRevokePrivilege {
630                actions: make_prost_actions(privileges, &object_type)?
631                    .into_iter()
632                    .map(|a| a as i32)
633                    .collect(),
634                object_type: derive_object_type(&object_type) as i32,
635                grantees,
636                revoke_grant_option,
637            })
638        }
639    };
640
641    let user_info_writer = session.user_info_writer()?;
642    user_info_writer
643        .alter_default_privilege(
644            users,
645            session.database_id(),
646            schemas,
647            alter_operation,
648            session.user_id(),
649        )
650        .await?;
651
652    Ok(PgResponse::empty_result(
653        StatementType::ALTER_DEFAULT_PRIVILEGES,
654    ))
655}
656
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
    use risingwave_pb::user::Action;

    use super::*;
    use crate::test_utils::LocalFrontend;

    /// Grants all database privileges to `user1`, revokes the grant option, then revokes the
    /// privileges, checking the stored grant privileges after every step.
    #[tokio::test]
    async fn test_grant_privilege() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        for sql in [
            "CREATE USER user WITH SUPERUSER PASSWORD 'password'",
            "CREATE USER user1 WITH PASSWORD 'password1'",
            "CREATE DATABASE db1",
            "GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user",
        ] {
            frontend.run_sql(sql).await.unwrap();
        }

        let (session_database_id, database_id) = {
            let catalog_reader = session.env().catalog_reader();
            let reader = catalog_reader.read_guard();
            let current = reader
                .get_database_by_name(&session.database())
                .unwrap()
                .id();
            let granted = reader.get_database_by_name("db1").unwrap().id();
            (current, granted)
        };

        // Snapshot of the privileges currently stored for `user1`; the read guard is
        // released as soon as the copy has been taken.
        let user1_privileges = || {
            session
                .env()
                .user_info_reader()
                .read_guard()
                .get_user_by_name("user1")
                .unwrap()
                .grant_privileges
                .clone()
        };

        // Entry on the session database that `user1` holds before and after the test's
        // own grants.
        let connect_on_session_db = PbGrantPrivilege {
            action_with_opts: vec![ActionWithGrantOption {
                action: Action::Connect as i32,
                with_grant_option: true,
                granted_by: session.user_id(),
            }],
            object: Some(PbObject::DatabaseId(session_database_id)),
        };

        assert_eq!(
            user1_privileges(),
            vec![
                connect_on_session_db.clone(),
                PbGrantPrivilege {
                    action_with_opts: vec![
                        ActionWithGrantOption {
                            action: Action::Create as i32,
                            with_grant_option: true,
                            granted_by: DEFAULT_SUPER_USER_ID,
                        },
                        ActionWithGrantOption {
                            action: Action::Connect as i32,
                            with_grant_option: true,
                            granted_by: DEFAULT_SUPER_USER_ID,
                        }
                    ],
                    object: Some(PbObject::DatabaseId(database_id)),
                }
            ]
        );

        frontend
            .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        // Only the grant option is gone: nothing left on `db1` may still carry it.
        assert!(
            user1_privileges()
                .iter()
                .filter(|gp| gp.object == Some(PbObject::DatabaseId(database_id)))
                .all(|p| p.action_with_opts.iter().all(|ao| !ao.with_grant_option))
        );

        frontend
            .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        assert_eq!(user1_privileges(), vec![connect_on_session_db]);

        frontend.run_sql("DROP USER user1").await.unwrap();
    }
}