risingwave_frontend/handler/
alter_rename.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25
26pub async fn handle_rename_table(
27 handler_args: HandlerArgs,
28 table_type: TableType,
29 table_name: ObjectName,
30 new_table_name: ObjectName,
31) -> Result<RwPgResponse> {
32 let session = handler_args.session;
33 let db_name = &session.database();
34 let (schema_name, real_table_name) =
35 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
36 let new_table_name = Binder::resolve_table_name(new_table_name)?;
37 let search_path = session.config().search_path();
38 let user_name = &session.user_name();
39
40 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
41
42 let table_id = {
43 let reader = session.env().catalog_reader().read_guard();
44 let (table, schema_name) =
45 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
46 if table_type != table.table_type {
47 return Err(ErrorCode::InvalidInputSyntax(format!(
48 "\"{table_name}\" is not a {}",
49 table_type.to_prost().as_str_name()
50 ))
51 .into());
52 }
53
54 session.check_privilege_for_drop_alter(schema_name, &**table)?;
55 table.id
56 };
57
58 let catalog_writer = session.catalog_writer()?;
59 catalog_writer
60 .alter_name(table_id.into(), &new_table_name)
61 .await?;
62
63 let stmt_type = match table_type {
64 TableType::Table => StatementType::ALTER_TABLE,
65 TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
66 _ => unreachable!(),
67 };
68 Ok(PgResponse::empty_result(stmt_type))
69}
70
71pub async fn handle_rename_index(
72 handler_args: HandlerArgs,
73 index_name: ObjectName,
74 new_index_name: ObjectName,
75) -> Result<RwPgResponse> {
76 let session = handler_args.session;
77 let db_name = &session.database();
78 let (schema_name, real_index_name) =
79 Binder::resolve_schema_qualified_name(db_name, &index_name)?;
80 let new_index_name = Binder::resolve_index_name(new_index_name)?;
81 let search_path = session.config().search_path();
82 let user_name = &session.user_name();
83
84 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
85
86 let index_id = {
87 let reader = session.env().catalog_reader().read_guard();
88 let (index, schema_name) =
89 reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
90 session.check_privilege_for_drop_alter(schema_name, &**index)?;
91 index.id
92 };
93
94 let catalog_writer = session.catalog_writer()?;
95 catalog_writer
96 .alter_name(index_id.into(), &new_index_name)
97 .await?;
98
99 Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
100}
101
102pub async fn handle_rename_view(
103 handler_args: HandlerArgs,
104 view_name: ObjectName,
105 new_view_name: ObjectName,
106) -> Result<RwPgResponse> {
107 let session = handler_args.session;
108 let db_name = &session.database();
109 let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, &view_name)?;
110 let new_view_name = Binder::resolve_view_name(new_view_name)?;
111 let search_path = session.config().search_path();
112 let user_name = &session.user_name();
113
114 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
115
116 let view_id = {
117 let reader = session.env().catalog_reader().read_guard();
118 let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
119 session.check_privilege_for_drop_alter(schema_name, &**view)?;
120 view.id
121 };
122
123 let catalog_writer = session.catalog_writer()?;
124 catalog_writer
125 .alter_name(view_id.into(), &new_view_name)
126 .await?;
127
128 Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
129}
130
131pub async fn handle_rename_sink(
132 handler_args: HandlerArgs,
133 sink_name: ObjectName,
134 new_sink_name: ObjectName,
135) -> Result<RwPgResponse> {
136 let session = handler_args.session;
137 let db_name = &session.database();
138 let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
139 let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
140 let search_path = session.config().search_path();
141 let user_name = &session.user_name();
142
143 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
144
145 let sink_id = {
146 let reader = session.env().catalog_reader().read_guard();
147 let (sink, schema_name) =
148 reader.get_created_sink_by_name(db_name, schema_path, &real_sink_name)?;
149 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
150 sink.id
151 };
152
153 let catalog_writer = session.catalog_writer()?;
154 catalog_writer
155 .alter_name(sink_id.into(), &new_sink_name)
156 .await?;
157
158 Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
159}
160
161pub async fn handle_rename_subscription(
162 handler_args: HandlerArgs,
163 subscription_name: ObjectName,
164 new_subscription_name: ObjectName,
165) -> Result<RwPgResponse> {
166 let session = handler_args.session;
167 let db_name = &session.database();
168 let (schema_name, real_subscription_name) =
169 Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
170 let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
171 let search_path = session.config().search_path();
172 let user_name = &session.user_name();
173
174 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
175
176 let subscription_id = {
177 let reader = session.env().catalog_reader().read_guard();
178 let (subscription, schema_name) =
179 reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
180 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
181 subscription.id
182 };
183
184 let catalog_writer = session.catalog_writer()?;
185 catalog_writer
186 .alter_name(subscription_id.into(), &new_subscription_name)
187 .await?;
188
189 Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
190}
191
192pub async fn handle_rename_source(
193 handler_args: HandlerArgs,
194 source_name: ObjectName,
195 new_source_name: ObjectName,
196) -> Result<RwPgResponse> {
197 let session = handler_args.session;
198 let db_name = &session.database();
199 let (schema_name, real_source_name) =
200 Binder::resolve_schema_qualified_name(db_name, &source_name)?;
201 let new_source_name = Binder::resolve_source_name(new_source_name)?;
202 let search_path = session.config().search_path();
203 let user_name = &session.user_name();
204
205 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
206
207 let source_id = {
208 let reader = session.env().catalog_reader().read_guard();
209 let (source, schema_name) =
210 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
211
212 if source.associated_table_id.is_some() {
214 return Err(ErrorCode::InvalidInputSyntax(
215 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
216 )
217 .into());
218 }
219
220 session.check_privilege_for_drop_alter(schema_name, &**source)?;
221 source.id
222 };
223
224 let catalog_writer = session.catalog_writer()?;
225 catalog_writer
226 .alter_name(source_id.into(), &new_source_name)
227 .await?;
228
229 Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
230}
231
232pub async fn handle_rename_schema(
233 handler_args: HandlerArgs,
234 schema_name: ObjectName,
235 new_schema_name: ObjectName,
236) -> Result<RwPgResponse> {
237 let session = handler_args.session;
238 let db_name = &session.database();
239 let schema_name = Binder::resolve_schema_name(schema_name)?;
240 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
241
242 let schema_id = {
243 let user_reader = session.env().user_info_reader().read_guard();
244 let catalog_reader = session.env().catalog_reader().read_guard();
245 let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
246 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
247
248 if is_system_schema(&schema.name()) {
250 return Err(ErrorCode::ProtocolError(format!(
251 "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
252 schema_name
253 )).into());
254 }
255
256 session.check_privilege_for_drop_alter_db_schema(schema)?;
258
259 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
261 if !user.is_super && !user.has_privilege(db_id, AclMode::Create) {
262 return Err(ErrorCode::PermissionDenied(
263 "Do not have CREATE privilege on the current database".to_owned(),
264 )
265 .into());
266 }
267 } else {
268 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
269 }
270
271 schema.id()
272 };
273
274 let catalog_writer = session.catalog_writer()?;
275 catalog_writer
276 .alter_name(schema_id.into(), &new_schema_name)
277 .await?;
278
279 Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
280}
281
282pub async fn handle_rename_database(
283 handler_args: HandlerArgs,
284 database_name: ObjectName,
285 new_database_name: ObjectName,
286) -> Result<RwPgResponse> {
287 let session = handler_args.session;
288 let database_name = Binder::resolve_database_name(database_name)?;
289 let new_database_name = Binder::resolve_database_name(new_database_name)?;
290
291 let database_id = {
292 let user_reader = session.env().user_info_reader().read_guard();
293 let catalog_reader = session.env().catalog_reader().read_guard();
294 let database = catalog_reader.get_database_by_name(&database_name)?;
295
296 session.check_privilege_for_drop_alter_db_schema(database)?;
298
299 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
301 if !user.is_super && !user.can_create_db {
302 return Err(ErrorCode::PermissionDenied(
303 "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
304 )
305 .into());
306 }
307 } else {
308 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
309 }
310
311 if database_name == session.database() {
313 return Err(ErrorCode::PermissionDenied(
314 "Current database cannot be renamed".to_owned(),
315 )
316 .into());
317 }
318
319 database.id()
320 };
321
322 let catalog_writer = session.catalog_writer()?;
323 catalog_writer
324 .alter_name(database_id.into(), &new_database_name)
325 .await?;
326
327 Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
328}
329
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Creates table `t`, renames it to `t1` through SQL, and checks that the catalog entry
    /// with the original table id now reports the new name.
    #[tokio::test]
    async fn test_alter_table_name_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Capture the id before the rename; the read guard is dropped at the end of this
        // block, before the next statement is run.
        let original_id = {
            let catalog = session.env().catalog_reader().read_guard();
            let (table, _schema) = catalog
                .get_created_table_by_name(
                    DEFAULT_DATABASE_NAME,
                    SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                    "t",
                )
                .unwrap();
            table.id
        };

        frontend
            .run_sql("alter table t rename to t1;")
            .await
            .unwrap();

        // Look the table up by id so the check does not depend on the new name resolving.
        let catalog = session.env().catalog_reader().read_guard();
        let renamed = catalog.get_any_table_by_id(original_id).unwrap();
        let current_name = renamed.name().to_owned();
        assert_eq!(current_name, "t1");
    }
}