risingwave_frontend/handler/
alter_owner.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::acl::AclMode;
19use risingwave_pb::ddl_service::alter_owner_request::Object;
20use risingwave_pb::user::grant_privilege;
21use risingwave_sqlparser::ast::{Ident, ObjectName};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::Binder;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::catalog::{CatalogError, OwnedByUserCatalog};
27use crate::error::ErrorCode::PermissionDenied;
28use crate::error::Result;
29use crate::session::SessionImpl;
30use crate::user::user_catalog::UserCatalog;
31
32pub fn check_schema_create_privilege(
33 session: &Arc<SessionImpl>,
34 new_owner: &UserCatalog,
35 schema_id: u32,
36) -> Result<()> {
37 if session.is_super_user() {
38 return Ok(());
39 }
40 if !new_owner.is_super
41 && !new_owner.has_privilege(
42 &grant_privilege::Object::SchemaId(schema_id),
43 AclMode::Create,
44 )
45 {
46 return Err(PermissionDenied(
47 "Require new owner to have create privilege on the object.".to_owned(),
48 )
49 .into());
50 }
51 Ok(())
52}
53
54pub async fn handle_alter_owner(
55 handler_args: HandlerArgs,
56 obj_name: ObjectName,
57 new_owner_name: Ident,
58 stmt_type: StatementType,
59) -> Result<RwPgResponse> {
60 let session = handler_args.session;
61 let db_name = &session.database();
62 let (schema_name, real_obj_name) =
63 Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
64 let search_path = session.config().search_path();
65 let user_name = &session.user_name();
66 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
67
68 let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
69 let (object, owner_id) = {
70 let catalog_reader = session.env().catalog_reader().read_guard();
71 let user_reader = session.env().user_info_reader().read_guard();
72 let new_owner = user_reader
73 .get_user_by_name(&new_owner_name)
74 .ok_or(CatalogError::NotFound("user", new_owner_name))?;
75 let owner_id = new_owner.id;
76 (
77 match stmt_type {
78 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
79 let (table, schema_name) = catalog_reader.get_created_table_by_name(
80 db_name,
81 schema_path,
82 &real_obj_name,
83 )?;
84 session.check_privilege_for_drop_alter(schema_name, &**table)?;
85 let schema_id = catalog_reader
86 .get_schema_by_name(db_name, schema_name)?
87 .id();
88 check_schema_create_privilege(&session, new_owner, schema_id)?;
89 if table.owner() == owner_id {
90 return Ok(RwPgResponse::empty_result(stmt_type));
91 }
92 Object::TableId(table.id.table_id)
93 }
94 StatementType::ALTER_VIEW => {
95 let (view, schema_name) =
96 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
97 session.check_privilege_for_drop_alter(schema_name, &**view)?;
98 let schema_id = catalog_reader
99 .get_schema_by_name(db_name, schema_name)?
100 .id();
101 check_schema_create_privilege(&session, new_owner, schema_id)?;
102 if view.owner() == owner_id {
103 return Ok(RwPgResponse::empty_result(stmt_type));
104 }
105 Object::ViewId(view.id)
106 }
107 StatementType::ALTER_SOURCE => {
108 let (source, schema_name) =
109 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
110 session.check_privilege_for_drop_alter(schema_name, &**source)?;
111 let schema_id = catalog_reader
112 .get_schema_by_name(db_name, schema_name)?
113 .id();
114 check_schema_create_privilege(&session, new_owner, schema_id)?;
115 if source.owner() == owner_id {
116 return Ok(RwPgResponse::empty_result(stmt_type));
117 }
118 Object::SourceId(source.id)
119 }
120 StatementType::ALTER_SINK => {
121 let (sink, schema_name) =
122 catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
123 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
124 let schema_id = catalog_reader
125 .get_schema_by_name(db_name, schema_name)?
126 .id();
127 check_schema_create_privilege(&session, new_owner, schema_id)?;
128 if sink.owner() == owner_id {
129 return Ok(RwPgResponse::empty_result(stmt_type));
130 }
131 Object::SinkId(sink.id.sink_id)
132 }
133 StatementType::ALTER_SUBSCRIPTION => {
134 let (subscription, schema_name) = catalog_reader.get_subscription_by_name(
135 db_name,
136 schema_path,
137 &real_obj_name,
138 )?;
139 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
140 let schema_id = catalog_reader
141 .get_schema_by_name(db_name, schema_name)?
142 .id();
143 check_schema_create_privilege(&session, new_owner, schema_id)?;
144 if subscription.owner() == owner_id {
145 return Ok(RwPgResponse::empty_result(stmt_type));
146 }
147 Object::SubscriptionId(subscription.id.subscription_id)
148 }
149 StatementType::ALTER_DATABASE => {
150 let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
151 session.check_privilege_for_drop_alter_db_schema(database)?;
152 if database.owner() == owner_id {
153 return Ok(RwPgResponse::empty_result(stmt_type));
154 }
155 Object::DatabaseId(database.id())
156 }
157 StatementType::ALTER_SCHEMA => {
158 let schema =
159 catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
160 session.check_privilege_for_drop_alter_db_schema(schema)?;
161 if schema.owner() == owner_id {
162 return Ok(RwPgResponse::empty_result(stmt_type));
163 }
164 Object::SchemaId(schema.id())
165 }
166 StatementType::ALTER_CONNECTION => {
167 let (connection, schema_name) = catalog_reader.get_connection_by_name(
168 db_name,
169 schema_path,
170 &real_obj_name,
171 )?;
172 session.check_privilege_for_drop_alter(schema_name, &**connection)?;
173 if connection.owner() == owner_id {
174 return Ok(RwPgResponse::empty_result(stmt_type));
175 }
176 Object::ConnectionId(connection.id)
177 }
178 _ => unreachable!(),
179 },
180 owner_id,
181 )
182 };
183
184 let catalog_writer = session.catalog_writer()?;
185 catalog_writer.alter_owner(object, owner_id).await?;
186
187 Ok(RwPgResponse::empty_result(stmt_type))
188}