risingwave_frontend/handler/
alter_owner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_common::id::SchemaId;
20use risingwave_sqlparser::ast::{Ident, ObjectName};
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::Binder;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::{CatalogError, OwnedByUserCatalog};
26use crate::error::ErrorCode::PermissionDenied;
27use crate::error::Result;
28use crate::session::SessionImpl;
29use crate::user::UserId;
30use crate::user::user_catalog::UserCatalog;
31
32pub fn check_schema_create_privilege(
33    session: &Arc<SessionImpl>,
34    new_owner: &UserCatalog,
35    schema_id: SchemaId,
36) -> Result<()> {
37    if session.is_super_user() {
38        return Ok(());
39    }
40    if !new_owner.is_super && !new_owner.has_privilege(schema_id, AclMode::Create) {
41        return Err(PermissionDenied(
42            "Require new owner to have create privilege on the object.".to_owned(),
43        )
44        .into());
45    }
46    Ok(())
47}
48
49pub async fn handle_alter_owner(
50    handler_args: HandlerArgs,
51    obj_name: ObjectName,
52    new_owner_name: Ident,
53    stmt_type: StatementType,
54) -> Result<RwPgResponse> {
55    let session = handler_args.session;
56    let db_name = &session.database();
57    let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
58    let search_path = session.config().search_path();
59    let user_name = &session.user_name();
60    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
61
62    let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
63    let (object, owner_id) = {
64        let catalog_reader = session.env().catalog_reader().read_guard();
65        let user_reader = session.env().user_info_reader().read_guard();
66        let new_owner = user_reader
67            .get_user_by_name(&new_owner_name)
68            .ok_or(CatalogError::NotFound("user", new_owner_name))?;
69
70        let check_owned_by_admin = |owner: &UserId| -> Result<()> {
71            let user_catalog = user_reader.get_user_by_id(owner).unwrap();
72            if user_catalog.is_admin {
73                return Err(PermissionDenied(format!(
74                    "Cannot change owner of {} owned by admin user {}",
75                    obj_name.real_value(),
76                    user_catalog.name
77                ))
78                .into());
79            }
80            Ok(())
81        };
82
83        let owner_id = new_owner.id;
84        (
85            match stmt_type {
86                StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
87                    let (table, schema_name) = catalog_reader.get_created_table_by_name(
88                        db_name,
89                        schema_path,
90                        &real_obj_name,
91                    )?;
92                    session.check_privilege_for_drop_alter(schema_name, &**table)?;
93                    let schema_id = catalog_reader
94                        .get_schema_by_name(db_name, schema_name)?
95                        .id();
96                    check_schema_create_privilege(&session, new_owner, schema_id)?;
97                    if table.owner() == owner_id {
98                        return Ok(RwPgResponse::empty_result(stmt_type));
99                    }
100                    check_owned_by_admin(&table.owner)?;
101                    table.id.into()
102                }
103                StatementType::ALTER_VIEW => {
104                    let (view, schema_name) =
105                        catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
106                    session.check_privilege_for_drop_alter(schema_name, &**view)?;
107                    let schema_id = catalog_reader
108                        .get_schema_by_name(db_name, schema_name)?
109                        .id();
110                    check_schema_create_privilege(&session, new_owner, schema_id)?;
111                    if view.owner() == owner_id {
112                        return Ok(RwPgResponse::empty_result(stmt_type));
113                    }
114                    check_owned_by_admin(&view.owner)?;
115                    view.id.into()
116                }
117                StatementType::ALTER_SOURCE => {
118                    let (source, schema_name) =
119                        catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
120                    session.check_privilege_for_drop_alter(schema_name, &**source)?;
121                    let schema_id = catalog_reader
122                        .get_schema_by_name(db_name, schema_name)?
123                        .id();
124                    check_schema_create_privilege(&session, new_owner, schema_id)?;
125                    if source.owner() == owner_id {
126                        return Ok(RwPgResponse::empty_result(stmt_type));
127                    }
128                    check_owned_by_admin(&source.owner())?;
129                    source.id.into()
130                }
131                StatementType::ALTER_SINK => {
132                    let (sink, schema_name) = catalog_reader.get_created_sink_by_name(
133                        db_name,
134                        schema_path,
135                        &real_obj_name,
136                    )?;
137                    session.check_privilege_for_drop_alter(schema_name, &**sink)?;
138                    let schema_id = catalog_reader
139                        .get_schema_by_name(db_name, schema_name)?
140                        .id();
141                    check_schema_create_privilege(&session, new_owner, schema_id)?;
142                    if sink.owner() == owner_id {
143                        return Ok(RwPgResponse::empty_result(stmt_type));
144                    }
145                    check_owned_by_admin(&sink.owner())?;
146                    sink.id.into()
147                }
148                StatementType::ALTER_SUBSCRIPTION => {
149                    let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
150                        db_name,
151                        schema_path,
152                        &real_obj_name,
153                    )?;
154                    session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
155                    let schema_id = catalog_reader
156                        .get_schema_by_name(db_name, schema_name)?
157                        .id();
158                    check_schema_create_privilege(&session, new_owner, schema_id)?;
159                    if subscription.owner() == owner_id {
160                        return Ok(RwPgResponse::empty_result(stmt_type));
161                    }
162                    check_owned_by_admin(&subscription.owner())?;
163                    subscription.id.into()
164                }
165                StatementType::ALTER_DATABASE => {
166                    let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
167                    session.check_privilege_for_drop_alter_db_schema(database)?;
168                    if database.owner() == owner_id {
169                        return Ok(RwPgResponse::empty_result(stmt_type));
170                    }
171                    check_owned_by_admin(&database.owner)?;
172                    database.id().into()
173                }
174                StatementType::ALTER_SCHEMA => {
175                    let schema =
176                        catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
177                    session.check_privilege_for_drop_alter_db_schema(schema)?;
178                    if schema.owner() == owner_id {
179                        return Ok(RwPgResponse::empty_result(stmt_type));
180                    }
181                    check_owned_by_admin(&schema.owner)?;
182                    schema.id().into()
183                }
184                StatementType::ALTER_CONNECTION => {
185                    let (connection, schema_name) = catalog_reader.get_connection_by_name(
186                        db_name,
187                        schema_path,
188                        &real_obj_name,
189                    )?;
190                    session.check_privilege_for_drop_alter(schema_name, &**connection)?;
191                    if connection.owner() == owner_id {
192                        return Ok(RwPgResponse::empty_result(stmt_type));
193                    }
194                    check_owned_by_admin(&connection.owner)?;
195                    connection.id.into()
196                }
197                _ => unreachable!(),
198            },
199            owner_id,
200        )
201    };
202
203    let catalog_writer = session.catalog_writer()?;
204    catalog_writer.alter_owner(object, owner_id).await?;
205
206    Ok(RwPgResponse::empty_result(stmt_type))
207}