risingwave_frontend/handler/
handle_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::acl;
20use risingwave_common::catalog::is_system_schema;
21use risingwave_pb::common::PbObjectType;
22use risingwave_pb::user::alter_default_privilege_request::{
23    Operation as AlterDefaultPrivilegeOperation, PbGrantPrivilege as OpGrantPrivilege,
24    PbRevokePrivilege as OpRevokePrivilege,
25};
26use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, PbObject};
27use risingwave_pb::user::{PbAction, PbGrantPrivilege};
28use risingwave_sqlparser::ast::{
29    DefaultPrivilegeOperation, GrantObjects, Ident, PrivilegeObjectType, Privileges, Statement,
30};
31
32use super::RwPgResponse;
33use crate::bind_data_type;
34use crate::binder::Binder;
35use crate::catalog::CatalogError;
36use crate::catalog::root_catalog::SchemaPath;
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::session::SessionImpl;
41use crate::user::UserId;
42use crate::user::user_privilege::{
43    available_privilege_actions, check_privilege_type, get_prost_action,
44};
45
46fn make_prost_privilege(
47    session: &SessionImpl,
48    privileges: Privileges,
49    objects: GrantObjects,
50) -> Result<Vec<PbGrantPrivilege>> {
51    check_privilege_type(&privileges, &objects)?;
52
53    let catalog_reader = session.env().catalog_reader();
54    let reader = catalog_reader.read_guard();
55    let actions = match privileges {
56        Privileges::All { .. } => available_privilege_actions(&objects)?,
57        Privileges::Actions(actions) => actions
58            .into_iter()
59            .map(|action| get_prost_action(&action))
60            .collect(),
61    };
62    let mut grant_objs = vec![];
63    match objects {
64        GrantObjects::Databases(databases) => {
65            for db in databases {
66                let database_name = Binder::resolve_database_name(db)?;
67                let database = reader.get_database_by_name(&database_name)?;
68                grant_objs.push(PbObject::DatabaseId(database.id()));
69            }
70        }
71        GrantObjects::Schemas(schemas) => {
72            for schema in schemas {
73                let schema_name = Binder::resolve_schema_name(schema)?;
74                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
75                grant_objs.push(PbObject::SchemaId(schema.id()));
76            }
77        }
78        GrantObjects::Mviews(tables) => {
79            let db_name = &session.database();
80            let search_path = session.config().search_path();
81            let user_name = &session.user_name();
82
83            for name in tables {
84                let (schema_name, table_name) =
85                    Binder::resolve_schema_qualified_name(db_name, name)?;
86                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
87
88                let (table, _) =
89                    reader.get_created_table_by_name(db_name, schema_path, &table_name)?;
90                match table.table_type() {
91                    TableType::MaterializedView => {}
92                    _ => {
93                        return Err(ErrorCode::InvalidInputSyntax(format!(
94                            "{table_name} is not a materialized view",
95                        ))
96                        .into());
97                    }
98                }
99                grant_objs.push(PbObject::TableId(table.id().table_id));
100            }
101        }
102        GrantObjects::Tables(tables) => {
103            let db_name = &session.database();
104            let search_path = session.config().search_path();
105            let user_name = &session.user_name();
106
107            for name in tables {
108                let (schema_name, table_name) =
109                    Binder::resolve_schema_qualified_name(db_name, name)?;
110                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
111
112                match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
113                    Ok((table, _)) => {
114                        match table.table_type() {
115                            TableType::Table => {
116                                grant_objs.push(PbObject::TableId(table.id().table_id));
117                                continue;
118                            }
119                            _ => {
120                                return Err(ErrorCode::InvalidInputSyntax(format!(
121                                    "{table_name} is not a table",
122                                ))
123                                .into());
124                            }
125                        };
126                    }
127                    Err(CatalogError::NotFound("table", _)) => {
128                        let (view, _) = reader
129                            .get_view_by_name(db_name, schema_path, &table_name)
130                            .map_err(|_| CatalogError::NotFound("table", table_name))?;
131                        grant_objs.push(PbObject::ViewId(view.id));
132                    }
133                    Err(e) => {
134                        return Err(e.into());
135                    }
136                }
137            }
138        }
139        GrantObjects::Sources(sources) => {
140            let db_name = &session.database();
141            let search_path = session.config().search_path();
142            let user_name = &session.user_name();
143
144            for name in sources {
145                let (schema_name, source_name) =
146                    Binder::resolve_schema_qualified_name(db_name, name)?;
147                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
148
149                let (source, _) = reader.get_source_by_name(db_name, schema_path, &source_name)?;
150                grant_objs.push(PbObject::SourceId(source.id));
151            }
152        }
153        GrantObjects::Sinks(sinks) => {
154            let db_name = &session.database();
155            let search_path = session.config().search_path();
156            let user_name = &session.user_name();
157
158            for name in sinks {
159                let (schema_name, sink_name) =
160                    Binder::resolve_schema_qualified_name(db_name, name)?;
161                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
162
163                let (sink, _) =
164                    reader.get_created_sink_by_name(db_name, schema_path, &sink_name)?;
165                grant_objs.push(PbObject::SinkId(sink.id.sink_id));
166            }
167        }
168        GrantObjects::Views(views) => {
169            let db_name = &session.database();
170            let search_path = session.config().search_path();
171            let user_name = &session.user_name();
172
173            for name in views {
174                let (schema_name, view_name) =
175                    Binder::resolve_schema_qualified_name(db_name, name)?;
176                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
177
178                let (view, _) = reader.get_view_by_name(db_name, schema_path, &view_name)?;
179                grant_objs.push(PbObject::ViewId(view.id));
180            }
181        }
182        GrantObjects::Connections(conns) => {
183            let db_name = &session.database();
184            let search_path = session.config().search_path();
185            let user_name = &session.user_name();
186
187            for name in conns {
188                let (schema_name, conn_name) =
189                    Binder::resolve_schema_qualified_name(db_name, name)?;
190                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
191
192                let (conn, _) = reader.get_connection_by_name(db_name, schema_path, &conn_name)?;
193                grant_objs.push(PbObject::ConnectionId(conn.id));
194            }
195        }
196        GrantObjects::Subscriptions(subscriptions) => {
197            let db_name = &session.database();
198            let search_path = session.config().search_path();
199            let user_name = &session.user_name();
200
201            for name in subscriptions {
202                let (schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
203                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
204
205                let (sub, _) = reader.get_subscription_by_name(db_name, schema_path, &sub_name)?;
206                grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
207            }
208        }
209        GrantObjects::Functions(func_descs) => {
210            let db_name = &session.database();
211            let search_path = session.config().search_path();
212            let user_name = &session.user_name();
213
214            for func_desc in func_descs {
215                let (schema_name, func_name) =
216                    Binder::resolve_schema_qualified_name(db_name, func_desc.name)?;
217                let arg_types = match func_desc.args {
218                    Some(args) => {
219                        let mut arg_types = vec![];
220                        for arg in args {
221                            arg_types.push(bind_data_type(&arg.data_type)?);
222                        }
223                        Some(arg_types)
224                    }
225                    None => None,
226                };
227                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
228
229                let (func, _) = match arg_types {
230                    Some(arg_types) => reader.get_function_by_name_args(
231                        db_name,
232                        schema_path,
233                        &func_name,
234                        &arg_types,
235                    )?,
236                    None => {
237                        let (functions, schema_name) =
238                            reader.get_functions_by_name(db_name, schema_path, &func_name)?;
239                        if functions.len() > 1 {
240                            return Err(ErrorCode::CatalogError(format!(
241                                "function name {func_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously."
242                            ).into()).into());
243                        }
244                        (
245                            functions.into_iter().next().expect("no functions"),
246                            schema_name,
247                        )
248                    }
249                };
250                grant_objs.push(PbObject::FunctionId(func.id.function_id()));
251            }
252        }
253        GrantObjects::Secrets(secrets) => {
254            let db_name = &session.database();
255            let search_path = session.config().search_path();
256            let user_name = &session.user_name();
257
258            for name in secrets {
259                let (schema_name, secret_name) =
260                    Binder::resolve_schema_qualified_name(db_name, name)?;
261                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
262
263                let (secret, _) = reader.get_secret_by_name(db_name, schema_path, &secret_name)?;
264                grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
265            }
266        }
267        GrantObjects::AllSourcesInSchema { schemas } => {
268            for schema in schemas {
269                let schema_name = Binder::resolve_schema_name(schema)?;
270                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
271                schema.iter_source().for_each(|source| {
272                    grant_objs.push(PbObject::SourceId(source.id));
273                });
274            }
275        }
276        GrantObjects::AllMviewsInSchema { schemas } => {
277            for schema in schemas {
278                let schema_name = Binder::resolve_schema_name(schema)?;
279                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
280                schema.iter_all_mvs().for_each(|mview| {
281                    grant_objs.push(PbObject::TableId(mview.id().table_id));
282                });
283            }
284        }
285        GrantObjects::AllTablesInSchema { schemas } => {
286            for schema in schemas {
287                let schema_name = Binder::resolve_schema_name(schema)?;
288                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
289                schema.iter_user_table().for_each(|table| {
290                    grant_objs.push(PbObject::TableId(table.id().table_id));
291                });
292            }
293        }
294        GrantObjects::AllSinksInSchema { schemas } => {
295            for schema in schemas {
296                let schema_name = Binder::resolve_schema_name(schema)?;
297                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
298                schema.iter_sink().for_each(|sink| {
299                    grant_objs.push(PbObject::SinkId(sink.id.sink_id));
300                });
301            }
302        }
303        GrantObjects::AllViewsInSchema { schemas } => {
304            for schema in schemas {
305                let schema_name = Binder::resolve_schema_name(schema)?;
306                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
307                schema.iter_view().for_each(|view| {
308                    grant_objs.push(PbObject::ViewId(view.id));
309                });
310            }
311        }
312        GrantObjects::AllFunctionsInSchema { schemas } => {
313            for schema in schemas {
314                let schema_name = Binder::resolve_schema_name(schema)?;
315                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
316                schema.iter_function().for_each(|func| {
317                    grant_objs.push(PbObject::FunctionId(func.id.function_id()));
318                });
319            }
320        }
321        GrantObjects::AllSecretsInSchema { schemas } => {
322            for schema in schemas {
323                let schema_name = Binder::resolve_schema_name(schema)?;
324                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
325                schema.iter_secret().for_each(|secret| {
326                    grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
327                });
328            }
329        }
330        GrantObjects::AllSubscriptionsInSchema { schemas } => {
331            for schema in schemas {
332                let schema_name = Binder::resolve_schema_name(schema)?;
333                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
334                schema.iter_subscription().for_each(|sub| {
335                    grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
336                });
337            }
338        }
339        GrantObjects::AllConnectionsInSchema { schemas } => {
340            for schema in schemas {
341                let schema_name = Binder::resolve_schema_name(schema)?;
342                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
343                schema.iter_connections().for_each(|conn| {
344                    grant_objs.push(PbObject::ConnectionId(conn.id));
345                });
346            }
347        }
348        o => {
349            return Err(ErrorCode::InvalidInputSyntax(format!(
350                "GRANT statement does not support object type: {:?}",
351                o
352            ))
353            .into());
354        }
355    };
356    let action_with_opts = actions
357        .into_iter()
358        .map(|action| ActionWithGrantOption {
359            action: action as i32,
360            granted_by: session.user_id(),
361            ..Default::default()
362        })
363        .collect::<Vec<_>>();
364
365    let mut prost_privileges = vec![];
366    for objs in grant_objs {
367        prost_privileges.push(PbGrantPrivilege {
368            action_with_opts: action_with_opts.clone(),
369            object: Some(objs),
370        });
371    }
372    Ok(prost_privileges)
373}
374
375/// Bind user from idents to user ids.
376fn bind_user_from_idents(session: &SessionImpl, names: Vec<Ident>) -> Result<Vec<UserId>> {
377    let user_reader = session.env().user_info_reader();
378    let reader = user_reader.read_guard();
379    let mut users = HashSet::new();
380    for name in &names {
381        if let Some(user) = reader.get_user_by_name(&name.real_value()) {
382            users.insert(user.id);
383        } else {
384            return Err(ErrorCode::InvalidInputSyntax(
385                format!("User \"{name}\" does not exist").to_owned(),
386            )
387            .into());
388        }
389    }
390    Ok(users.into_iter().collect())
391}
392
393fn derive_object_type(object_type: &PrivilegeObjectType) -> PbObjectType {
394    match object_type {
395        PrivilegeObjectType::Schemas => PbObjectType::Schema,
396        PrivilegeObjectType::Tables => PbObjectType::Table,
397        PrivilegeObjectType::Views => PbObjectType::View,
398        PrivilegeObjectType::Mviews => PbObjectType::Mview,
399        PrivilegeObjectType::Sources => PbObjectType::Source,
400        PrivilegeObjectType::Sinks => PbObjectType::Sink,
401        PrivilegeObjectType::Functions => PbObjectType::Function,
402        PrivilegeObjectType::Secrets => PbObjectType::Secret,
403        PrivilegeObjectType::Subscriptions => PbObjectType::Subscription,
404        PrivilegeObjectType::Connections => PbObjectType::Connection,
405    }
406}
407
408fn make_prost_actions(
409    privileges: Privileges,
410    object_type: &PrivilegeObjectType,
411) -> Result<Vec<PbAction>> {
412    let all_acls = match object_type {
413        PrivilegeObjectType::Tables => &acl::ALL_AVAILABLE_TABLE_MODES,
414        PrivilegeObjectType::Sources => &acl::ALL_AVAILABLE_SOURCE_MODES,
415        PrivilegeObjectType::Sinks => &acl::ALL_AVAILABLE_SINK_MODES,
416        PrivilegeObjectType::Mviews => &acl::ALL_AVAILABLE_MVIEW_MODES,
417        PrivilegeObjectType::Views => &acl::ALL_AVAILABLE_VIEW_MODES,
418        PrivilegeObjectType::Functions => &acl::ALL_AVAILABLE_FUNCTION_MODES,
419        PrivilegeObjectType::Connections => &acl::ALL_AVAILABLE_CONNECTION_MODES,
420        PrivilegeObjectType::Secrets => &acl::ALL_AVAILABLE_SECRET_MODES,
421        PrivilegeObjectType::Subscriptions => &acl::ALL_AVAILABLE_SUBSCRIPTION_MODES,
422        PrivilegeObjectType::Schemas => &acl::ALL_AVAILABLE_SCHEMA_MODES,
423    };
424
425    match privileges {
426        Privileges::All { .. } => Ok(all_acls.iter().map(Into::into).collect()),
427        Privileges::Actions(actions) => {
428            let actions = actions
429                .into_iter()
430                .map(|action| get_prost_action(&action))
431                .collect::<Vec<_>>();
432            for action in &actions {
433                if !all_acls.has_mode((*action).into()) {
434                    return Err(ErrorCode::InvalidInputSyntax(format!(
435                        "Invalid privilege type for the given object: {action:?}"
436                    ))
437                    .into());
438                }
439            }
440            Ok(actions)
441        }
442    }
443}
444
445pub async fn handle_grant_privilege(
446    handler_args: HandlerArgs,
447    stmt: Statement,
448) -> Result<RwPgResponse> {
449    let session = handler_args.session;
450    let Statement::Grant {
451        privileges,
452        objects,
453        grantees,
454        with_grant_option,
455        granted_by,
456    } = stmt
457    else {
458        return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into());
459    };
460    let users = bind_user_from_idents(&session, grantees)?;
461    if let Some(granted_by) = &granted_by {
462        let user_reader = session.env().user_info_reader();
463        let reader = user_reader.read_guard();
464
465        // We remark that the user name is always case-sensitive.
466        if reader.get_user_by_name(&granted_by.real_value()).is_none() {
467            return Err(ErrorCode::InvalidInputSyntax(
468                format!("Grantor \"{granted_by}\" does not exist").to_owned(),
469            )
470            .into());
471        }
472    }
473
474    let privileges = make_prost_privilege(&session, privileges, objects)?;
475    let user_info_writer = session.user_info_writer()?;
476    user_info_writer
477        .grant_privilege(users, privileges, with_grant_option, session.user_id())
478        .await?;
479    Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE))
480}
481
482pub async fn handle_revoke_privilege(
483    handler_args: HandlerArgs,
484    stmt: Statement,
485) -> Result<RwPgResponse> {
486    let session = handler_args.session;
487    let Statement::Revoke {
488        privileges,
489        objects,
490        grantees,
491        granted_by,
492        revoke_grant_option,
493        cascade,
494    } = stmt
495    else {
496        return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into());
497    };
498    let users = bind_user_from_idents(&session, grantees)?;
499    let mut granted_by_id = None;
500    if let Some(granted_by) = &granted_by {
501        let user_reader = session.env().user_info_reader();
502        let reader = user_reader.read_guard();
503
504        if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) {
505            granted_by_id = Some(user.id);
506        } else {
507            return Err(ErrorCode::InvalidInputSyntax(
508                format!("Grantor \"{granted_by}\" does not exist").to_owned(),
509            )
510            .into());
511        }
512    }
513    let privileges = make_prost_privilege(&session, privileges, objects)?;
514    let user_info_writer = session.user_info_writer()?;
515    user_info_writer
516        .revoke_privilege(
517            users,
518            privileges,
519            granted_by_id.unwrap_or(session.user_id()),
520            session.user_id(),
521            revoke_grant_option,
522            cascade,
523        )
524        .await?;
525
526    Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE))
527}
528
529pub async fn handle_alter_default_privileges(
530    handler_args: HandlerArgs,
531    stmt: Statement,
532) -> Result<RwPgResponse> {
533    let session = handler_args.session;
534    let Statement::AlterDefaultPrivileges {
535        target_users,
536        schema_names,
537        operation,
538    } = stmt
539    else {
540        return Err(
541            ErrorCode::BindError("Invalid alter default privileges statement".to_owned()).into(),
542        );
543    };
544
545    // If target users are not specified, use the current user.
546    let users = match target_users {
547        None => vec![session.user_id()],
548        Some(users) => {
549            let users = bind_user_from_idents(&session, users)?;
550            if !session.is_super_user() && users.len() > 1 {
551                return Err(ErrorCode::PermissionDenied(
552                    "Only superuser can alter default privileges for multiple users".to_owned(),
553                )
554                .into());
555            } else if !session.is_super_user()
556                && users.iter().any(|user| *user != session.user_id())
557            {
558                return Err(ErrorCode::PermissionDenied(
559                    "Only superuser can alter default privileges for other users".to_owned(),
560                )
561                .into());
562            }
563            users
564        }
565    };
566
567    // If schema names are not specified,
568    // users will be grant/revoke privileges on all schemas in the current database.
569    let schemas = match schema_names {
570        None => {
571            if !operation.for_schemas() {
572                let catalog_reader = session.env().catalog_reader();
573                let reader = catalog_reader.read_guard();
574                let schemas = reader
575                    .get_database_by_id(&session.database_id())?
576                    .iter_schemas()
577                    .filter(|schema| !is_system_schema(&schema.name))
578                    .map(|schema| schema.id())
579                    .collect_vec();
580                if schemas.is_empty() {
581                    return Ok(PgResponse::empty_result(
582                        StatementType::ALTER_DEFAULT_PRIVILEGES,
583                    ));
584                }
585                schemas
586            } else {
587                vec![]
588            }
589        }
590        Some(names) => {
591            let catalog_reader = session.env().catalog_reader();
592            let reader = catalog_reader.read_guard();
593            let mut schemas = vec![];
594            for name in names {
595                let schema_name = Binder::resolve_schema_name(name)?;
596                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
597                schemas.push(schema.id());
598            }
599            schemas
600        }
601    };
602
603    let alter_operation = match operation {
604        DefaultPrivilegeOperation::Grant {
605            privileges,
606            object_type,
607            grantees,
608            with_grant_option,
609        } => {
610            let grantees = bind_user_from_idents(&session, grantees)?;
611            AlterDefaultPrivilegeOperation::GrantPrivilege(OpGrantPrivilege {
612                actions: make_prost_actions(privileges, &object_type)?
613                    .into_iter()
614                    .map(|a| a as i32)
615                    .collect(),
616                object_type: derive_object_type(&object_type) as i32,
617                grantees,
618                with_grant_option,
619            })
620        }
621        DefaultPrivilegeOperation::Revoke {
622            privileges,
623            object_type,
624            grantees,
625            revoke_grant_option,
626            ..
627        } => {
628            let grantees = bind_user_from_idents(&session, grantees)?;
629            AlterDefaultPrivilegeOperation::RevokePrivilege(OpRevokePrivilege {
630                actions: make_prost_actions(privileges, &object_type)?
631                    .into_iter()
632                    .map(|a| a as i32)
633                    .collect(),
634                object_type: derive_object_type(&object_type) as i32,
635                grantees,
636                revoke_grant_option,
637            })
638        }
639    };
640
641    let user_info_writer = session.user_info_writer()?;
642    user_info_writer
643        .alter_default_privilege(
644            users,
645            session.database_id(),
646            schemas,
647            alter_operation,
648            session.user_id(),
649        )
650        .await?;
651
652    Ok(PgResponse::empty_result(
653        StatementType::ALTER_DEFAULT_PRIVILEGES,
654    ))
655}
656
657#[cfg(test)]
658mod tests {
659    use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
660    use risingwave_pb::user::Action;
661
662    use super::*;
663    use crate::test_utils::LocalFrontend;
664
665    #[tokio::test]
666    async fn test_grant_privilege() {
667        let frontend = LocalFrontend::new(Default::default()).await;
668        let session = frontend.session_ref();
669        frontend
670            .run_sql("CREATE USER user WITH SUPERUSER PASSWORD 'password'")
671            .await
672            .unwrap();
673        frontend
674            .run_sql("CREATE USER user1 WITH PASSWORD 'password1'")
675            .await
676            .unwrap();
677        frontend.run_sql("CREATE DATABASE db1").await.unwrap();
678        frontend
679            .run_sql("GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user")
680            .await
681            .unwrap();
682
683        let (session_database_id, database_id) = {
684            let catalog_reader = session.env().catalog_reader();
685            let reader = catalog_reader.read_guard();
686            (
687                reader
688                    .get_database_by_name(&session.database())
689                    .unwrap()
690                    .id(),
691                reader.get_database_by_name("db1").unwrap().id(),
692            )
693        };
694
695        {
696            let user_reader = session.env().user_info_reader();
697            let reader = user_reader.read_guard();
698            let user_info = reader.get_user_by_name("user1").unwrap();
699            assert_eq!(
700                user_info.grant_privileges,
701                vec![
702                    PbGrantPrivilege {
703                        action_with_opts: vec![ActionWithGrantOption {
704                            action: Action::Connect as i32,
705                            with_grant_option: true,
706                            granted_by: session.user_id(),
707                        }],
708                        object: Some(PbObject::DatabaseId(session_database_id)),
709                    },
710                    PbGrantPrivilege {
711                        action_with_opts: vec![
712                            ActionWithGrantOption {
713                                action: Action::Create as i32,
714                                with_grant_option: true,
715                                granted_by: DEFAULT_SUPER_USER_ID,
716                            },
717                            ActionWithGrantOption {
718                                action: Action::Connect as i32,
719                                with_grant_option: true,
720                                granted_by: DEFAULT_SUPER_USER_ID,
721                            }
722                        ],
723                        object: Some(PbObject::DatabaseId(database_id)),
724                    }
725                ]
726            );
727        }
728
729        frontend
730            .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user")
731            .await
732            .unwrap();
733        {
734            let user_reader = session.env().user_info_reader();
735            let reader = user_reader.read_guard();
736            let user_info = reader.get_user_by_name("user1").unwrap();
737            assert!(
738                user_info
739                    .grant_privileges
740                    .iter()
741                    .filter(|gp| gp.object == Some(PbObject::DatabaseId(database_id)))
742                    .all(|p| p.action_with_opts.iter().all(|ao| !ao.with_grant_option))
743            );
744        }
745
746        frontend
747            .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user")
748            .await
749            .unwrap();
750        {
751            let user_reader = session.env().user_info_reader();
752            let reader = user_reader.read_guard();
753            let user_info = reader.get_user_by_name("user1").unwrap();
754            assert_eq!(
755                user_info.grant_privileges,
756                vec![PbGrantPrivilege {
757                    action_with_opts: vec![ActionWithGrantOption {
758                        action: Action::Connect as i32,
759                        with_grant_option: true,
760                        granted_by: session.user_id(),
761                    }],
762                    object: Some(PbObject::DatabaseId(session_database_id)),
763                }]
764            );
765        }
766        frontend.run_sql("DROP USER user1").await.unwrap();
767    }
768}