risingwave_frontend/handler/
alter_owner.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_common::id::SchemaId;
20use risingwave_sqlparser::ast::{Ident, ObjectName, OperateFunctionArg};
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::{CatalogError, OwnedByUserCatalog};
25use crate::error::ErrorCode::{self, PermissionDenied};
26use crate::error::Result;
27use crate::session::SessionImpl;
28use crate::user::UserId;
29use crate::user::user_catalog::UserCatalog;
30use crate::{Binder, bind_data_type};
31
32pub fn check_schema_create_privilege(
33    session: &Arc<SessionImpl>,
34    new_owner: &UserCatalog,
35    schema_id: SchemaId,
36) -> Result<()> {
37    if session.is_super_user() {
38        return Ok(());
39    }
40    if !new_owner.is_super && !new_owner.has_privilege(schema_id, AclMode::Create) {
41        return Err(PermissionDenied(
42            "Require new owner to have create privilege on the object.".to_owned(),
43        )
44        .into());
45    }
46    Ok(())
47}
48
49pub async fn handle_alter_owner(
50    handler_args: HandlerArgs,
51    obj_name: ObjectName,
52    new_owner_name: Ident,
53    stmt_type: StatementType,
54    func_args: Option<Vec<OperateFunctionArg>>,
55) -> Result<RwPgResponse> {
56    let session = handler_args.session;
57    let db_name = &session.database();
58    let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
59    let search_path = session.config().search_path();
60    let user_name = &session.user_name();
61    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
62
63    let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
64    let (object, owner_id) = {
65        let catalog_reader = session.env().catalog_reader().read_guard();
66        let user_reader = session.env().user_info_reader().read_guard();
67        let new_owner = user_reader
68            .get_user_by_name(&new_owner_name)
69            .ok_or(CatalogError::not_found("user", new_owner_name))?;
70
71        let check_owned_by_admin = |owner: &UserId| -> Result<()> {
72            let user_catalog = user_reader.get_user_by_id(owner).unwrap();
73            if user_catalog.is_admin {
74                return Err(PermissionDenied(format!(
75                    "Cannot change owner of {} owned by admin user {}",
76                    obj_name.real_value(),
77                    user_catalog.name
78                ))
79                .into());
80            }
81            Ok(())
82        };
83
84        let owner_id = new_owner.id;
85        (
86            match stmt_type {
87                StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
88                    let (table, schema_name) = catalog_reader.get_created_table_by_name(
89                        db_name,
90                        schema_path,
91                        &real_obj_name,
92                    )?;
93                    session.check_privilege_for_drop_alter(schema_name, &**table)?;
94                    let schema_id = catalog_reader
95                        .get_schema_by_name(db_name, schema_name)?
96                        .id();
97                    check_schema_create_privilege(&session, new_owner, schema_id)?;
98                    if table.owner() == owner_id {
99                        return Ok(RwPgResponse::empty_result(stmt_type));
100                    }
101                    check_owned_by_admin(&table.owner)?;
102                    table.id.into()
103                }
104                StatementType::ALTER_VIEW => {
105                    let (view, schema_name) =
106                        catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
107                    session.check_privilege_for_drop_alter(schema_name, &**view)?;
108                    let schema_id = catalog_reader
109                        .get_schema_by_name(db_name, schema_name)?
110                        .id();
111                    check_schema_create_privilege(&session, new_owner, schema_id)?;
112                    if view.owner() == owner_id {
113                        return Ok(RwPgResponse::empty_result(stmt_type));
114                    }
115                    check_owned_by_admin(&view.owner)?;
116                    view.id.into()
117                }
118                StatementType::ALTER_SOURCE => {
119                    let (source, schema_name) =
120                        catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
121                    session.check_privilege_for_drop_alter(schema_name, &**source)?;
122                    let schema_id = catalog_reader
123                        .get_schema_by_name(db_name, schema_name)?
124                        .id();
125                    check_schema_create_privilege(&session, new_owner, schema_id)?;
126                    if source.owner() == owner_id {
127                        return Ok(RwPgResponse::empty_result(stmt_type));
128                    }
129                    check_owned_by_admin(&source.owner())?;
130                    source.id.into()
131                }
132                StatementType::ALTER_SINK => {
133                    let (sink, schema_name) = catalog_reader.get_created_sink_by_name(
134                        db_name,
135                        schema_path,
136                        &real_obj_name,
137                    )?;
138                    session.check_privilege_for_drop_alter(schema_name, &**sink)?;
139                    let schema_id = catalog_reader
140                        .get_schema_by_name(db_name, schema_name)?
141                        .id();
142                    check_schema_create_privilege(&session, new_owner, schema_id)?;
143                    if sink.owner() == owner_id {
144                        return Ok(RwPgResponse::empty_result(stmt_type));
145                    }
146                    check_owned_by_admin(&sink.owner())?;
147                    sink.id.into()
148                }
149                StatementType::ALTER_SUBSCRIPTION => {
150                    let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
151                        db_name,
152                        schema_path,
153                        &real_obj_name,
154                    )?;
155                    session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
156                    let schema_id = catalog_reader
157                        .get_schema_by_name(db_name, schema_name)?
158                        .id();
159                    check_schema_create_privilege(&session, new_owner, schema_id)?;
160                    if subscription.owner() == owner_id {
161                        return Ok(RwPgResponse::empty_result(stmt_type));
162                    }
163                    check_owned_by_admin(&subscription.owner())?;
164                    subscription.id.into()
165                }
166                StatementType::ALTER_DATABASE => {
167                    let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
168                    session.check_privilege_for_drop_alter_db_schema(database)?;
169                    if database.owner() == owner_id {
170                        return Ok(RwPgResponse::empty_result(stmt_type));
171                    }
172                    check_owned_by_admin(&database.owner)?;
173                    database.id().into()
174                }
175                StatementType::ALTER_SCHEMA => {
176                    let schema =
177                        catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
178                    session.check_privilege_for_drop_alter_db_schema(schema)?;
179                    if schema.owner() == owner_id {
180                        return Ok(RwPgResponse::empty_result(stmt_type));
181                    }
182                    check_owned_by_admin(&schema.owner)?;
183                    schema.id().into()
184                }
185                StatementType::ALTER_CONNECTION => {
186                    let (connection, schema_name) = catalog_reader.get_connection_by_name(
187                        db_name,
188                        schema_path,
189                        &real_obj_name,
190                    )?;
191                    session.check_privilege_for_drop_alter(schema_name, &**connection)?;
192                    if connection.owner() == owner_id {
193                        return Ok(RwPgResponse::empty_result(stmt_type));
194                    }
195                    check_owned_by_admin(&connection.owner)?;
196                    connection.id.into()
197                }
198                StatementType::ALTER_FUNCTION => {
199                    let (function, schema_name) = if let Some(args) = func_args {
200                        let mut arg_types = Vec::with_capacity(args.len());
201                        for arg in args {
202                            arg_types.push(bind_data_type(&arg.data_type)?);
203                        }
204                        catalog_reader.get_function_by_name_args(
205                            db_name,
206                            schema_path,
207                            &real_obj_name,
208                            &arg_types,
209                        )?
210                    } else {
211                        let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
212                            db_name,
213                            schema_path,
214                            &real_obj_name,
215                        )?;
216                        if functions.len() > 1 {
217                            return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
218                        }
219                        (
220                            functions.into_iter().next().expect("no functions"),
221                            old_schema_name,
222                        )
223                    };
224                    session.check_privilege_for_drop_alter(schema_name, &**function)?;
225                    if function.owner == owner_id {
226                        return Ok(RwPgResponse::empty_result(stmt_type));
227                    }
228                    check_owned_by_admin(&function.owner)?;
229                    function.id.into()
230                }
231                _ => unreachable!(),
232            },
233            owner_id,
234        )
235    };
236
237    let catalog_writer = session.catalog_writer()?;
238    catalog_writer.alter_owner(object, owner_id).await?;
239
240    Ok(RwPgResponse::empty_result(stmt_type))
241}