risingwave_frontend/handler/
alter_owner.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_common::id::SchemaId;
20use risingwave_sqlparser::ast::{Ident, ObjectName, OperateFunctionArg};
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::{CatalogError, OwnedByUserCatalog};
25use crate::error::ErrorCode::{self, PermissionDenied};
26use crate::error::Result;
27use crate::session::SessionImpl;
28use crate::user::UserId;
29use crate::user::user_catalog::UserCatalog;
30use crate::{Binder, bind_data_type};
31
32pub fn check_schema_create_privilege(
33 session: &Arc<SessionImpl>,
34 new_owner: &UserCatalog,
35 schema_id: SchemaId,
36) -> Result<()> {
37 if session.is_super_user() {
38 return Ok(());
39 }
40 if !new_owner.is_super && !new_owner.has_privilege(schema_id, AclMode::Create) {
41 return Err(PermissionDenied(
42 "Require new owner to have create privilege on the object.".to_owned(),
43 )
44 .into());
45 }
46 Ok(())
47}
48
49pub async fn handle_alter_owner(
50 handler_args: HandlerArgs,
51 obj_name: ObjectName,
52 new_owner_name: Ident,
53 stmt_type: StatementType,
54 func_args: Option<Vec<OperateFunctionArg>>,
55) -> Result<RwPgResponse> {
56 let session = handler_args.session;
57 let db_name = &session.database();
58 let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
59 let search_path = session.config().search_path();
60 let user_name = &session.user_name();
61 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
62
63 let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
64 let (object, owner_id) = {
65 let catalog_reader = session.env().catalog_reader().read_guard();
66 let user_reader = session.env().user_info_reader().read_guard();
67 let new_owner = user_reader
68 .get_user_by_name(&new_owner_name)
69 .ok_or(CatalogError::not_found("user", new_owner_name))?;
70
71 let check_owned_by_admin = |owner: &UserId| -> Result<()> {
72 let user_catalog = user_reader.get_user_by_id(owner).unwrap();
73 if user_catalog.is_admin {
74 return Err(PermissionDenied(format!(
75 "Cannot change owner of {} owned by admin user {}",
76 obj_name.real_value(),
77 user_catalog.name
78 ))
79 .into());
80 }
81 Ok(())
82 };
83
84 let owner_id = new_owner.id;
85 (
86 match stmt_type {
87 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
88 let (table, schema_name) = catalog_reader.get_created_table_by_name(
89 db_name,
90 schema_path,
91 &real_obj_name,
92 )?;
93 session.check_privilege_for_drop_alter(schema_name, &**table)?;
94 let schema_id = catalog_reader
95 .get_schema_by_name(db_name, schema_name)?
96 .id();
97 check_schema_create_privilege(&session, new_owner, schema_id)?;
98 if table.owner() == owner_id {
99 return Ok(RwPgResponse::empty_result(stmt_type));
100 }
101 check_owned_by_admin(&table.owner)?;
102 table.id.into()
103 }
104 StatementType::ALTER_VIEW => {
105 let (view, schema_name) =
106 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
107 session.check_privilege_for_drop_alter(schema_name, &**view)?;
108 let schema_id = catalog_reader
109 .get_schema_by_name(db_name, schema_name)?
110 .id();
111 check_schema_create_privilege(&session, new_owner, schema_id)?;
112 if view.owner() == owner_id {
113 return Ok(RwPgResponse::empty_result(stmt_type));
114 }
115 check_owned_by_admin(&view.owner)?;
116 view.id.into()
117 }
118 StatementType::ALTER_SOURCE => {
119 let (source, schema_name) =
120 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
121 session.check_privilege_for_drop_alter(schema_name, &**source)?;
122 let schema_id = catalog_reader
123 .get_schema_by_name(db_name, schema_name)?
124 .id();
125 check_schema_create_privilege(&session, new_owner, schema_id)?;
126 if source.owner() == owner_id {
127 return Ok(RwPgResponse::empty_result(stmt_type));
128 }
129 check_owned_by_admin(&source.owner())?;
130 source.id.into()
131 }
132 StatementType::ALTER_SINK => {
133 let (sink, schema_name) = catalog_reader.get_created_sink_by_name(
134 db_name,
135 schema_path,
136 &real_obj_name,
137 )?;
138 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
139 let schema_id = catalog_reader
140 .get_schema_by_name(db_name, schema_name)?
141 .id();
142 check_schema_create_privilege(&session, new_owner, schema_id)?;
143 if sink.owner() == owner_id {
144 return Ok(RwPgResponse::empty_result(stmt_type));
145 }
146 check_owned_by_admin(&sink.owner())?;
147 sink.id.into()
148 }
149 StatementType::ALTER_SUBSCRIPTION => {
150 let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
151 db_name,
152 schema_path,
153 &real_obj_name,
154 )?;
155 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
156 let schema_id = catalog_reader
157 .get_schema_by_name(db_name, schema_name)?
158 .id();
159 check_schema_create_privilege(&session, new_owner, schema_id)?;
160 if subscription.owner() == owner_id {
161 return Ok(RwPgResponse::empty_result(stmt_type));
162 }
163 check_owned_by_admin(&subscription.owner())?;
164 subscription.id.into()
165 }
166 StatementType::ALTER_DATABASE => {
167 let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
168 session.check_privilege_for_drop_alter_db_schema(database)?;
169 if database.owner() == owner_id {
170 return Ok(RwPgResponse::empty_result(stmt_type));
171 }
172 check_owned_by_admin(&database.owner)?;
173 database.id().into()
174 }
175 StatementType::ALTER_SCHEMA => {
176 let schema =
177 catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
178 session.check_privilege_for_drop_alter_db_schema(schema)?;
179 if schema.owner() == owner_id {
180 return Ok(RwPgResponse::empty_result(stmt_type));
181 }
182 check_owned_by_admin(&schema.owner)?;
183 schema.id().into()
184 }
185 StatementType::ALTER_CONNECTION => {
186 let (connection, schema_name) = catalog_reader.get_connection_by_name(
187 db_name,
188 schema_path,
189 &real_obj_name,
190 )?;
191 session.check_privilege_for_drop_alter(schema_name, &**connection)?;
192 if connection.owner() == owner_id {
193 return Ok(RwPgResponse::empty_result(stmt_type));
194 }
195 check_owned_by_admin(&connection.owner)?;
196 connection.id.into()
197 }
198 StatementType::ALTER_FUNCTION => {
199 let (function, schema_name) = if let Some(args) = func_args {
200 let mut arg_types = Vec::with_capacity(args.len());
201 for arg in args {
202 arg_types.push(bind_data_type(&arg.data_type)?);
203 }
204 catalog_reader.get_function_by_name_args(
205 db_name,
206 schema_path,
207 &real_obj_name,
208 &arg_types,
209 )?
210 } else {
211 let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
212 db_name,
213 schema_path,
214 &real_obj_name,
215 )?;
216 if functions.len() > 1 {
217 return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
218 }
219 (
220 functions.into_iter().next().expect("no functions"),
221 old_schema_name,
222 )
223 };
224 session.check_privilege_for_drop_alter(schema_name, &**function)?;
225 if function.owner == owner_id {
226 return Ok(RwPgResponse::empty_result(stmt_type));
227 }
228 check_owned_by_admin(&function.owner)?;
229 function.id.into()
230 }
231 _ => unreachable!(),
232 },
233 owner_id,
234 )
235 };
236
237 let catalog_writer = session.catalog_writer()?;
238 catalog_writer.alter_owner(object, owner_id).await?;
239
240 Ok(RwPgResponse::empty_result(stmt_type))
241}