risingwave_frontend/handler/
alter_rename.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_pb::ddl_service::alter_name_request;
19use risingwave_pb::user::grant_privilege;
20use risingwave_sqlparser::ast::ObjectName;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::Binder;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::{ErrorCode, Result};
27
28pub async fn handle_rename_table(
29 handler_args: HandlerArgs,
30 table_type: TableType,
31 table_name: ObjectName,
32 new_table_name: ObjectName,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let db_name = &session.database();
36 let (schema_name, real_table_name) =
37 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
38 let new_table_name = Binder::resolve_table_name(new_table_name)?;
39 let search_path = session.config().search_path();
40 let user_name = &session.user_name();
41
42 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
43
44 let table_id = {
45 let reader = session.env().catalog_reader().read_guard();
46 let (table, schema_name) =
47 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
48 if table_type != table.table_type {
49 return Err(ErrorCode::InvalidInputSyntax(format!(
50 "\"{table_name}\" is not a {}",
51 table_type.to_prost().as_str_name()
52 ))
53 .into());
54 }
55
56 session.check_privilege_for_drop_alter(schema_name, &**table)?;
57 table.id
58 };
59
60 let catalog_writer = session.catalog_writer()?;
61 catalog_writer
62 .alter_name(
63 alter_name_request::Object::TableId(table_id.table_id),
64 &new_table_name,
65 )
66 .await?;
67
68 let stmt_type = match table_type {
69 TableType::Table => StatementType::ALTER_TABLE,
70 TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
71 _ => unreachable!(),
72 };
73 Ok(PgResponse::empty_result(stmt_type))
74}
75
76pub async fn handle_rename_index(
77 handler_args: HandlerArgs,
78 index_name: ObjectName,
79 new_index_name: ObjectName,
80) -> Result<RwPgResponse> {
81 let session = handler_args.session;
82 let db_name = &session.database();
83 let (schema_name, real_index_name) =
84 Binder::resolve_schema_qualified_name(db_name, index_name.clone())?;
85 let new_index_name = Binder::resolve_index_name(new_index_name)?;
86 let search_path = session.config().search_path();
87 let user_name = &session.user_name();
88
89 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
90
91 let index_id = {
92 let reader = session.env().catalog_reader().read_guard();
93 let (index, schema_name) =
94 reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
95 session.check_privilege_for_drop_alter(schema_name, &**index)?;
96 index.id
97 };
98
99 let catalog_writer = session.catalog_writer()?;
100 catalog_writer
101 .alter_name(
102 alter_name_request::Object::IndexId(index_id.index_id),
103 &new_index_name,
104 )
105 .await?;
106
107 Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
108}
109
110pub async fn handle_rename_view(
111 handler_args: HandlerArgs,
112 view_name: ObjectName,
113 new_view_name: ObjectName,
114) -> Result<RwPgResponse> {
115 let session = handler_args.session;
116 let db_name = &session.database();
117 let (schema_name, real_view_name) =
118 Binder::resolve_schema_qualified_name(db_name, view_name.clone())?;
119 let new_view_name = Binder::resolve_view_name(new_view_name)?;
120 let search_path = session.config().search_path();
121 let user_name = &session.user_name();
122
123 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
124
125 let view_id = {
126 let reader = session.env().catalog_reader().read_guard();
127 let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
128 session.check_privilege_for_drop_alter(schema_name, &**view)?;
129 view.id
130 };
131
132 let catalog_writer = session.catalog_writer()?;
133 catalog_writer
134 .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
135 .await?;
136
137 Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
138}
139
140pub async fn handle_rename_sink(
141 handler_args: HandlerArgs,
142 sink_name: ObjectName,
143 new_sink_name: ObjectName,
144) -> Result<RwPgResponse> {
145 let session = handler_args.session;
146 let db_name = &session.database();
147 let (schema_name, real_sink_name) =
148 Binder::resolve_schema_qualified_name(db_name, sink_name.clone())?;
149 let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
150 let search_path = session.config().search_path();
151 let user_name = &session.user_name();
152
153 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
154
155 let sink_id = {
156 let reader = session.env().catalog_reader().read_guard();
157 let (sink, schema_name) = reader.get_sink_by_name(db_name, schema_path, &real_sink_name)?;
158 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
159 sink.id
160 };
161
162 let catalog_writer = session.catalog_writer()?;
163 catalog_writer
164 .alter_name(
165 alter_name_request::Object::SinkId(sink_id.sink_id),
166 &new_sink_name,
167 )
168 .await?;
169
170 Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
171}
172
173pub async fn handle_rename_subscription(
174 handler_args: HandlerArgs,
175 subscription_name: ObjectName,
176 new_subscription_name: ObjectName,
177) -> Result<RwPgResponse> {
178 let session = handler_args.session;
179 let db_name = &session.database();
180 let (schema_name, real_subscription_name) =
181 Binder::resolve_schema_qualified_name(db_name, subscription_name.clone())?;
182 let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
183 let search_path = session.config().search_path();
184 let user_name = &session.user_name();
185
186 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
187
188 let subscription_id = {
189 let reader = session.env().catalog_reader().read_guard();
190 let (subscription, schema_name) =
191 reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
192 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
193 subscription.id
194 };
195
196 let catalog_writer = session.catalog_writer()?;
197 catalog_writer
198 .alter_name(
199 alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
200 &new_subscription_name,
201 )
202 .await?;
203
204 Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
205}
206
207pub async fn handle_rename_source(
208 handler_args: HandlerArgs,
209 source_name: ObjectName,
210 new_source_name: ObjectName,
211) -> Result<RwPgResponse> {
212 let session = handler_args.session;
213 let db_name = &session.database();
214 let (schema_name, real_source_name) =
215 Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
216 let new_source_name = Binder::resolve_source_name(new_source_name)?;
217 let search_path = session.config().search_path();
218 let user_name = &session.user_name();
219
220 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
221
222 let source_id = {
223 let reader = session.env().catalog_reader().read_guard();
224 let (source, schema_name) =
225 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
226
227 if source.associated_table_id.is_some() {
229 return Err(ErrorCode::InvalidInputSyntax(
230 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
231 )
232 .into());
233 }
234
235 session.check_privilege_for_drop_alter(schema_name, &**source)?;
236 source.id
237 };
238
239 let catalog_writer = session.catalog_writer()?;
240 catalog_writer
241 .alter_name(
242 alter_name_request::Object::SourceId(source_id),
243 &new_source_name,
244 )
245 .await?;
246
247 Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
248}
249
250pub async fn handle_rename_schema(
251 handler_args: HandlerArgs,
252 schema_name: ObjectName,
253 new_schema_name: ObjectName,
254) -> Result<RwPgResponse> {
255 let session = handler_args.session;
256 let db_name = &session.database();
257 let schema_name = Binder::resolve_schema_name(schema_name)?;
258 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
259
260 let schema_id = {
261 let user_reader = session.env().user_info_reader().read_guard();
262 let catalog_reader = session.env().catalog_reader().read_guard();
263 let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
264 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
265
266 if is_system_schema(&schema.name()) {
268 return Err(ErrorCode::ProtocolError(format!(
269 "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
270 schema_name
271 )).into());
272 }
273
274 session.check_privilege_for_drop_alter_db_schema(schema)?;
276
277 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
279 if !user.is_super
280 && !user.has_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create)
281 {
282 return Err(ErrorCode::PermissionDenied(
283 "Do not have create privilege on the current database".to_owned(),
284 )
285 .into());
286 }
287 } else {
288 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
289 }
290
291 schema.id()
292 };
293
294 let catalog_writer = session.catalog_writer()?;
295 catalog_writer
296 .alter_name(
297 alter_name_request::Object::SchemaId(schema_id),
298 &new_schema_name,
299 )
300 .await?;
301
302 Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
303}
304
305pub async fn handle_rename_database(
306 handler_args: HandlerArgs,
307 database_name: ObjectName,
308 new_database_name: ObjectName,
309) -> Result<RwPgResponse> {
310 let session = handler_args.session;
311 let database_name = Binder::resolve_database_name(database_name)?;
312 let new_database_name = Binder::resolve_database_name(new_database_name)?;
313
314 let database_id = {
315 let user_reader = session.env().user_info_reader().read_guard();
316 let catalog_reader = session.env().catalog_reader().read_guard();
317 let database = catalog_reader.get_database_by_name(&database_name)?;
318
319 session.check_privilege_for_drop_alter_db_schema(database)?;
321
322 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
324 if !user.is_super && !user.can_create_db {
325 return Err(ErrorCode::PermissionDenied(
326 "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
327 )
328 .into());
329 }
330 } else {
331 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
332 }
333
334 if database_name == session.database() {
336 return Err(ErrorCode::PermissionDenied(
337 "Current database cannot be renamed".to_owned(),
338 )
339 .into());
340 }
341
342 database.id()
343 };
344
345 let catalog_writer = session.catalog_writer()?;
346 catalog_writer
347 .alter_name(
348 alter_name_request::Object::DatabaseId(database_id),
349 &new_database_name,
350 )
351 .await?;
352
353 Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
354}
355
356#[cfg(test)]
357mod tests {
358 use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
359
360 use crate::catalog::root_catalog::SchemaPath;
361 use crate::test_utils::LocalFrontend;
362
363 #[tokio::test]
364 async fn test_alter_table_name_handler() {
365 let frontend = LocalFrontend::new(Default::default()).await;
366 let session = frontend.session_ref();
367 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
368
369 let sql = "create table t (i int, r real);";
370 frontend.run_sql(sql).await.unwrap();
371
372 let table_id = {
373 let catalog_reader = session.env().catalog_reader().read_guard();
374 catalog_reader
375 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
376 .unwrap()
377 .0
378 .id
379 };
380
381 let sql = "alter table t rename to t1;";
383 frontend.run_sql(sql).await.unwrap();
384
385 let catalog_reader = session.env().catalog_reader().read_guard();
386 let altered_table_name = catalog_reader
387 .get_any_table_by_id(&table_id)
388 .unwrap()
389 .name()
390 .to_owned();
391 assert_eq!(altered_table_name, "t1");
392 }
393}