risingwave_frontend/handler/
alter_owner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_pb::ddl_service::alter_owner_request::Object;
20use risingwave_pb::user::grant_privilege;
21use risingwave_sqlparser::ast::{Ident, ObjectName};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::Binder;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::{CatalogError, OwnedByUserCatalog};
27use crate::error::ErrorCode::PermissionDenied;
28use crate::error::Result;
29use crate::session::SessionImpl;
30use crate::user::UserId;
31use crate::user::user_catalog::UserCatalog;
32
33pub fn check_schema_create_privilege(
34    session: &Arc<SessionImpl>,
35    new_owner: &UserCatalog,
36    schema_id: u32,
37) -> Result<()> {
38    if session.is_super_user() {
39        return Ok(());
40    }
41    if !new_owner.is_super
42        && !new_owner.has_privilege(
43            &grant_privilege::Object::SchemaId(schema_id),
44            AclMode::Create,
45        )
46    {
47        return Err(PermissionDenied(
48            "Require new owner to have create privilege on the object.".to_owned(),
49        )
50        .into());
51    }
52    Ok(())
53}
54
55pub async fn handle_alter_owner(
56    handler_args: HandlerArgs,
57    obj_name: ObjectName,
58    new_owner_name: Ident,
59    stmt_type: StatementType,
60) -> Result<RwPgResponse> {
61    let session = handler_args.session;
62    let db_name = &session.database();
63    let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
64    let search_path = session.config().search_path();
65    let user_name = &session.user_name();
66    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
67
68    let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
69    let (object, owner_id) = {
70        let catalog_reader = session.env().catalog_reader().read_guard();
71        let user_reader = session.env().user_info_reader().read_guard();
72        let new_owner = user_reader
73            .get_user_by_name(&new_owner_name)
74            .ok_or(CatalogError::NotFound("user", new_owner_name))?;
75
76        let check_owned_by_admin = |owner: &UserId| -> Result<()> {
77            let user_catalog = user_reader.get_user_by_id(owner).unwrap();
78            if user_catalog.is_admin {
79                return Err(PermissionDenied(
80                    format!(
81                        "Cannot change owner of {} owned by admin user {}",
82                        obj_name.real_value(),
83                        user_catalog.name
84                    )
85                    .to_owned(),
86                )
87                .into());
88            }
89            Ok(())
90        };
91
92        let owner_id = new_owner.id;
93        (
94            match stmt_type {
95                StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
96                    let (table, schema_name) = catalog_reader.get_created_table_by_name(
97                        db_name,
98                        schema_path,
99                        &real_obj_name,
100                    )?;
101                    session.check_privilege_for_drop_alter(schema_name, &**table)?;
102                    let schema_id = catalog_reader
103                        .get_schema_by_name(db_name, schema_name)?
104                        .id();
105                    check_schema_create_privilege(&session, new_owner, schema_id)?;
106                    if table.owner() == owner_id {
107                        return Ok(RwPgResponse::empty_result(stmt_type));
108                    }
109                    check_owned_by_admin(&table.owner)?;
110                    Object::TableId(table.id.table_id)
111                }
112                StatementType::ALTER_VIEW => {
113                    let (view, schema_name) =
114                        catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
115                    session.check_privilege_for_drop_alter(schema_name, &**view)?;
116                    let schema_id = catalog_reader
117                        .get_schema_by_name(db_name, schema_name)?
118                        .id();
119                    check_schema_create_privilege(&session, new_owner, schema_id)?;
120                    if view.owner() == owner_id {
121                        return Ok(RwPgResponse::empty_result(stmt_type));
122                    }
123                    check_owned_by_admin(&view.owner)?;
124                    Object::ViewId(view.id)
125                }
126                StatementType::ALTER_SOURCE => {
127                    let (source, schema_name) =
128                        catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
129                    session.check_privilege_for_drop_alter(schema_name, &**source)?;
130                    let schema_id = catalog_reader
131                        .get_schema_by_name(db_name, schema_name)?
132                        .id();
133                    check_schema_create_privilege(&session, new_owner, schema_id)?;
134                    if source.owner() == owner_id {
135                        return Ok(RwPgResponse::empty_result(stmt_type));
136                    }
137                    check_owned_by_admin(&source.owner())?;
138                    Object::SourceId(source.id)
139                }
140                StatementType::ALTER_SINK => {
141                    let (sink, schema_name) = catalog_reader.get_created_sink_by_name(
142                        db_name,
143                        schema_path,
144                        &real_obj_name,
145                    )?;
146                    session.check_privilege_for_drop_alter(schema_name, &**sink)?;
147                    let schema_id = catalog_reader
148                        .get_schema_by_name(db_name, schema_name)?
149                        .id();
150                    check_schema_create_privilege(&session, new_owner, schema_id)?;
151                    if sink.owner() == owner_id {
152                        return Ok(RwPgResponse::empty_result(stmt_type));
153                    }
154                    check_owned_by_admin(&sink.owner())?;
155                    Object::SinkId(sink.id.sink_id)
156                }
157                StatementType::ALTER_SUBSCRIPTION => {
158                    let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
159                        db_name,
160                        schema_path,
161                        &real_obj_name,
162                    )?;
163                    session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
164                    let schema_id = catalog_reader
165                        .get_schema_by_name(db_name, schema_name)?
166                        .id();
167                    check_schema_create_privilege(&session, new_owner, schema_id)?;
168                    if subscription.owner() == owner_id {
169                        return Ok(RwPgResponse::empty_result(stmt_type));
170                    }
171                    check_owned_by_admin(&subscription.owner())?;
172                    Object::SubscriptionId(subscription.id.subscription_id)
173                }
174                StatementType::ALTER_DATABASE => {
175                    let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
176                    session.check_privilege_for_drop_alter_db_schema(database)?;
177                    if database.owner() == owner_id {
178                        return Ok(RwPgResponse::empty_result(stmt_type));
179                    }
180                    check_owned_by_admin(&database.owner)?;
181                    Object::DatabaseId(database.id())
182                }
183                StatementType::ALTER_SCHEMA => {
184                    let schema =
185                        catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
186                    session.check_privilege_for_drop_alter_db_schema(schema)?;
187                    if schema.owner() == owner_id {
188                        return Ok(RwPgResponse::empty_result(stmt_type));
189                    }
190                    check_owned_by_admin(&schema.owner)?;
191                    Object::SchemaId(schema.id())
192                }
193                StatementType::ALTER_CONNECTION => {
194                    let (connection, schema_name) = catalog_reader.get_connection_by_name(
195                        db_name,
196                        schema_path,
197                        &real_obj_name,
198                    )?;
199                    session.check_privilege_for_drop_alter(schema_name, &**connection)?;
200                    if connection.owner() == owner_id {
201                        return Ok(RwPgResponse::empty_result(stmt_type));
202                    }
203                    check_owned_by_admin(&connection.owner)?;
204                    Object::ConnectionId(connection.id)
205                }
206                _ => unreachable!(),
207            },
208            owner_id,
209        )
210    };
211
212    let catalog_writer = session.catalog_writer()?;
213    catalog_writer.alter_owner(object, owner_id).await?;
214
215    Ok(RwPgResponse::empty_result(stmt_type))
216}