risingwave_frontend/handler/
alter_owner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_pb::ddl_service::alter_owner_request::Object;
20use risingwave_pb::user::grant_privilege;
21use risingwave_sqlparser::ast::{Ident, ObjectName};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::Binder;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::{CatalogError, OwnedByUserCatalog};
27use crate::error::ErrorCode::PermissionDenied;
28use crate::error::Result;
29use crate::session::SessionImpl;
30use crate::user::user_catalog::UserCatalog;
31
32pub fn check_schema_create_privilege(
33    session: &Arc<SessionImpl>,
34    new_owner: &UserCatalog,
35    schema_id: u32,
36) -> Result<()> {
37    if session.is_super_user() {
38        return Ok(());
39    }
40    if !new_owner.is_super
41        && !new_owner.has_privilege(
42            &grant_privilege::Object::SchemaId(schema_id),
43            AclMode::Create,
44        )
45    {
46        return Err(PermissionDenied(
47            "Require new owner to have create privilege on the object.".to_owned(),
48        )
49        .into());
50    }
51    Ok(())
52}
53
54pub async fn handle_alter_owner(
55    handler_args: HandlerArgs,
56    obj_name: ObjectName,
57    new_owner_name: Ident,
58    stmt_type: StatementType,
59) -> Result<RwPgResponse> {
60    let session = handler_args.session;
61    let db_name = &session.database();
62    let (schema_name, real_obj_name) =
63        Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
64    let search_path = session.config().search_path();
65    let user_name = &session.user_name();
66    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
67
68    let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
69    let (object, owner_id) = {
70        let catalog_reader = session.env().catalog_reader().read_guard();
71        let user_reader = session.env().user_info_reader().read_guard();
72        let new_owner = user_reader
73            .get_user_by_name(&new_owner_name)
74            .ok_or(CatalogError::NotFound("user", new_owner_name))?;
75        let owner_id = new_owner.id;
76        (
77            match stmt_type {
78                StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
79                    let (table, schema_name) = catalog_reader.get_created_table_by_name(
80                        db_name,
81                        schema_path,
82                        &real_obj_name,
83                    )?;
84                    session.check_privilege_for_drop_alter(schema_name, &**table)?;
85                    let schema_id = catalog_reader
86                        .get_schema_by_name(db_name, schema_name)?
87                        .id();
88                    check_schema_create_privilege(&session, new_owner, schema_id)?;
89                    if table.owner() == owner_id {
90                        return Ok(RwPgResponse::empty_result(stmt_type));
91                    }
92                    Object::TableId(table.id.table_id)
93                }
94                StatementType::ALTER_VIEW => {
95                    let (view, schema_name) =
96                        catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
97                    session.check_privilege_for_drop_alter(schema_name, &**view)?;
98                    let schema_id = catalog_reader
99                        .get_schema_by_name(db_name, schema_name)?
100                        .id();
101                    check_schema_create_privilege(&session, new_owner, schema_id)?;
102                    if view.owner() == owner_id {
103                        return Ok(RwPgResponse::empty_result(stmt_type));
104                    }
105                    Object::ViewId(view.id)
106                }
107                StatementType::ALTER_SOURCE => {
108                    let (source, schema_name) =
109                        catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
110                    session.check_privilege_for_drop_alter(schema_name, &**source)?;
111                    let schema_id = catalog_reader
112                        .get_schema_by_name(db_name, schema_name)?
113                        .id();
114                    check_schema_create_privilege(&session, new_owner, schema_id)?;
115                    if source.owner() == owner_id {
116                        return Ok(RwPgResponse::empty_result(stmt_type));
117                    }
118                    Object::SourceId(source.id)
119                }
120                StatementType::ALTER_SINK => {
121                    let (sink, schema_name) =
122                        catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
123                    session.check_privilege_for_drop_alter(schema_name, &**sink)?;
124                    let schema_id = catalog_reader
125                        .get_schema_by_name(db_name, schema_name)?
126                        .id();
127                    check_schema_create_privilege(&session, new_owner, schema_id)?;
128                    if sink.owner() == owner_id {
129                        return Ok(RwPgResponse::empty_result(stmt_type));
130                    }
131                    Object::SinkId(sink.id.sink_id)
132                }
133                StatementType::ALTER_SUBSCRIPTION => {
134                    let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
135                        db_name,
136                        schema_path,
137                        &real_obj_name,
138                    )?;
139                    session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
140                    let schema_id = catalog_reader
141                        .get_schema_by_name(db_name, schema_name)?
142                        .id();
143                    check_schema_create_privilege(&session, new_owner, schema_id)?;
144                    if subscription.owner() == owner_id {
145                        return Ok(RwPgResponse::empty_result(stmt_type));
146                    }
147                    Object::SubscriptionId(subscription.id.subscription_id)
148                }
149                StatementType::ALTER_DATABASE => {
150                    let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
151                    session.check_privilege_for_drop_alter_db_schema(database)?;
152                    if database.owner() == owner_id {
153                        return Ok(RwPgResponse::empty_result(stmt_type));
154                    }
155                    Object::DatabaseId(database.id())
156                }
157                StatementType::ALTER_SCHEMA => {
158                    let schema =
159                        catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
160                    session.check_privilege_for_drop_alter_db_schema(schema)?;
161                    if schema.owner() == owner_id {
162                        return Ok(RwPgResponse::empty_result(stmt_type));
163                    }
164                    Object::SchemaId(schema.id())
165                }
166                StatementType::ALTER_CONNECTION => {
167                    let (connection, schema_name) = catalog_reader.get_connection_by_name(
168                        db_name,
169                        schema_path,
170                        &real_obj_name,
171                    )?;
172                    session.check_privilege_for_drop_alter(schema_name, &**connection)?;
173                    if connection.owner() == owner_id {
174                        return Ok(RwPgResponse::empty_result(stmt_type));
175                    }
176                    Object::ConnectionId(connection.id)
177                }
178                _ => unreachable!(),
179            },
180            owner_id,
181        )
182    };
183
184    let catalog_writer = session.catalog_writer()?;
185    catalog_writer.alter_owner(object, owner_id).await?;
186
187    Ok(RwPgResponse::empty_result(stmt_type))
188}