risingwave_frontend/handler/
alter_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_pb::ddl_service::alter_name_request;
19use risingwave_pb::user::grant_privilege;
20use risingwave_sqlparser::ast::ObjectName;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::Binder;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::{ErrorCode, Result};
27
28pub async fn handle_rename_table(
29    handler_args: HandlerArgs,
30    table_type: TableType,
31    table_name: ObjectName,
32    new_table_name: ObjectName,
33) -> Result<RwPgResponse> {
34    let session = handler_args.session;
35    let db_name = &session.database();
36    let (schema_name, real_table_name) =
37        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
38    let new_table_name = Binder::resolve_table_name(new_table_name)?;
39    let search_path = session.config().search_path();
40    let user_name = &session.user_name();
41
42    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
43
44    let table_id = {
45        let reader = session.env().catalog_reader().read_guard();
46        let (table, schema_name) =
47            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
48        if table_type != table.table_type {
49            return Err(ErrorCode::InvalidInputSyntax(format!(
50                "\"{table_name}\" is not a {}",
51                table_type.to_prost().as_str_name()
52            ))
53            .into());
54        }
55
56        session.check_privilege_for_drop_alter(schema_name, &**table)?;
57        table.id
58    };
59
60    let catalog_writer = session.catalog_writer()?;
61    catalog_writer
62        .alter_name(
63            alter_name_request::Object::TableId(table_id.table_id),
64            &new_table_name,
65        )
66        .await?;
67
68    let stmt_type = match table_type {
69        TableType::Table => StatementType::ALTER_TABLE,
70        TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
71        _ => unreachable!(),
72    };
73    Ok(PgResponse::empty_result(stmt_type))
74}
75
76pub async fn handle_rename_index(
77    handler_args: HandlerArgs,
78    index_name: ObjectName,
79    new_index_name: ObjectName,
80) -> Result<RwPgResponse> {
81    let session = handler_args.session;
82    let db_name = &session.database();
83    let (schema_name, real_index_name) =
84        Binder::resolve_schema_qualified_name(db_name, &index_name)?;
85    let new_index_name = Binder::resolve_index_name(new_index_name)?;
86    let search_path = session.config().search_path();
87    let user_name = &session.user_name();
88
89    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
90
91    let index_id = {
92        let reader = session.env().catalog_reader().read_guard();
93        let (index, schema_name) =
94            reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
95        session.check_privilege_for_drop_alter(schema_name, &**index)?;
96        index.id
97    };
98
99    let catalog_writer = session.catalog_writer()?;
100    catalog_writer
101        .alter_name(
102            alter_name_request::Object::IndexId(index_id.index_id),
103            &new_index_name,
104        )
105        .await?;
106
107    Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
108}
109
110pub async fn handle_rename_view(
111    handler_args: HandlerArgs,
112    view_name: ObjectName,
113    new_view_name: ObjectName,
114) -> Result<RwPgResponse> {
115    let session = handler_args.session;
116    let db_name = &session.database();
117    let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, &view_name)?;
118    let new_view_name = Binder::resolve_view_name(new_view_name)?;
119    let search_path = session.config().search_path();
120    let user_name = &session.user_name();
121
122    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
123
124    let view_id = {
125        let reader = session.env().catalog_reader().read_guard();
126        let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
127        session.check_privilege_for_drop_alter(schema_name, &**view)?;
128        view.id
129    };
130
131    let catalog_writer = session.catalog_writer()?;
132    catalog_writer
133        .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
134        .await?;
135
136    Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
137}
138
139pub async fn handle_rename_sink(
140    handler_args: HandlerArgs,
141    sink_name: ObjectName,
142    new_sink_name: ObjectName,
143) -> Result<RwPgResponse> {
144    let session = handler_args.session;
145    let db_name = &session.database();
146    let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
147    let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
148    let search_path = session.config().search_path();
149    let user_name = &session.user_name();
150
151    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
152
153    let sink_id = {
154        let reader = session.env().catalog_reader().read_guard();
155        let (sink, schema_name) =
156            reader.get_created_sink_by_name(db_name, schema_path, &real_sink_name)?;
157        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
158        sink.id
159    };
160
161    let catalog_writer = session.catalog_writer()?;
162    catalog_writer
163        .alter_name(
164            alter_name_request::Object::SinkId(sink_id.sink_id),
165            &new_sink_name,
166        )
167        .await?;
168
169    Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
170}
171
172pub async fn handle_rename_subscription(
173    handler_args: HandlerArgs,
174    subscription_name: ObjectName,
175    new_subscription_name: ObjectName,
176) -> Result<RwPgResponse> {
177    let session = handler_args.session;
178    let db_name = &session.database();
179    let (schema_name, real_subscription_name) =
180        Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
181    let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
182    let search_path = session.config().search_path();
183    let user_name = &session.user_name();
184
185    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
186
187    let subscription_id = {
188        let reader = session.env().catalog_reader().read_guard();
189        let (subscription, schema_name) =
190            reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
191        session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
192        subscription.id
193    };
194
195    let catalog_writer = session.catalog_writer()?;
196    catalog_writer
197        .alter_name(
198            alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
199            &new_subscription_name,
200        )
201        .await?;
202
203    Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
204}
205
206pub async fn handle_rename_source(
207    handler_args: HandlerArgs,
208    source_name: ObjectName,
209    new_source_name: ObjectName,
210) -> Result<RwPgResponse> {
211    let session = handler_args.session;
212    let db_name = &session.database();
213    let (schema_name, real_source_name) =
214        Binder::resolve_schema_qualified_name(db_name, &source_name)?;
215    let new_source_name = Binder::resolve_source_name(new_source_name)?;
216    let search_path = session.config().search_path();
217    let user_name = &session.user_name();
218
219    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
220
221    let source_id = {
222        let reader = session.env().catalog_reader().read_guard();
223        let (source, schema_name) =
224            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
225
226        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
227        if source.associated_table_id.is_some() {
228            return Err(ErrorCode::InvalidInputSyntax(
229                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
230            )
231            .into());
232        }
233
234        session.check_privilege_for_drop_alter(schema_name, &**source)?;
235        source.id
236    };
237
238    let catalog_writer = session.catalog_writer()?;
239    catalog_writer
240        .alter_name(
241            alter_name_request::Object::SourceId(source_id),
242            &new_source_name,
243        )
244        .await?;
245
246    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
247}
248
249pub async fn handle_rename_schema(
250    handler_args: HandlerArgs,
251    schema_name: ObjectName,
252    new_schema_name: ObjectName,
253) -> Result<RwPgResponse> {
254    let session = handler_args.session;
255    let db_name = &session.database();
256    let schema_name = Binder::resolve_schema_name(schema_name)?;
257    let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
258
259    let schema_id = {
260        let user_reader = session.env().user_info_reader().read_guard();
261        let catalog_reader = session.env().catalog_reader().read_guard();
262        let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
263        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
264
265        // The current one should not be system schema.
266        if is_system_schema(&schema.name()) {
267            return Err(ErrorCode::ProtocolError(format!(
268                "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
269                schema_name
270            )).into());
271        }
272
273        // The user should be super user or owner to alter the schema.
274        session.check_privilege_for_drop_alter_db_schema(schema)?;
275
276        // To rename a schema you must also have the CREATE privilege for the database.
277        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
278            if !user.is_super
279                && !user.has_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create)
280            {
281                return Err(ErrorCode::PermissionDenied(
282                    "Do not have CREATE privilege on the current database".to_owned(),
283                )
284                .into());
285            }
286        } else {
287            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
288        }
289
290        schema.id()
291    };
292
293    let catalog_writer = session.catalog_writer()?;
294    catalog_writer
295        .alter_name(
296            alter_name_request::Object::SchemaId(schema_id),
297            &new_schema_name,
298        )
299        .await?;
300
301    Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
302}
303
304pub async fn handle_rename_database(
305    handler_args: HandlerArgs,
306    database_name: ObjectName,
307    new_database_name: ObjectName,
308) -> Result<RwPgResponse> {
309    let session = handler_args.session;
310    let database_name = Binder::resolve_database_name(database_name)?;
311    let new_database_name = Binder::resolve_database_name(new_database_name)?;
312
313    let database_id = {
314        let user_reader = session.env().user_info_reader().read_guard();
315        let catalog_reader = session.env().catalog_reader().read_guard();
316        let database = catalog_reader.get_database_by_name(&database_name)?;
317
318        // The user should be super user or owner to alter the database.
319        session.check_privilege_for_drop_alter_db_schema(database)?;
320
321        // Non-superuser owners must also have the CREATEDB privilege.
322        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
323            if !user.is_super && !user.can_create_db {
324                return Err(ErrorCode::PermissionDenied(
325                    "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
326                )
327                .into());
328            }
329        } else {
330            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
331        }
332
333        // The current database cannot be renamed.
334        if database_name == session.database() {
335            return Err(ErrorCode::PermissionDenied(
336                "Current database cannot be renamed".to_owned(),
337            )
338            .into());
339        }
340
341        database.id()
342    };
343
344    let catalog_writer = session.catalog_writer()?;
345    catalog_writer
346        .alter_name(
347            alter_name_request::Object::DatabaseId(database_id),
348            &new_database_name,
349        )
350        .await?;
351
352    Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
353}
354
355#[cfg(test)]
356mod tests {
357    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
358
359    use crate::catalog::root_catalog::SchemaPath;
360    use crate::test_utils::LocalFrontend;
361
362    #[tokio::test]
363    async fn test_alter_table_name_handler() {
364        let frontend = LocalFrontend::new(Default::default()).await;
365        let session = frontend.session_ref();
366        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
367
368        let sql = "create table t (i int, r real);";
369        frontend.run_sql(sql).await.unwrap();
370
371        let table_id = {
372            let catalog_reader = session.env().catalog_reader().read_guard();
373            catalog_reader
374                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
375                .unwrap()
376                .0
377                .id
378        };
379
380        // Alter table name.
381        let sql = "alter table t rename to t1;";
382        frontend.run_sql(sql).await.unwrap();
383
384        let catalog_reader = session.env().catalog_reader().read_guard();
385        let altered_table_name = catalog_reader
386            .get_any_table_by_id(&table_id)
387            .unwrap()
388            .name()
389            .to_owned();
390        assert_eq!(altered_table_name, "t1");
391    }
392}