risingwave_frontend/handler/
alter_owner.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_pb::ddl_service::alter_owner_request::Object;
20use risingwave_pb::user::grant_privilege;
21use risingwave_sqlparser::ast::{Ident, ObjectName};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::Binder;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::{CatalogError, OwnedByUserCatalog};
27use crate::error::ErrorCode::PermissionDenied;
28use crate::error::Result;
29use crate::session::SessionImpl;
30use crate::user::UserId;
31use crate::user::user_catalog::UserCatalog;
32
33pub fn check_schema_create_privilege(
34 session: &Arc<SessionImpl>,
35 new_owner: &UserCatalog,
36 schema_id: u32,
37) -> Result<()> {
38 if session.is_super_user() {
39 return Ok(());
40 }
41 if !new_owner.is_super
42 && !new_owner.has_privilege(
43 &grant_privilege::Object::SchemaId(schema_id),
44 AclMode::Create,
45 )
46 {
47 return Err(PermissionDenied(
48 "Require new owner to have create privilege on the object.".to_owned(),
49 )
50 .into());
51 }
52 Ok(())
53}
54
55pub async fn handle_alter_owner(
56 handler_args: HandlerArgs,
57 obj_name: ObjectName,
58 new_owner_name: Ident,
59 stmt_type: StatementType,
60) -> Result<RwPgResponse> {
61 let session = handler_args.session;
62 let db_name = &session.database();
63 let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
64 let search_path = session.config().search_path();
65 let user_name = &session.user_name();
66 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
67
68 let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
69 let (object, owner_id) = {
70 let catalog_reader = session.env().catalog_reader().read_guard();
71 let user_reader = session.env().user_info_reader().read_guard();
72 let new_owner = user_reader
73 .get_user_by_name(&new_owner_name)
74 .ok_or(CatalogError::NotFound("user", new_owner_name))?;
75
76 let check_owned_by_admin = |owner: &UserId| -> Result<()> {
77 let user_catalog = user_reader.get_user_by_id(owner).unwrap();
78 if user_catalog.is_admin {
79 return Err(PermissionDenied(
80 format!(
81 "Cannot change owner of {} owned by admin user {}",
82 obj_name.real_value(),
83 user_catalog.name
84 )
85 .to_owned(),
86 )
87 .into());
88 }
89 Ok(())
90 };
91
92 let owner_id = new_owner.id;
93 (
94 match stmt_type {
95 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
96 let (table, schema_name) = catalog_reader.get_created_table_by_name(
97 db_name,
98 schema_path,
99 &real_obj_name,
100 )?;
101 session.check_privilege_for_drop_alter(schema_name, &**table)?;
102 let schema_id = catalog_reader
103 .get_schema_by_name(db_name, schema_name)?
104 .id();
105 check_schema_create_privilege(&session, new_owner, schema_id)?;
106 if table.owner() == owner_id {
107 return Ok(RwPgResponse::empty_result(stmt_type));
108 }
109 check_owned_by_admin(&table.owner)?;
110 Object::TableId(table.id.table_id)
111 }
112 StatementType::ALTER_VIEW => {
113 let (view, schema_name) =
114 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
115 session.check_privilege_for_drop_alter(schema_name, &**view)?;
116 let schema_id = catalog_reader
117 .get_schema_by_name(db_name, schema_name)?
118 .id();
119 check_schema_create_privilege(&session, new_owner, schema_id)?;
120 if view.owner() == owner_id {
121 return Ok(RwPgResponse::empty_result(stmt_type));
122 }
123 check_owned_by_admin(&view.owner)?;
124 Object::ViewId(view.id)
125 }
126 StatementType::ALTER_SOURCE => {
127 let (source, schema_name) =
128 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
129 session.check_privilege_for_drop_alter(schema_name, &**source)?;
130 let schema_id = catalog_reader
131 .get_schema_by_name(db_name, schema_name)?
132 .id();
133 check_schema_create_privilege(&session, new_owner, schema_id)?;
134 if source.owner() == owner_id {
135 return Ok(RwPgResponse::empty_result(stmt_type));
136 }
137 check_owned_by_admin(&source.owner())?;
138 Object::SourceId(source.id)
139 }
140 StatementType::ALTER_SINK => {
141 let (sink, schema_name) = catalog_reader.get_created_sink_by_name(
142 db_name,
143 schema_path,
144 &real_obj_name,
145 )?;
146 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
147 let schema_id = catalog_reader
148 .get_schema_by_name(db_name, schema_name)?
149 .id();
150 check_schema_create_privilege(&session, new_owner, schema_id)?;
151 if sink.owner() == owner_id {
152 return Ok(RwPgResponse::empty_result(stmt_type));
153 }
154 check_owned_by_admin(&sink.owner())?;
155 Object::SinkId(sink.id.sink_id)
156 }
157 StatementType::ALTER_SUBSCRIPTION => {
158 let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
159 db_name,
160 schema_path,
161 &real_obj_name,
162 )?;
163 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
164 let schema_id = catalog_reader
165 .get_schema_by_name(db_name, schema_name)?
166 .id();
167 check_schema_create_privilege(&session, new_owner, schema_id)?;
168 if subscription.owner() == owner_id {
169 return Ok(RwPgResponse::empty_result(stmt_type));
170 }
171 check_owned_by_admin(&subscription.owner())?;
172 Object::SubscriptionId(subscription.id.subscription_id)
173 }
174 StatementType::ALTER_DATABASE => {
175 let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
176 session.check_privilege_for_drop_alter_db_schema(database)?;
177 if database.owner() == owner_id {
178 return Ok(RwPgResponse::empty_result(stmt_type));
179 }
180 check_owned_by_admin(&database.owner)?;
181 Object::DatabaseId(database.id())
182 }
183 StatementType::ALTER_SCHEMA => {
184 let schema =
185 catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
186 session.check_privilege_for_drop_alter_db_schema(schema)?;
187 if schema.owner() == owner_id {
188 return Ok(RwPgResponse::empty_result(stmt_type));
189 }
190 check_owned_by_admin(&schema.owner)?;
191 Object::SchemaId(schema.id())
192 }
193 StatementType::ALTER_CONNECTION => {
194 let (connection, schema_name) = catalog_reader.get_connection_by_name(
195 db_name,
196 schema_path,
197 &real_obj_name,
198 )?;
199 session.check_privilege_for_drop_alter(schema_name, &**connection)?;
200 if connection.owner() == owner_id {
201 return Ok(RwPgResponse::empty_result(stmt_type));
202 }
203 check_owned_by_admin(&connection.owner)?;
204 Object::ConnectionId(connection.id)
205 }
206 _ => unreachable!(),
207 },
208 owner_id,
209 )
210 };
211
212 let catalog_writer = session.catalog_writer()?;
213 catalog_writer.alter_owner(object, owner_id).await?;
214
215 Ok(RwPgResponse::empty_result(stmt_type))
216}