risingwave_frontend/handler/
alter_rename.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_pb::ddl_service::alter_name_request;
19use risingwave_pb::user::grant_privilege;
20use risingwave_sqlparser::ast::ObjectName;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::Binder;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::{ErrorCode, Result};
27
28pub async fn handle_rename_table(
29 handler_args: HandlerArgs,
30 table_type: TableType,
31 table_name: ObjectName,
32 new_table_name: ObjectName,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let db_name = &session.database();
36 let (schema_name, real_table_name) =
37 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
38 let new_table_name = Binder::resolve_table_name(new_table_name)?;
39 let search_path = session.config().search_path();
40 let user_name = &session.user_name();
41
42 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
43
44 let table_id = {
45 let reader = session.env().catalog_reader().read_guard();
46 let (table, schema_name) =
47 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
48 if table_type != table.table_type {
49 return Err(ErrorCode::InvalidInputSyntax(format!(
50 "\"{table_name}\" is not a {}",
51 table_type.to_prost().as_str_name()
52 ))
53 .into());
54 }
55
56 session.check_privilege_for_drop_alter(schema_name, &**table)?;
57 table.id
58 };
59
60 let catalog_writer = session.catalog_writer()?;
61 catalog_writer
62 .alter_name(
63 alter_name_request::Object::TableId(table_id.table_id),
64 &new_table_name,
65 )
66 .await?;
67
68 let stmt_type = match table_type {
69 TableType::Table => StatementType::ALTER_TABLE,
70 TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
71 _ => unreachable!(),
72 };
73 Ok(PgResponse::empty_result(stmt_type))
74}
75
76pub async fn handle_rename_index(
77 handler_args: HandlerArgs,
78 index_name: ObjectName,
79 new_index_name: ObjectName,
80) -> Result<RwPgResponse> {
81 let session = handler_args.session;
82 let db_name = &session.database();
83 let (schema_name, real_index_name) =
84 Binder::resolve_schema_qualified_name(db_name, &index_name)?;
85 let new_index_name = Binder::resolve_index_name(new_index_name)?;
86 let search_path = session.config().search_path();
87 let user_name = &session.user_name();
88
89 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
90
91 let index_id = {
92 let reader = session.env().catalog_reader().read_guard();
93 let (index, schema_name) =
94 reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
95 session.check_privilege_for_drop_alter(schema_name, &**index)?;
96 index.id
97 };
98
99 let catalog_writer = session.catalog_writer()?;
100 catalog_writer
101 .alter_name(
102 alter_name_request::Object::IndexId(index_id.index_id),
103 &new_index_name,
104 )
105 .await?;
106
107 Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
108}
109
110pub async fn handle_rename_view(
111 handler_args: HandlerArgs,
112 view_name: ObjectName,
113 new_view_name: ObjectName,
114) -> Result<RwPgResponse> {
115 let session = handler_args.session;
116 let db_name = &session.database();
117 let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, &view_name)?;
118 let new_view_name = Binder::resolve_view_name(new_view_name)?;
119 let search_path = session.config().search_path();
120 let user_name = &session.user_name();
121
122 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
123
124 let view_id = {
125 let reader = session.env().catalog_reader().read_guard();
126 let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
127 session.check_privilege_for_drop_alter(schema_name, &**view)?;
128 view.id
129 };
130
131 let catalog_writer = session.catalog_writer()?;
132 catalog_writer
133 .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
134 .await?;
135
136 Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
137}
138
139pub async fn handle_rename_sink(
140 handler_args: HandlerArgs,
141 sink_name: ObjectName,
142 new_sink_name: ObjectName,
143) -> Result<RwPgResponse> {
144 let session = handler_args.session;
145 let db_name = &session.database();
146 let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
147 let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
148 let search_path = session.config().search_path();
149 let user_name = &session.user_name();
150
151 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
152
153 let sink_id = {
154 let reader = session.env().catalog_reader().read_guard();
155 let (sink, schema_name) =
156 reader.get_created_sink_by_name(db_name, schema_path, &real_sink_name)?;
157 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
158 sink.id
159 };
160
161 let catalog_writer = session.catalog_writer()?;
162 catalog_writer
163 .alter_name(
164 alter_name_request::Object::SinkId(sink_id.sink_id),
165 &new_sink_name,
166 )
167 .await?;
168
169 Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
170}
171
172pub async fn handle_rename_subscription(
173 handler_args: HandlerArgs,
174 subscription_name: ObjectName,
175 new_subscription_name: ObjectName,
176) -> Result<RwPgResponse> {
177 let session = handler_args.session;
178 let db_name = &session.database();
179 let (schema_name, real_subscription_name) =
180 Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
181 let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
182 let search_path = session.config().search_path();
183 let user_name = &session.user_name();
184
185 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
186
187 let subscription_id = {
188 let reader = session.env().catalog_reader().read_guard();
189 let (subscription, schema_name) =
190 reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
191 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
192 subscription.id
193 };
194
195 let catalog_writer = session.catalog_writer()?;
196 catalog_writer
197 .alter_name(
198 alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
199 &new_subscription_name,
200 )
201 .await?;
202
203 Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
204}
205
206pub async fn handle_rename_source(
207 handler_args: HandlerArgs,
208 source_name: ObjectName,
209 new_source_name: ObjectName,
210) -> Result<RwPgResponse> {
211 let session = handler_args.session;
212 let db_name = &session.database();
213 let (schema_name, real_source_name) =
214 Binder::resolve_schema_qualified_name(db_name, &source_name)?;
215 let new_source_name = Binder::resolve_source_name(new_source_name)?;
216 let search_path = session.config().search_path();
217 let user_name = &session.user_name();
218
219 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
220
221 let source_id = {
222 let reader = session.env().catalog_reader().read_guard();
223 let (source, schema_name) =
224 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
225
226 if source.associated_table_id.is_some() {
228 return Err(ErrorCode::InvalidInputSyntax(
229 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
230 )
231 .into());
232 }
233
234 session.check_privilege_for_drop_alter(schema_name, &**source)?;
235 source.id
236 };
237
238 let catalog_writer = session.catalog_writer()?;
239 catalog_writer
240 .alter_name(
241 alter_name_request::Object::SourceId(source_id),
242 &new_source_name,
243 )
244 .await?;
245
246 Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
247}
248
249pub async fn handle_rename_schema(
250 handler_args: HandlerArgs,
251 schema_name: ObjectName,
252 new_schema_name: ObjectName,
253) -> Result<RwPgResponse> {
254 let session = handler_args.session;
255 let db_name = &session.database();
256 let schema_name = Binder::resolve_schema_name(schema_name)?;
257 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
258
259 let schema_id = {
260 let user_reader = session.env().user_info_reader().read_guard();
261 let catalog_reader = session.env().catalog_reader().read_guard();
262 let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
263 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
264
265 if is_system_schema(&schema.name()) {
267 return Err(ErrorCode::ProtocolError(format!(
268 "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
269 schema_name
270 )).into());
271 }
272
273 session.check_privilege_for_drop_alter_db_schema(schema)?;
275
276 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
278 if !user.is_super
279 && !user.has_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create)
280 {
281 return Err(ErrorCode::PermissionDenied(
282 "Do not have CREATE privilege on the current database".to_owned(),
283 )
284 .into());
285 }
286 } else {
287 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
288 }
289
290 schema.id()
291 };
292
293 let catalog_writer = session.catalog_writer()?;
294 catalog_writer
295 .alter_name(
296 alter_name_request::Object::SchemaId(schema_id),
297 &new_schema_name,
298 )
299 .await?;
300
301 Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
302}
303
304pub async fn handle_rename_database(
305 handler_args: HandlerArgs,
306 database_name: ObjectName,
307 new_database_name: ObjectName,
308) -> Result<RwPgResponse> {
309 let session = handler_args.session;
310 let database_name = Binder::resolve_database_name(database_name)?;
311 let new_database_name = Binder::resolve_database_name(new_database_name)?;
312
313 let database_id = {
314 let user_reader = session.env().user_info_reader().read_guard();
315 let catalog_reader = session.env().catalog_reader().read_guard();
316 let database = catalog_reader.get_database_by_name(&database_name)?;
317
318 session.check_privilege_for_drop_alter_db_schema(database)?;
320
321 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
323 if !user.is_super && !user.can_create_db {
324 return Err(ErrorCode::PermissionDenied(
325 "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
326 )
327 .into());
328 }
329 } else {
330 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
331 }
332
333 if database_name == session.database() {
335 return Err(ErrorCode::PermissionDenied(
336 "Current database cannot be renamed".to_owned(),
337 )
338 .into());
339 }
340
341 database.id()
342 };
343
344 let catalog_writer = session.catalog_writer()?;
345 catalog_writer
346 .alter_name(
347 alter_name_request::Object::DatabaseId(database_id),
348 &new_database_name,
349 )
350 .await?;
351
352 Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
353}
354
355#[cfg(test)]
356mod tests {
357 use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
358
359 use crate::catalog::root_catalog::SchemaPath;
360 use crate::test_utils::LocalFrontend;
361
362 #[tokio::test]
363 async fn test_alter_table_name_handler() {
364 let frontend = LocalFrontend::new(Default::default()).await;
365 let session = frontend.session_ref();
366 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
367
368 let sql = "create table t (i int, r real);";
369 frontend.run_sql(sql).await.unwrap();
370
371 let table_id = {
372 let catalog_reader = session.env().catalog_reader().read_guard();
373 catalog_reader
374 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
375 .unwrap()
376 .0
377 .id
378 };
379
380 let sql = "alter table t rename to t1;";
382 frontend.run_sql(sql).await.unwrap();
383
384 let catalog_reader = session.env().catalog_reader().read_guard();
385 let altered_table_name = catalog_reader
386 .get_any_table_by_id(&table_id)
387 .unwrap()
388 .name()
389 .to_owned();
390 assert_eq!(altered_table_name, "t1");
391 }
392}