risingwave_frontend/handler/
handle_privilege.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_pb::user::PbGrantPrivilege;
17use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, PbObject};
18use risingwave_sqlparser::ast::{GrantObjects, Privileges, Statement};
19
20use super::RwPgResponse;
21use crate::bind_data_type;
22use crate::binder::Binder;
23use crate::catalog::CatalogError;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::{ErrorCode, Result};
27use crate::handler::HandlerArgs;
28use crate::session::SessionImpl;
29use crate::user::user_privilege::{
30    available_privilege_actions, check_privilege_type, get_prost_action,
31};
32
33fn make_prost_privilege(
34    session: &SessionImpl,
35    privileges: Privileges,
36    objects: GrantObjects,
37) -> Result<Vec<PbGrantPrivilege>> {
38    check_privilege_type(&privileges, &objects)?;
39
40    let catalog_reader = session.env().catalog_reader();
41    let reader = catalog_reader.read_guard();
42    let actions = match privileges {
43        Privileges::All { .. } => available_privilege_actions(&objects)?,
44        Privileges::Actions(actions) => actions
45            .into_iter()
46            .map(|action| get_prost_action(&action))
47            .collect(),
48    };
49    let mut grant_objs = vec![];
50    match objects {
51        GrantObjects::Databases(databases) => {
52            for db in databases {
53                let database_name = Binder::resolve_database_name(db)?;
54                let database = reader.get_database_by_name(&database_name)?;
55                grant_objs.push(PbObject::DatabaseId(database.id()));
56            }
57        }
58        GrantObjects::Schemas(schemas) => {
59            for schema in schemas {
60                let schema_name = Binder::resolve_schema_name(schema)?;
61                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
62                grant_objs.push(PbObject::SchemaId(schema.id()));
63            }
64        }
65        GrantObjects::Mviews(tables) => {
66            let db_name = &session.database();
67            let search_path = session.config().search_path();
68            let user_name = &session.user_name();
69
70            for name in tables {
71                let (schema_name, table_name) =
72                    Binder::resolve_schema_qualified_name(db_name, name)?;
73                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
74
75                let (table, _) =
76                    reader.get_created_table_by_name(db_name, schema_path, &table_name)?;
77                match table.table_type() {
78                    TableType::MaterializedView => {}
79                    _ => {
80                        return Err(ErrorCode::InvalidInputSyntax(format!(
81                            "{table_name} is not a materialized view",
82                        ))
83                        .into());
84                    }
85                }
86                grant_objs.push(PbObject::TableId(table.id().table_id));
87            }
88        }
89        GrantObjects::Tables(tables) => {
90            let db_name = &session.database();
91            let search_path = session.config().search_path();
92            let user_name = &session.user_name();
93
94            for name in tables {
95                let (schema_name, table_name) =
96                    Binder::resolve_schema_qualified_name(db_name, name)?;
97                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
98
99                match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
100                    Ok((table, _)) => {
101                        match table.table_type() {
102                            TableType::Table => {
103                                grant_objs.push(PbObject::TableId(table.id().table_id));
104                                continue;
105                            }
106                            _ => {
107                                return Err(ErrorCode::InvalidInputSyntax(format!(
108                                    "{table_name} is not a table",
109                                ))
110                                .into());
111                            }
112                        };
113                    }
114                    Err(CatalogError::NotFound("table", _)) => {
115                        let (view, _) = reader
116                            .get_view_by_name(db_name, schema_path, &table_name)
117                            .map_err(|_| CatalogError::NotFound("table", table_name))?;
118                        grant_objs.push(PbObject::ViewId(view.id));
119                    }
120                    Err(e) => {
121                        return Err(e.into());
122                    }
123                }
124            }
125        }
126        GrantObjects::Sources(sources) => {
127            let db_name = &session.database();
128            let search_path = session.config().search_path();
129            let user_name = &session.user_name();
130
131            for name in sources {
132                let (schema_name, source_name) =
133                    Binder::resolve_schema_qualified_name(db_name, name)?;
134                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
135
136                let (source, _) = reader.get_source_by_name(db_name, schema_path, &source_name)?;
137                grant_objs.push(PbObject::SourceId(source.id));
138            }
139        }
140        GrantObjects::Sinks(sinks) => {
141            let db_name = &session.database();
142            let search_path = session.config().search_path();
143            let user_name = &session.user_name();
144
145            for name in sinks {
146                let (schema_name, sink_name) =
147                    Binder::resolve_schema_qualified_name(db_name, name)?;
148                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
149
150                let (sink, _) = reader.get_sink_by_name(db_name, schema_path, &sink_name)?;
151                grant_objs.push(PbObject::SinkId(sink.id.sink_id));
152            }
153        }
154        GrantObjects::Views(views) => {
155            let db_name = &session.database();
156            let search_path = session.config().search_path();
157            let user_name = &session.user_name();
158
159            for name in views {
160                let (schema_name, view_name) =
161                    Binder::resolve_schema_qualified_name(db_name, name)?;
162                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
163
164                let (view, _) = reader.get_view_by_name(db_name, schema_path, &view_name)?;
165                grant_objs.push(PbObject::ViewId(view.id));
166            }
167        }
168        GrantObjects::Connections(conns) => {
169            let db_name = &session.database();
170            let search_path = session.config().search_path();
171            let user_name = &session.user_name();
172
173            for name in conns {
174                let (schema_name, conn_name) =
175                    Binder::resolve_schema_qualified_name(db_name, name)?;
176                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
177
178                let (conn, _) = reader.get_connection_by_name(db_name, schema_path, &conn_name)?;
179                grant_objs.push(PbObject::ConnectionId(conn.id));
180            }
181        }
182        GrantObjects::Subscriptions(subscriptions) => {
183            let db_name = &session.database();
184            let search_path = session.config().search_path();
185            let user_name = &session.user_name();
186
187            for name in subscriptions {
188                let (schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
189                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
190
191                let (sub, _) = reader.get_subscription_by_name(db_name, schema_path, &sub_name)?;
192                grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
193            }
194        }
195        GrantObjects::Functions(func_descs) => {
196            let db_name = &session.database();
197            let search_path = session.config().search_path();
198            let user_name = &session.user_name();
199
200            for func_desc in func_descs {
201                let (schema_name, func_name) =
202                    Binder::resolve_schema_qualified_name(db_name, func_desc.name)?;
203                let arg_types = match func_desc.args {
204                    Some(args) => {
205                        let mut arg_types = vec![];
206                        for arg in args {
207                            arg_types.push(bind_data_type(&arg.data_type)?);
208                        }
209                        Some(arg_types)
210                    }
211                    None => None,
212                };
213                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
214
215                let (func, _) = match arg_types {
216                    Some(arg_types) => reader.get_function_by_name_args(
217                        db_name,
218                        schema_path,
219                        &func_name,
220                        &arg_types,
221                    )?,
222                    None => {
223                        let (functions, schema_name) =
224                            reader.get_functions_by_name(db_name, schema_path, &func_name)?;
225                        if functions.len() > 1 {
226                            return Err(ErrorCode::CatalogError(format!(
227                                "function name {func_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously."
228                            ).into()).into());
229                        }
230                        (
231                            functions.into_iter().next().expect("no functions"),
232                            schema_name,
233                        )
234                    }
235                };
236                grant_objs.push(PbObject::FunctionId(func.id.function_id()));
237            }
238        }
239        GrantObjects::Secrets(secrets) => {
240            let db_name = &session.database();
241            let search_path = session.config().search_path();
242            let user_name = &session.user_name();
243
244            for name in secrets {
245                let (schema_name, secret_name) =
246                    Binder::resolve_schema_qualified_name(db_name, name)?;
247                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
248
249                let (secret, _) = reader.get_secret_by_name(db_name, schema_path, &secret_name)?;
250                grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
251            }
252        }
253        GrantObjects::AllSourcesInSchema { schemas } => {
254            for schema in schemas {
255                let schema_name = Binder::resolve_schema_name(schema)?;
256                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
257                schema.iter_source().for_each(|source| {
258                    grant_objs.push(PbObject::SourceId(source.id));
259                });
260            }
261        }
262        GrantObjects::AllMviewsInSchema { schemas } => {
263            for schema in schemas {
264                let schema_name = Binder::resolve_schema_name(schema)?;
265                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
266                schema.iter_all_mvs().for_each(|mview| {
267                    grant_objs.push(PbObject::TableId(mview.id().table_id));
268                });
269            }
270        }
271        GrantObjects::AllTablesInSchema { schemas } => {
272            for schema in schemas {
273                let schema_name = Binder::resolve_schema_name(schema)?;
274                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
275                schema.iter_user_table().for_each(|table| {
276                    grant_objs.push(PbObject::TableId(table.id().table_id));
277                });
278            }
279        }
280        GrantObjects::AllSinksInSchema { schemas } => {
281            for schema in schemas {
282                let schema_name = Binder::resolve_schema_name(schema)?;
283                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
284                schema.iter_sink().for_each(|sink| {
285                    grant_objs.push(PbObject::SinkId(sink.id.sink_id));
286                });
287            }
288        }
289        GrantObjects::AllViewsInSchema { schemas } => {
290            for schema in schemas {
291                let schema_name = Binder::resolve_schema_name(schema)?;
292                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
293                schema.iter_view().for_each(|view| {
294                    grant_objs.push(PbObject::ViewId(view.id));
295                });
296            }
297        }
298        GrantObjects::AllFunctionsInSchema { schemas } => {
299            for schema in schemas {
300                let schema_name = Binder::resolve_schema_name(schema)?;
301                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
302                schema.iter_function().for_each(|func| {
303                    grant_objs.push(PbObject::FunctionId(func.id.function_id()));
304                });
305            }
306        }
307        GrantObjects::AllSecretsInSchema { schemas } => {
308            for schema in schemas {
309                let schema_name = Binder::resolve_schema_name(schema)?;
310                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
311                schema.iter_secret().for_each(|secret| {
312                    grant_objs.push(PbObject::SecretId(secret.id.secret_id()));
313                });
314            }
315        }
316        GrantObjects::AllSubscriptionsInSchema { schemas } => {
317            for schema in schemas {
318                let schema_name = Binder::resolve_schema_name(schema)?;
319                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
320                schema.iter_subscription().for_each(|sub| {
321                    grant_objs.push(PbObject::SubscriptionId(sub.id.subscription_id));
322                });
323            }
324        }
325        GrantObjects::AllConnectionsInSchema { schemas } => {
326            for schema in schemas {
327                let schema_name = Binder::resolve_schema_name(schema)?;
328                let schema = reader.get_schema_by_name(&session.database(), &schema_name)?;
329                schema.iter_connections().for_each(|conn| {
330                    grant_objs.push(PbObject::ConnectionId(conn.id));
331                });
332            }
333        }
334        o => {
335            return Err(ErrorCode::BindError(format!(
336                "GRANT statement does not support object type: {:?}",
337                o
338            ))
339            .into());
340        }
341    };
342    let action_with_opts = actions
343        .into_iter()
344        .map(|action| ActionWithGrantOption {
345            action: action as i32,
346            granted_by: session.user_id(),
347            ..Default::default()
348        })
349        .collect::<Vec<_>>();
350
351    let mut prost_privileges = vec![];
352    for objs in grant_objs {
353        prost_privileges.push(PbGrantPrivilege {
354            action_with_opts: action_with_opts.clone(),
355            object: Some(objs),
356        });
357    }
358    Ok(prost_privileges)
359}
360
361pub async fn handle_grant_privilege(
362    handler_args: HandlerArgs,
363    stmt: Statement,
364) -> Result<RwPgResponse> {
365    let session = handler_args.session;
366    let Statement::Grant {
367        privileges,
368        objects,
369        grantees,
370        with_grant_option,
371        granted_by,
372    } = stmt
373    else {
374        return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into());
375    };
376    let mut users = vec![];
377    {
378        let user_reader = session.env().user_info_reader();
379        let reader = user_reader.read_guard();
380        for grantee in grantees {
381            if let Some(user) = reader.get_user_by_name(&grantee.real_value()) {
382                users.push(user.id);
383            } else {
384                return Err(ErrorCode::BindError("Grantee does not exist".to_owned()).into());
385            }
386        }
387        if let Some(granted_by) = &granted_by {
388            // We remark that the user name is always case-sensitive.
389            if reader.get_user_by_name(&granted_by.real_value()).is_none() {
390                return Err(ErrorCode::BindError("Grantor does not exist".to_owned()).into());
391            }
392        }
393    };
394
395    let privileges = make_prost_privilege(&session, privileges, objects)?;
396    let user_info_writer = session.user_info_writer()?;
397    user_info_writer
398        .grant_privilege(users, privileges, with_grant_option, session.user_id())
399        .await?;
400    Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE))
401}
402
403pub async fn handle_revoke_privilege(
404    handler_args: HandlerArgs,
405    stmt: Statement,
406) -> Result<RwPgResponse> {
407    let session = handler_args.session;
408    let Statement::Revoke {
409        privileges,
410        objects,
411        grantees,
412        granted_by,
413        revoke_grant_option,
414        cascade,
415    } = stmt
416    else {
417        return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into());
418    };
419    let mut users = vec![];
420    let mut granted_by_id = None;
421    {
422        let user_reader = session.env().user_info_reader();
423        let reader = user_reader.read_guard();
424        for grantee in grantees {
425            if let Some(user) = reader.get_user_by_name(&grantee.real_value()) {
426                users.push(user.id);
427            } else {
428                return Err(ErrorCode::BindError("Grantee does not exist".to_owned()).into());
429            }
430        }
431        if let Some(granted_by) = &granted_by {
432            if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) {
433                granted_by_id = Some(user.id);
434            } else {
435                return Err(ErrorCode::BindError("Grantor does not exist".to_owned()).into());
436            }
437        }
438    };
439    let privileges = make_prost_privilege(&session, privileges, objects)?;
440    let user_info_writer = session.user_info_writer()?;
441    user_info_writer
442        .revoke_privilege(
443            users,
444            privileges,
445            granted_by_id.unwrap_or(session.user_id()),
446            session.user_id(),
447            revoke_grant_option,
448            cascade,
449        )
450        .await?;
451
452    Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE))
453}
454
455#[cfg(test)]
456mod tests {
457    use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
458    use risingwave_pb::user::grant_privilege::Action;
459
460    use super::*;
461    use crate::test_utils::LocalFrontend;
462
463    #[tokio::test]
464    async fn test_grant_privilege() {
465        let frontend = LocalFrontend::new(Default::default()).await;
466        let session = frontend.session_ref();
467        frontend
468            .run_sql("CREATE USER user WITH SUPERUSER PASSWORD 'password'")
469            .await
470            .unwrap();
471        frontend
472            .run_sql("CREATE USER user1 WITH PASSWORD 'password1'")
473            .await
474            .unwrap();
475        frontend.run_sql("CREATE DATABASE db1").await.unwrap();
476        frontend
477            .run_sql("GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user")
478            .await
479            .unwrap();
480
481        let (session_database_id, database_id) = {
482            let catalog_reader = session.env().catalog_reader();
483            let reader = catalog_reader.read_guard();
484            (
485                reader
486                    .get_database_by_name(&session.database())
487                    .unwrap()
488                    .id(),
489                reader.get_database_by_name("db1").unwrap().id(),
490            )
491        };
492
493        {
494            let user_reader = session.env().user_info_reader();
495            let reader = user_reader.read_guard();
496            let user_info = reader.get_user_by_name("user1").unwrap();
497            assert_eq!(
498                user_info.grant_privileges,
499                vec![
500                    PbGrantPrivilege {
501                        action_with_opts: vec![ActionWithGrantOption {
502                            action: Action::Connect as i32,
503                            with_grant_option: true,
504                            granted_by: session.user_id(),
505                        }],
506                        object: Some(PbObject::DatabaseId(session_database_id)),
507                    },
508                    PbGrantPrivilege {
509                        action_with_opts: vec![
510                            ActionWithGrantOption {
511                                action: Action::Create as i32,
512                                with_grant_option: true,
513                                granted_by: DEFAULT_SUPER_USER_ID,
514                            },
515                            ActionWithGrantOption {
516                                action: Action::Connect as i32,
517                                with_grant_option: true,
518                                granted_by: DEFAULT_SUPER_USER_ID,
519                            }
520                        ],
521                        object: Some(PbObject::DatabaseId(database_id)),
522                    }
523                ]
524            );
525        }
526
527        frontend
528            .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user")
529            .await
530            .unwrap();
531        {
532            let user_reader = session.env().user_info_reader();
533            let reader = user_reader.read_guard();
534            let user_info = reader.get_user_by_name("user1").unwrap();
535            assert!(
536                user_info
537                    .grant_privileges
538                    .iter()
539                    .filter(|gp| gp.object == Some(PbObject::DatabaseId(database_id)))
540                    .all(|p| p.action_with_opts.iter().all(|ao| !ao.with_grant_option))
541            );
542        }
543
544        frontend
545            .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user")
546            .await
547            .unwrap();
548        {
549            let user_reader = session.env().user_info_reader();
550            let reader = user_reader.read_guard();
551            let user_info = reader.get_user_by_name("user1").unwrap();
552            assert_eq!(
553                user_info.grant_privileges,
554                vec![PbGrantPrivilege {
555                    action_with_opts: vec![ActionWithGrantOption {
556                        action: Action::Connect as i32,
557                        with_grant_option: true,
558                        granted_by: session.user_id(),
559                    }],
560                    object: Some(PbObject::DatabaseId(session_database_id)),
561                }]
562            );
563        }
564        frontend.run_sql("DROP USER user1").await.unwrap();
565    }
566}