risingwave_frontend/handler/
alter_owner.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_common::id::SchemaId;
20use risingwave_sqlparser::ast::{Ident, ObjectName};
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::Binder;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::{CatalogError, OwnedByUserCatalog};
26use crate::error::ErrorCode::PermissionDenied;
27use crate::error::Result;
28use crate::session::SessionImpl;
29use crate::user::UserId;
30use crate::user::user_catalog::UserCatalog;
31
32pub fn check_schema_create_privilege(
33 session: &Arc<SessionImpl>,
34 new_owner: &UserCatalog,
35 schema_id: SchemaId,
36) -> Result<()> {
37 if session.is_super_user() {
38 return Ok(());
39 }
40 if !new_owner.is_super && !new_owner.has_privilege(schema_id, AclMode::Create) {
41 return Err(PermissionDenied(
42 "Require new owner to have create privilege on the object.".to_owned(),
43 )
44 .into());
45 }
46 Ok(())
47}
48
49pub async fn handle_alter_owner(
50 handler_args: HandlerArgs,
51 obj_name: ObjectName,
52 new_owner_name: Ident,
53 stmt_type: StatementType,
54) -> Result<RwPgResponse> {
55 let session = handler_args.session;
56 let db_name = &session.database();
57 let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
58 let search_path = session.config().search_path();
59 let user_name = &session.user_name();
60 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
61
62 let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
63 let (object, owner_id) = {
64 let catalog_reader = session.env().catalog_reader().read_guard();
65 let user_reader = session.env().user_info_reader().read_guard();
66 let new_owner = user_reader
67 .get_user_by_name(&new_owner_name)
68 .ok_or(CatalogError::NotFound("user", new_owner_name))?;
69
70 let check_owned_by_admin = |owner: &UserId| -> Result<()> {
71 let user_catalog = user_reader.get_user_by_id(owner).unwrap();
72 if user_catalog.is_admin {
73 return Err(PermissionDenied(format!(
74 "Cannot change owner of {} owned by admin user {}",
75 obj_name.real_value(),
76 user_catalog.name
77 ))
78 .into());
79 }
80 Ok(())
81 };
82
83 let owner_id = new_owner.id;
84 (
85 match stmt_type {
86 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
87 let (table, schema_name) = catalog_reader.get_created_table_by_name(
88 db_name,
89 schema_path,
90 &real_obj_name,
91 )?;
92 session.check_privilege_for_drop_alter(schema_name, &**table)?;
93 let schema_id = catalog_reader
94 .get_schema_by_name(db_name, schema_name)?
95 .id();
96 check_schema_create_privilege(&session, new_owner, schema_id)?;
97 if table.owner() == owner_id {
98 return Ok(RwPgResponse::empty_result(stmt_type));
99 }
100 check_owned_by_admin(&table.owner)?;
101 table.id.into()
102 }
103 StatementType::ALTER_VIEW => {
104 let (view, schema_name) =
105 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
106 session.check_privilege_for_drop_alter(schema_name, &**view)?;
107 let schema_id = catalog_reader
108 .get_schema_by_name(db_name, schema_name)?
109 .id();
110 check_schema_create_privilege(&session, new_owner, schema_id)?;
111 if view.owner() == owner_id {
112 return Ok(RwPgResponse::empty_result(stmt_type));
113 }
114 check_owned_by_admin(&view.owner)?;
115 view.id.into()
116 }
117 StatementType::ALTER_SOURCE => {
118 let (source, schema_name) =
119 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
120 session.check_privilege_for_drop_alter(schema_name, &**source)?;
121 let schema_id = catalog_reader
122 .get_schema_by_name(db_name, schema_name)?
123 .id();
124 check_schema_create_privilege(&session, new_owner, schema_id)?;
125 if source.owner() == owner_id {
126 return Ok(RwPgResponse::empty_result(stmt_type));
127 }
128 check_owned_by_admin(&source.owner())?;
129 source.id.into()
130 }
131 StatementType::ALTER_SINK => {
132 let (sink, schema_name) = catalog_reader.get_created_sink_by_name(
133 db_name,
134 schema_path,
135 &real_obj_name,
136 )?;
137 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
138 let schema_id = catalog_reader
139 .get_schema_by_name(db_name, schema_name)?
140 .id();
141 check_schema_create_privilege(&session, new_owner, schema_id)?;
142 if sink.owner() == owner_id {
143 return Ok(RwPgResponse::empty_result(stmt_type));
144 }
145 check_owned_by_admin(&sink.owner())?;
146 sink.id.into()
147 }
148 StatementType::ALTER_SUBSCRIPTION => {
149 let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
150 db_name,
151 schema_path,
152 &real_obj_name,
153 )?;
154 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
155 let schema_id = catalog_reader
156 .get_schema_by_name(db_name, schema_name)?
157 .id();
158 check_schema_create_privilege(&session, new_owner, schema_id)?;
159 if subscription.owner() == owner_id {
160 return Ok(RwPgResponse::empty_result(stmt_type));
161 }
162 check_owned_by_admin(&subscription.owner())?;
163 subscription.id.into()
164 }
165 StatementType::ALTER_DATABASE => {
166 let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
167 session.check_privilege_for_drop_alter_db_schema(database)?;
168 if database.owner() == owner_id {
169 return Ok(RwPgResponse::empty_result(stmt_type));
170 }
171 check_owned_by_admin(&database.owner)?;
172 database.id().into()
173 }
174 StatementType::ALTER_SCHEMA => {
175 let schema =
176 catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
177 session.check_privilege_for_drop_alter_db_schema(schema)?;
178 if schema.owner() == owner_id {
179 return Ok(RwPgResponse::empty_result(stmt_type));
180 }
181 check_owned_by_admin(&schema.owner)?;
182 schema.id().into()
183 }
184 StatementType::ALTER_CONNECTION => {
185 let (connection, schema_name) = catalog_reader.get_connection_by_name(
186 db_name,
187 schema_path,
188 &real_obj_name,
189 )?;
190 session.check_privilege_for_drop_alter(schema_name, &**connection)?;
191 if connection.owner() == owner_id {
192 return Ok(RwPgResponse::empty_result(stmt_type));
193 }
194 check_owned_by_admin(&connection.owner)?;
195 connection.id.into()
196 }
197 _ => unreachable!(),
198 },
199 owner_id,
200 )
201 };
202
203 let catalog_writer = session.catalog_writer()?;
204 catalog_writer.alter_owner(object, owner_id).await?;
205
206 Ok(RwPgResponse::empty_result(stmt_type))
207}