risingwave_frontend/handler/
alter_rename.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_pb::ddl_service::alter_name_request;
19use risingwave_sqlparser::ast::ObjectName;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::Binder;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::table_catalog::TableType;
25use crate::error::{ErrorCode, Result};
26
27pub async fn handle_rename_table(
28 handler_args: HandlerArgs,
29 table_type: TableType,
30 table_name: ObjectName,
31 new_table_name: ObjectName,
32) -> Result<RwPgResponse> {
33 let session = handler_args.session;
34 let db_name = &session.database();
35 let (schema_name, real_table_name) =
36 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
37 let new_table_name = Binder::resolve_table_name(new_table_name)?;
38 let search_path = session.config().search_path();
39 let user_name = &session.user_name();
40
41 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43 let table_id = {
44 let reader = session.env().catalog_reader().read_guard();
45 let (table, schema_name) =
46 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
47 if table_type != table.table_type {
48 return Err(ErrorCode::InvalidInputSyntax(format!(
49 "\"{table_name}\" is not a {}",
50 table_type.to_prost().as_str_name()
51 ))
52 .into());
53 }
54
55 session.check_privilege_for_drop_alter(schema_name, &**table)?;
56 table.id
57 };
58
59 let catalog_writer = session.catalog_writer()?;
60 catalog_writer
61 .alter_name(table_id.into(), &new_table_name)
62 .await?;
63
64 let stmt_type = match table_type {
65 TableType::Table => StatementType::ALTER_TABLE,
66 TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
67 _ => unreachable!(),
68 };
69 Ok(PgResponse::empty_result(stmt_type))
70}
71
72pub async fn handle_rename_index(
73 handler_args: HandlerArgs,
74 index_name: ObjectName,
75 new_index_name: ObjectName,
76) -> Result<RwPgResponse> {
77 let session = handler_args.session;
78 let db_name = &session.database();
79 let (schema_name, real_index_name) =
80 Binder::resolve_schema_qualified_name(db_name, &index_name)?;
81 let new_index_name = Binder::resolve_index_name(new_index_name)?;
82 let search_path = session.config().search_path();
83 let user_name = &session.user_name();
84
85 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
86
87 let index_id = {
88 let reader = session.env().catalog_reader().read_guard();
89 let (index, schema_name) =
90 reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
91 session.check_privilege_for_drop_alter(schema_name, &**index)?;
92 index.id
93 };
94
95 let catalog_writer = session.catalog_writer()?;
96 catalog_writer
97 .alter_name(
98 alter_name_request::Object::IndexId(index_id.index_id),
99 &new_index_name,
100 )
101 .await?;
102
103 Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
104}
105
106pub async fn handle_rename_view(
107 handler_args: HandlerArgs,
108 view_name: ObjectName,
109 new_view_name: ObjectName,
110) -> Result<RwPgResponse> {
111 let session = handler_args.session;
112 let db_name = &session.database();
113 let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, &view_name)?;
114 let new_view_name = Binder::resolve_view_name(new_view_name)?;
115 let search_path = session.config().search_path();
116 let user_name = &session.user_name();
117
118 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
119
120 let view_id = {
121 let reader = session.env().catalog_reader().read_guard();
122 let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
123 session.check_privilege_for_drop_alter(schema_name, &**view)?;
124 view.id
125 };
126
127 let catalog_writer = session.catalog_writer()?;
128 catalog_writer
129 .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
130 .await?;
131
132 Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
133}
134
135pub async fn handle_rename_sink(
136 handler_args: HandlerArgs,
137 sink_name: ObjectName,
138 new_sink_name: ObjectName,
139) -> Result<RwPgResponse> {
140 let session = handler_args.session;
141 let db_name = &session.database();
142 let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
143 let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
144 let search_path = session.config().search_path();
145 let user_name = &session.user_name();
146
147 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
148
149 let sink_id = {
150 let reader = session.env().catalog_reader().read_guard();
151 let (sink, schema_name) =
152 reader.get_created_sink_by_name(db_name, schema_path, &real_sink_name)?;
153 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
154 sink.id
155 };
156
157 let catalog_writer = session.catalog_writer()?;
158 catalog_writer
159 .alter_name(
160 alter_name_request::Object::SinkId(sink_id.sink_id),
161 &new_sink_name,
162 )
163 .await?;
164
165 Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
166}
167
168pub async fn handle_rename_subscription(
169 handler_args: HandlerArgs,
170 subscription_name: ObjectName,
171 new_subscription_name: ObjectName,
172) -> Result<RwPgResponse> {
173 let session = handler_args.session;
174 let db_name = &session.database();
175 let (schema_name, real_subscription_name) =
176 Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
177 let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
178 let search_path = session.config().search_path();
179 let user_name = &session.user_name();
180
181 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
182
183 let subscription_id = {
184 let reader = session.env().catalog_reader().read_guard();
185 let (subscription, schema_name) =
186 reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
187 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
188 subscription.id
189 };
190
191 let catalog_writer = session.catalog_writer()?;
192 catalog_writer
193 .alter_name(
194 alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
195 &new_subscription_name,
196 )
197 .await?;
198
199 Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
200}
201
202pub async fn handle_rename_source(
203 handler_args: HandlerArgs,
204 source_name: ObjectName,
205 new_source_name: ObjectName,
206) -> Result<RwPgResponse> {
207 let session = handler_args.session;
208 let db_name = &session.database();
209 let (schema_name, real_source_name) =
210 Binder::resolve_schema_qualified_name(db_name, &source_name)?;
211 let new_source_name = Binder::resolve_source_name(new_source_name)?;
212 let search_path = session.config().search_path();
213 let user_name = &session.user_name();
214
215 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
216
217 let source_id = {
218 let reader = session.env().catalog_reader().read_guard();
219 let (source, schema_name) =
220 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
221
222 if source.associated_table_id.is_some() {
224 return Err(ErrorCode::InvalidInputSyntax(
225 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
226 )
227 .into());
228 }
229
230 session.check_privilege_for_drop_alter(schema_name, &**source)?;
231 source.id
232 };
233
234 let catalog_writer = session.catalog_writer()?;
235 catalog_writer
236 .alter_name(
237 alter_name_request::Object::SourceId(source_id),
238 &new_source_name,
239 )
240 .await?;
241
242 Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
243}
244
245pub async fn handle_rename_schema(
246 handler_args: HandlerArgs,
247 schema_name: ObjectName,
248 new_schema_name: ObjectName,
249) -> Result<RwPgResponse> {
250 let session = handler_args.session;
251 let db_name = &session.database();
252 let schema_name = Binder::resolve_schema_name(schema_name)?;
253 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
254
255 let schema_id = {
256 let user_reader = session.env().user_info_reader().read_guard();
257 let catalog_reader = session.env().catalog_reader().read_guard();
258 let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
259 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
260
261 if is_system_schema(&schema.name()) {
263 return Err(ErrorCode::ProtocolError(format!(
264 "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
265 schema_name
266 )).into());
267 }
268
269 session.check_privilege_for_drop_alter_db_schema(schema)?;
271
272 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
274 if !user.is_super && !user.has_privilege(db_id, AclMode::Create) {
275 return Err(ErrorCode::PermissionDenied(
276 "Do not have CREATE privilege on the current database".to_owned(),
277 )
278 .into());
279 }
280 } else {
281 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
282 }
283
284 schema.id()
285 };
286
287 let catalog_writer = session.catalog_writer()?;
288 catalog_writer
289 .alter_name(schema_id.into(), &new_schema_name)
290 .await?;
291
292 Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
293}
294
295pub async fn handle_rename_database(
296 handler_args: HandlerArgs,
297 database_name: ObjectName,
298 new_database_name: ObjectName,
299) -> Result<RwPgResponse> {
300 let session = handler_args.session;
301 let database_name = Binder::resolve_database_name(database_name)?;
302 let new_database_name = Binder::resolve_database_name(new_database_name)?;
303
304 let database_id = {
305 let user_reader = session.env().user_info_reader().read_guard();
306 let catalog_reader = session.env().catalog_reader().read_guard();
307 let database = catalog_reader.get_database_by_name(&database_name)?;
308
309 session.check_privilege_for_drop_alter_db_schema(database)?;
311
312 if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
314 if !user.is_super && !user.can_create_db {
315 return Err(ErrorCode::PermissionDenied(
316 "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
317 )
318 .into());
319 }
320 } else {
321 return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
322 }
323
324 if database_name == session.database() {
326 return Err(ErrorCode::PermissionDenied(
327 "Current database cannot be renamed".to_owned(),
328 )
329 .into());
330 }
331
332 database.id()
333 };
334
335 let catalog_writer = session.catalog_writer()?;
336 catalog_writer
337 .alter_name(database_id.into(), &new_database_name)
338 .await?;
339
340 Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
341}
342
343#[cfg(test)]
344mod tests {
345 use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
346
347 use crate::catalog::root_catalog::SchemaPath;
348 use crate::test_utils::LocalFrontend;
349
350 #[tokio::test]
351 async fn test_alter_table_name_handler() {
352 let frontend = LocalFrontend::new(Default::default()).await;
353 let session = frontend.session_ref();
354 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
355
356 let sql = "create table t (i int, r real);";
357 frontend.run_sql(sql).await.unwrap();
358
359 let table_id = {
360 let catalog_reader = session.env().catalog_reader().read_guard();
361 catalog_reader
362 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
363 .unwrap()
364 .0
365 .id
366 };
367
368 let sql = "alter table t rename to t1;";
370 frontend.run_sql(sql).await.unwrap();
371
372 let catalog_reader = session.env().catalog_reader().read_guard();
373 let altered_table_name = catalog_reader
374 .get_any_table_by_id(&table_id)
375 .unwrap()
376 .name()
377 .to_owned();
378 assert_eq!(altered_table_name, "t1");
379 }
380}