risingwave_frontend/handler/
handle_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::acl;
20use risingwave_common::catalog::is_system_schema;
21use risingwave_pb::common::PbObjectType;
22use risingwave_pb::user::alter_default_privilege_request::{
23    Operation as AlterDefaultPrivilegeOperation, PbGrantPrivilege as OpGrantPrivilege,
24    PbRevokePrivilege as OpRevokePrivilege,
25};
26use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, PbObject};
27use risingwave_pb::user::{PbAction, PbGrantPrivilege};
28use risingwave_sqlparser::ast::{
29    DefaultPrivilegeOperation, GrantObjects, Ident, PrivilegeObjectType, Privileges, Statement,
30};
31
32use super::RwPgResponse;
33use crate::bind_data_type;
34use crate::binder::Binder;
35use crate::catalog::CatalogError;
36use crate::catalog::root_catalog::SchemaPath;
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::session::SessionImpl;
41use crate::user::UserId;
42use crate::user::user_privilege::{
43    available_privilege_actions, check_privilege_type, get_prost_action,
44};
45
46fn make_prost_privilege(
47    session: &SessionImpl,
48    privileges: Privileges,
49    objects: GrantObjects,
50) -> Result<Vec<PbGrantPrivilege>> {
51    check_privilege_type(&privileges, &objects)?;
52
53    let catalog_reader = session.env().catalog_reader();
54    let reader = catalog_reader.read_guard();
55    let actions = match privileges {
56        Privileges::All { .. } => available_privilege_actions(&objects)?,
57        Privileges::Actions(actions) => actions
58            .into_iter()
59            .map(|action| get_prost_action(&action))
60            .collect(),
61    };
62    let mut grant_objs = vec![];
63    match objects {
64        GrantObjects::Databases(databases) => {
65            for db in databases {
66                let database_name = Binder::resolve_database_name(db)?;
67                let database = reader.get_database_by_name(&database_name)?;
68                grant_objs.push(PbObject::DatabaseId(database.id()));
69            }
70        }
71        GrantObjects::Schemas(schemas) => {
72            for schema in schemas {
73                let schema_name = Binder::resolve_schema_name(schema)?;
74                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
75                grant_objs.push(PbObject::SchemaId(schema.id()));
76            }
77        }
78        GrantObjects::Mviews(tables) => {
79            let db_name = &session.database();
80            let search_path = session.config().search_path();
81            let user_name = &session.user_name();
82
83            for name in tables {
84                let (schema_name, table_name) =
85                    Binder::resolve_schema_qualified_name(db_name, name)?;
86                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
87
88                let (table, _) =
89                    reader.get_created_table_by_name(db_name, schema_path, &table_name)?;
90                match table.table_type() {
91                    TableType::MaterializedView => {}
92                    _ => {
93                        return Err(ErrorCode::InvalidInputSyntax(format!(
94                            "{table_name} is not a materialized view",
95                        ))
96                        .into());
97                    }
98                }
99                grant_objs.push(PbObject::TableId(table.id().table_id));
100            }
101        }
102        GrantObjects::Tables(tables) => {
103            let db_name = &session.database();
104            let search_path = session.config().search_path();
105            let user_name = &session.user_name();
106
107            for name in tables {
108                let (schema_name, table_name) =
109                    Binder::resolve_schema_qualified_name(db_name, name)?;
110                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
111
112                match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
113                    Ok((table, _)) => {
114                        match table.table_type() {
115                            TableType::Table => {
116                                grant_objs.push(PbObject::TableId(table.id().table_id));
117                                continue;
118                            }
119                            _ => {
120                                return Err(ErrorCode::InvalidInputSyntax(format!(
121                                    "{table_name} is not a table",
122                                ))
123                                .into());
124                            }
125                        };
126                    }
127                    Err(CatalogError::NotFound("table", _)) => {
128                        let (view, _) = reader
129                            .get_view_by_name(db_name, schema_path, &table_name)
130                            .map_err(|_| CatalogError::NotFound("table", table_name))?;
131                        grant_objs.push(PbObject::ViewId(view.id));
132                    }
133                    Err(e) => {
134                        return Err(e.into());
135                    }
136                }
137            }
138        }
139        GrantObjects::Sources(sources) => {
140            let db_name = &session.database();
141            let search_path = session.config().search_path();
142            let user_name = &session.user_name();
143
144            for name in sources {
145                let (schema_name, source_name) =
146                    Binder::resolve_schema_qualified_name(db_name, name)?;
147                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
148
149                let (source, _) = reader.get_source_by_name(db_name, schema_path, &source_name)?;
150                grant_objs.push(PbObject::SourceId(source.id));
151            }
152        }
153        GrantObjects::Sinks(sinks) => {
154            let db_name = &session.database();
155            let search_path = session.config().search_path();
156            let user_name = &session.user_name();
157
158            for name in sinks {
159                let (schema_name, sink_name) =
160                    Binder::resolve_schema_qualified_name(db_name, name)?;
161                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
162
163                let (sink, _) = reader.get_sink_by_name(db_name, schema_path, &sink_name)?;
164                grant_objs.push(PbObject::SinkId(sink.id.sink_id));
165            }
166        }
167        GrantObjects::Views(views) => {
168            let db_name = &session.database();
169            let search_path = session.config().search_path();
170            let user_name = &session.user_name();
171
172            for name in views {
173                let (schema_name, view_name) =
174                    Binder::resolve_schema_qualified_name(db_name, name)?;
175                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
176
177                let (view, _) = reader.get_view_by_name(db_name, schema_path, &view_name)?;
178                grant_objs.push(PbObject::ViewId(view.id));
179            }
180        }
181        GrantObjects::Connections(conns) => {
182            let db_name = &session.database();
183            let search_path = session.config().search_path();
184            let user_name = &session.user_name();
185
186            for name in conns {
187                let (schema_name, conn_name) =
188                    Binder::resolve_schema_qualified_name(db_name, name)?;
189                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
190
191                let (conn, _) = reader.get_connection_by_name(db_name, schema_path, &conn_name)?;
192                grant_objs.push(PbObject::ConnectionId(conn.id));
193            }
194        }
195        GrantObjects::Subscriptions(subscriptions) => {
196            let db_name = &session.database();
197            let search_path = session.config().search_path();
198            let user_name = &session.user_name();
199
200            for name in subscriptions {
201                let (schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
202                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
203
204                let (sub, _) = reader.get_subscription_by_name(db_name, schema_path, &sub_name)?;
205                grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
206            }
207        }
208        GrantObjects::Functions(func_descs) => {
209            let db_name = &session.database();
210            let search_path = session.config().search_path();
211            let user_name = &session.user_name();
212
213            for func_desc in func_descs {
214                let (schema_name, func_name) =
215                    Binder::resolve_schema_qualified_name(db_name, func_desc.name)?;
216                let arg_types = match func_desc.args {
217                    Some(args) => {
218                        let mut arg_types = vec![];
219                        for arg in args {
220                            arg_types.push(bind_data_type(&arg.data_type)?);
221                        }
222                        Some(arg_types)
223                    }
224                    None => None,
225                };
226                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
227
228                let (func, _) = match arg_types {
229                    Some(arg_types) => reader.get_function_by_name_args(
230                        db_name,
231                        schema_path,
232                        &func_name,
233                        &arg_types,
234                    )?,
235                    None => {
236                        let (functions, schema_name) =
237                            reader.get_functions_by_name(db_name, schema_path, &func_name)?;
238                        if functions.len() > 1 {
239                            return Err(ErrorCode::CatalogError(format!(
240                                "function name {func_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously."
241                            ).into()).into());
242                        }
243                        (
244                            functions.into_iter().next().expect("no functions"),
245                            schema_name,
246                        )
247                    }
248                };
249                grant_objs.push(PbObject::FunctionId(func.id.function_id()));
250            }
251        }
252        GrantObjects::Secrets(secrets) => {
253            let db_name = &session.database();
254            let search_path = session.config().search_path();
255            let user_name = &session.user_name();
256
257            for name in secrets {
258                let (schema_name, secret_name) =
259                    Binder::resolve_schema_qualified_name(db_name, name)?;
260                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
261
262                let (secret, _) = reader.get_secret_by_name(db_name, schema_path, &secret_name)?;
263                grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
264            }
265        }
266        GrantObjects::AllSourcesInSchema { schemas } => {
267            for schema in schemas {
268                let schema_name = Binder::resolve_schema_name(schema)?;
269                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
270                schema.iter_source().for_each(|source| {
271                    grant_objs.push(PbObject::SourceId(source.id));
272                });
273            }
274        }
275        GrantObjects::AllMviewsInSchema { schemas } => {
276            for schema in schemas {
277                let schema_name = Binder::resolve_schema_name(schema)?;
278                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
279                schema.iter_all_mvs().for_each(|mview| {
280                    grant_objs.push(PbObject::TableId(mview.id().table_id));
281                });
282            }
283        }
284        GrantObjects::AllTablesInSchema { schemas } => {
285            for schema in schemas {
286                let schema_name = Binder::resolve_schema_name(schema)?;
287                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
288                schema.iter_user_table().for_each(|table| {
289                    grant_objs.push(PbObject::TableId(table.id().table_id));
290                });
291            }
292        }
293        GrantObjects::AllSinksInSchema { schemas } => {
294            for schema in schemas {
295                let schema_name = Binder::resolve_schema_name(schema)?;
296                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
297                schema.iter_sink().for_each(|sink| {
298                    grant_objs.push(PbObject::SinkId(sink.id.sink_id));
299                });
300            }
301        }
302        GrantObjects::AllViewsInSchema { schemas } => {
303            for schema in schemas {
304                let schema_name = Binder::resolve_schema_name(schema)?;
305                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
306                schema.iter_view().for_each(|view| {
307                    grant_objs.push(PbObject::ViewId(view.id));
308                });
309            }
310        }
311        GrantObjects::AllFunctionsInSchema { schemas } => {
312            for schema in schemas {
313                let schema_name = Binder::resolve_schema_name(schema)?;
314                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
315                schema.iter_function().for_each(|func| {
316                    grant_objs.push(PbObject::FunctionId(func.id.function_id()));
317                });
318            }
319        }
320        GrantObjects::AllSecretsInSchema { schemas } => {
321            for schema in schemas {
322                let schema_name = Binder::resolve_schema_name(schema)?;
323                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
324                schema.iter_secret().for_each(|secret| {
325                    grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
326                });
327            }
328        }
329        GrantObjects::AllSubscriptionsInSchema { schemas } => {
330            for schema in schemas {
331                let schema_name = Binder::resolve_schema_name(schema)?;
332                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
333                schema.iter_subscription().for_each(|sub| {
334                    grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
335                });
336            }
337        }
338        GrantObjects::AllConnectionsInSchema { schemas } => {
339            for schema in schemas {
340                let schema_name = Binder::resolve_schema_name(schema)?;
341                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
342                schema.iter_connections().for_each(|conn| {
343                    grant_objs.push(PbObject::ConnectionId(conn.id));
344                });
345            }
346        }
347        o => {
348            return Err(ErrorCode::InvalidInputSyntax(format!(
349                "GRANT statement does not support object type: {:?}",
350                o
351            ))
352            .into());
353        }
354    };
355    let action_with_opts = actions
356        .into_iter()
357        .map(|action| ActionWithGrantOption {
358            action: action as i32,
359            granted_by: session.user_id(),
360            ..Default::default()
361        })
362        .collect::<Vec<_>>();
363
364    let mut prost_privileges = vec![];
365    for objs in grant_objs {
366        prost_privileges.push(PbGrantPrivilege {
367            action_with_opts: action_with_opts.clone(),
368            object: Some(objs),
369        });
370    }
371    Ok(prost_privileges)
372}
373
374/// Bind user from idents to user ids.
375fn bind_user_from_idents(session: &SessionImpl, names: Vec<Ident>) -> Result<Vec<UserId>> {
376    let user_reader = session.env().user_info_reader();
377    let reader = user_reader.read_guard();
378    let mut users = HashSet::new();
379    for name in &names {
380        if let Some(user) = reader.get_user_by_name(&name.real_value()) {
381            users.insert(user.id);
382        } else {
383            return Err(ErrorCode::InvalidInputSyntax(
384                format!("User \"{name}\" does not exist").to_owned(),
385            )
386            .into());
387        }
388    }
389    Ok(users.into_iter().collect())
390}
391
392fn derive_object_type(object_type: &PrivilegeObjectType) -> PbObjectType {
393    match object_type {
394        PrivilegeObjectType::Schemas => PbObjectType::Schema,
395        PrivilegeObjectType::Tables => PbObjectType::Table,
396        PrivilegeObjectType::Views => PbObjectType::View,
397        PrivilegeObjectType::Mviews => PbObjectType::Mview,
398        PrivilegeObjectType::Sources => PbObjectType::Source,
399        PrivilegeObjectType::Sinks => PbObjectType::Sink,
400        PrivilegeObjectType::Functions => PbObjectType::Function,
401        PrivilegeObjectType::Secrets => PbObjectType::Secret,
402        PrivilegeObjectType::Subscriptions => PbObjectType::Subscription,
403        PrivilegeObjectType::Connections => PbObjectType::Connection,
404    }
405}
406
407fn make_prost_actions(
408    privileges: Privileges,
409    object_type: &PrivilegeObjectType,
410) -> Result<Vec<PbAction>> {
411    let all_acls = match object_type {
412        PrivilegeObjectType::Tables => &acl::ALL_AVAILABLE_TABLE_MODES,
413        PrivilegeObjectType::Sources => &acl::ALL_AVAILABLE_SOURCE_MODES,
414        PrivilegeObjectType::Sinks => &acl::ALL_AVAILABLE_SINK_MODES,
415        PrivilegeObjectType::Mviews => &acl::ALL_AVAILABLE_MVIEW_MODES,
416        PrivilegeObjectType::Views => &acl::ALL_AVAILABLE_VIEW_MODES,
417        PrivilegeObjectType::Functions => &acl::ALL_AVAILABLE_FUNCTION_MODES,
418        PrivilegeObjectType::Connections => &acl::ALL_AVAILABLE_CONNECTION_MODES,
419        PrivilegeObjectType::Secrets => &acl::ALL_AVAILABLE_SECRET_MODES,
420        PrivilegeObjectType::Subscriptions => &acl::ALL_AVAILABLE_SUBSCRIPTION_MODES,
421        PrivilegeObjectType::Schemas => &acl::ALL_AVAILABLE_SCHEMA_MODES,
422    };
423
424    match privileges {
425        Privileges::All { .. } => Ok(all_acls.iter().map(Into::into).collect()),
426        Privileges::Actions(actions) => {
427            let actions = actions
428                .into_iter()
429                .map(|action| get_prost_action(&action))
430                .collect::<Vec<_>>();
431            for action in &actions {
432                if !all_acls.has_mode((*action).into()) {
433                    return Err(ErrorCode::InvalidInputSyntax(format!(
434                        "Invalid privilege type for the given object: {action:?}"
435                    ))
436                    .into());
437                }
438            }
439            Ok(actions)
440        }
441    }
442}
443
444pub async fn handle_grant_privilege(
445    handler_args: HandlerArgs,
446    stmt: Statement,
447) -> Result<RwPgResponse> {
448    let session = handler_args.session;
449    let Statement::Grant {
450        privileges,
451        objects,
452        grantees,
453        with_grant_option,
454        granted_by,
455    } = stmt
456    else {
457        return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into());
458    };
459    let users = bind_user_from_idents(&session, grantees)?;
460    if let Some(granted_by) = &granted_by {
461        let user_reader = session.env().user_info_reader();
462        let reader = user_reader.read_guard();
463
464        // We remark that the user name is always case-sensitive.
465        if reader.get_user_by_name(&granted_by.real_value()).is_none() {
466            return Err(ErrorCode::InvalidInputSyntax(
467                format!("Grantor \"{granted_by}\" does not exist").to_owned(),
468            )
469            .into());
470        }
471    }
472
473    let privileges = make_prost_privilege(&session, privileges, objects)?;
474    let user_info_writer = session.user_info_writer()?;
475    user_info_writer
476        .grant_privilege(users, privileges, with_grant_option, session.user_id())
477        .await?;
478    Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE))
479}
480
481pub async fn handle_revoke_privilege(
482    handler_args: HandlerArgs,
483    stmt: Statement,
484) -> Result<RwPgResponse> {
485    let session = handler_args.session;
486    let Statement::Revoke {
487        privileges,
488        objects,
489        grantees,
490        granted_by,
491        revoke_grant_option,
492        cascade,
493    } = stmt
494    else {
495        return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into());
496    };
497    let users = bind_user_from_idents(&session, grantees)?;
498    let mut granted_by_id = None;
499    if let Some(granted_by) = &granted_by {
500        let user_reader = session.env().user_info_reader();
501        let reader = user_reader.read_guard();
502
503        if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) {
504            granted_by_id = Some(user.id);
505        } else {
506            return Err(ErrorCode::InvalidInputSyntax(
507                format!("Grantor \"{granted_by}\" does not exist").to_owned(),
508            )
509            .into());
510        }
511    }
512    let privileges = make_prost_privilege(&session, privileges, objects)?;
513    let user_info_writer = session.user_info_writer()?;
514    user_info_writer
515        .revoke_privilege(
516            users,
517            privileges,
518            granted_by_id.unwrap_or(session.user_id()),
519            session.user_id(),
520            revoke_grant_option,
521            cascade,
522        )
523        .await?;
524
525    Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE))
526}
527
528pub async fn handle_alter_default_privileges(
529    handler_args: HandlerArgs,
530    stmt: Statement,
531) -> Result<RwPgResponse> {
532    let session = handler_args.session;
533    let Statement::AlterDefaultPrivileges {
534        target_users,
535        schema_names,
536        operation,
537    } = stmt
538    else {
539        return Err(
540            ErrorCode::BindError("Invalid alter default privileges statement".to_owned()).into(),
541        );
542    };
543
544    // If target users are not specified, use the current user.
545    let users = match target_users {
546        None => vec![session.user_id()],
547        Some(users) => {
548            let users = bind_user_from_idents(&session, users)?;
549            if !session.is_super_user() && users.len() > 1 {
550                return Err(ErrorCode::PermissionDenied(
551                    "Only superuser can alter default privileges for multiple users".to_owned(),
552                )
553                .into());
554            } else if !session.is_super_user()
555                && users.iter().any(|user| *user != session.user_id())
556            {
557                return Err(ErrorCode::PermissionDenied(
558                    "Only superuser can alter default privileges for other users".to_owned(),
559                )
560                .into());
561            }
562            users
563        }
564    };
565
566    // If schema names are not specified,
567    // users will be grant/revoke privileges on all schemas in the current database.
568    let schemas = match schema_names {
569        None => {
570            if !operation.for_schemas() {
571                let catalog_reader = session.env().catalog_reader();
572                let reader = catalog_reader.read_guard();
573                let schemas = reader
574                    .get_database_by_id(&session.database_id())?
575                    .iter_schemas()
576                    .filter(|schema| !is_system_schema(&schema.name))
577                    .map(|schema| schema.id())
578                    .collect_vec();
579                if schemas.is_empty() {
580                    return Ok(PgResponse::empty_result(
581                        StatementType::ALTER_DEFAULT_PRIVILEGES,
582                    ));
583                }
584                schemas
585            } else {
586                vec![]
587            }
588        }
589        Some(names) => {
590            let catalog_reader = session.env().catalog_reader();
591            let reader = catalog_reader.read_guard();
592            let mut schemas = vec![];
593            for name in names {
594                let schema_name = Binder::resolve_schema_name(name)?;
595                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
596                schemas.push(schema.id());
597            }
598            schemas
599        }
600    };
601
602    let alter_operation = match operation {
603        DefaultPrivilegeOperation::Grant {
604            privileges,
605            object_type,
606            grantees,
607            with_grant_option,
608        } => {
609            let grantees = bind_user_from_idents(&session, grantees)?;
610            AlterDefaultPrivilegeOperation::GrantPrivilege(OpGrantPrivilege {
611                actions: make_prost_actions(privileges, &object_type)?
612                    .into_iter()
613                    .map(|a| a as i32)
614                    .collect(),
615                object_type: derive_object_type(&object_type) as i32,
616                grantees,
617                with_grant_option,
618            })
619        }
620        DefaultPrivilegeOperation::Revoke {
621            privileges,
622            object_type,
623            grantees,
624            revoke_grant_option,
625            ..
626        } => {
627            let grantees = bind_user_from_idents(&session, grantees)?;
628            AlterDefaultPrivilegeOperation::RevokePrivilege(OpRevokePrivilege {
629                actions: make_prost_actions(privileges, &object_type)?
630                    .into_iter()
631                    .map(|a| a as i32)
632                    .collect(),
633                object_type: derive_object_type(&object_type) as i32,
634                grantees,
635                revoke_grant_option,
636            })
637        }
638    };
639
640    let user_info_writer = session.user_info_writer()?;
641    user_info_writer
642        .alter_default_privilege(
643            users,
644            session.database_id(),
645            schemas,
646            alter_operation,
647            session.user_id(),
648        )
649        .await?;
650
651    Ok(PgResponse::empty_result(
652        StatementType::ALTER_DEFAULT_PRIVILEGES,
653    ))
654}
655
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
    use risingwave_pb::user::Action;

    use super::*;
    use crate::test_utils::LocalFrontend;

    /// Grants all database privileges to a user with grant option, then revokes first the grant
    /// option and then the privileges, checking the user's recorded privileges after each step.
    #[tokio::test]
    async fn test_grant_privilege() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        // Set up two users and a database, then grant everything on it to `user1`.
        for sql in [
            "CREATE USER user WITH SUPERUSER PASSWORD 'password'",
            "CREATE USER user1 WITH PASSWORD 'password1'",
            "CREATE DATABASE db1",
            "GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user",
        ] {
            frontend.run_sql(sql).await.unwrap();
        }

        let (session_database_id, database_id) = {
            let catalog_reader = session.env().catalog_reader();
            let reader = catalog_reader.read_guard();
            let current = reader
                .get_database_by_name(&session.database())
                .unwrap()
                .id();
            let db1 = reader.get_database_by_name("db1").unwrap().id();
            (current, db1)
        };

        // Privilege `user1` holds on the session database both before and after the revokes.
        let connect_on_session_db = PbGrantPrivilege {
            action_with_opts: vec![ActionWithGrantOption {
                action: Action::Connect as i32,
                with_grant_option: true,
                granted_by: session.user_id(),
            }],
            object: Some(PbObject::DatabaseId(session_database_id)),
        };
        // Privileges expected on `db1` right after the GRANT statement.
        let all_on_db1 = PbGrantPrivilege {
            action_with_opts: vec![
                ActionWithGrantOption {
                    action: Action::Create as i32,
                    with_grant_option: true,
                    granted_by: DEFAULT_SUPER_USER_ID,
                },
                ActionWithGrantOption {
                    action: Action::Connect as i32,
                    with_grant_option: true,
                    granted_by: DEFAULT_SUPER_USER_ID,
                },
            ],
            object: Some(PbObject::DatabaseId(database_id)),
        };

        {
            let user_reader = session.env().user_info_reader();
            let reader = user_reader.read_guard();
            let user_info = reader.get_user_by_name("user1").unwrap();
            assert_eq!(
                user_info.grant_privileges,
                vec![connect_on_session_db.clone(), all_on_db1]
            );
        }

        frontend
            .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        {
            let user_reader = session.env().user_info_reader();
            let reader = user_reader.read_guard();
            let user_info = reader.get_user_by_name("user1").unwrap();
            // No action on `db1` may still carry the grant option.
            let on_db1 = user_info
                .grant_privileges
                .iter()
                .filter(|gp| gp.object == Some(PbObject::DatabaseId(database_id)));
            for privilege in on_db1 {
                assert!(
                    privilege
                        .action_with_opts
                        .iter()
                        .all(|ao| !ao.with_grant_option)
                );
            }
        }

        frontend
            .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user")
            .await
            .unwrap();
        {
            let user_reader = session.env().user_info_reader();
            let reader = user_reader.read_guard();
            let user_info = reader.get_user_by_name("user1").unwrap();
            assert_eq!(user_info.grant_privileges, vec![connect_on_session_db]);
        }

        frontend.run_sql("DROP USER user1").await.unwrap();
    }
}