risingwave_frontend/handler/
alter_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_pb::ddl_service::alter_name_request;
19use risingwave_sqlparser::ast::ObjectName;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::Binder;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::table_catalog::TableType;
25use crate::error::{ErrorCode, Result};
26
27pub async fn handle_rename_table(
28    handler_args: HandlerArgs,
29    table_type: TableType,
30    table_name: ObjectName,
31    new_table_name: ObjectName,
32) -> Result<RwPgResponse> {
33    let session = handler_args.session;
34    let db_name = &session.database();
35    let (schema_name, real_table_name) =
36        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
37    let new_table_name = Binder::resolve_table_name(new_table_name)?;
38    let search_path = session.config().search_path();
39    let user_name = &session.user_name();
40
41    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43    let table_id = {
44        let reader = session.env().catalog_reader().read_guard();
45        let (table, schema_name) =
46            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
47        if table_type != table.table_type {
48            return Err(ErrorCode::InvalidInputSyntax(format!(
49                "\"{table_name}\" is not a {}",
50                table_type.to_prost().as_str_name()
51            ))
52            .into());
53        }
54
55        session.check_privilege_for_drop_alter(schema_name, &**table)?;
56        table.id
57    };
58
59    let catalog_writer = session.catalog_writer()?;
60    catalog_writer
61        .alter_name(table_id.into(), &new_table_name)
62        .await?;
63
64    let stmt_type = match table_type {
65        TableType::Table => StatementType::ALTER_TABLE,
66        TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
67        _ => unreachable!(),
68    };
69    Ok(PgResponse::empty_result(stmt_type))
70}
71
72pub async fn handle_rename_index(
73    handler_args: HandlerArgs,
74    index_name: ObjectName,
75    new_index_name: ObjectName,
76) -> Result<RwPgResponse> {
77    let session = handler_args.session;
78    let db_name = &session.database();
79    let (schema_name, real_index_name) =
80        Binder::resolve_schema_qualified_name(db_name, &index_name)?;
81    let new_index_name = Binder::resolve_index_name(new_index_name)?;
82    let search_path = session.config().search_path();
83    let user_name = &session.user_name();
84
85    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
86
87    let index_id = {
88        let reader = session.env().catalog_reader().read_guard();
89        let (index, schema_name) =
90            reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
91        session.check_privilege_for_drop_alter(schema_name, &**index)?;
92        index.id
93    };
94
95    let catalog_writer = session.catalog_writer()?;
96    catalog_writer
97        .alter_name(
98            alter_name_request::Object::IndexId(index_id.index_id),
99            &new_index_name,
100        )
101        .await?;
102
103    Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
104}
105
106pub async fn handle_rename_view(
107    handler_args: HandlerArgs,
108    view_name: ObjectName,
109    new_view_name: ObjectName,
110) -> Result<RwPgResponse> {
111    let session = handler_args.session;
112    let db_name = &session.database();
113    let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, &view_name)?;
114    let new_view_name = Binder::resolve_view_name(new_view_name)?;
115    let search_path = session.config().search_path();
116    let user_name = &session.user_name();
117
118    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
119
120    let view_id = {
121        let reader = session.env().catalog_reader().read_guard();
122        let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
123        session.check_privilege_for_drop_alter(schema_name, &**view)?;
124        view.id
125    };
126
127    let catalog_writer = session.catalog_writer()?;
128    catalog_writer
129        .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
130        .await?;
131
132    Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
133}
134
135pub async fn handle_rename_sink(
136    handler_args: HandlerArgs,
137    sink_name: ObjectName,
138    new_sink_name: ObjectName,
139) -> Result<RwPgResponse> {
140    let session = handler_args.session;
141    let db_name = &session.database();
142    let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
143    let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
144    let search_path = session.config().search_path();
145    let user_name = &session.user_name();
146
147    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
148
149    let sink_id = {
150        let reader = session.env().catalog_reader().read_guard();
151        let (sink, schema_name) =
152            reader.get_created_sink_by_name(db_name, schema_path, &real_sink_name)?;
153        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
154        sink.id
155    };
156
157    let catalog_writer = session.catalog_writer()?;
158    catalog_writer
159        .alter_name(
160            alter_name_request::Object::SinkId(sink_id.sink_id),
161            &new_sink_name,
162        )
163        .await?;
164
165    Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
166}
167
168pub async fn handle_rename_subscription(
169    handler_args: HandlerArgs,
170    subscription_name: ObjectName,
171    new_subscription_name: ObjectName,
172) -> Result<RwPgResponse> {
173    let session = handler_args.session;
174    let db_name = &session.database();
175    let (schema_name, real_subscription_name) =
176        Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
177    let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
178    let search_path = session.config().search_path();
179    let user_name = &session.user_name();
180
181    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
182
183    let subscription_id = {
184        let reader = session.env().catalog_reader().read_guard();
185        let (subscription, schema_name) =
186            reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
187        session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
188        subscription.id
189    };
190
191    let catalog_writer = session.catalog_writer()?;
192    catalog_writer
193        .alter_name(
194            alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
195            &new_subscription_name,
196        )
197        .await?;
198
199    Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
200}
201
202pub async fn handle_rename_source(
203    handler_args: HandlerArgs,
204    source_name: ObjectName,
205    new_source_name: ObjectName,
206) -> Result<RwPgResponse> {
207    let session = handler_args.session;
208    let db_name = &session.database();
209    let (schema_name, real_source_name) =
210        Binder::resolve_schema_qualified_name(db_name, &source_name)?;
211    let new_source_name = Binder::resolve_source_name(new_source_name)?;
212    let search_path = session.config().search_path();
213    let user_name = &session.user_name();
214
215    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
216
217    let source_id = {
218        let reader = session.env().catalog_reader().read_guard();
219        let (source, schema_name) =
220            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
221
222        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
223        if source.associated_table_id.is_some() {
224            return Err(ErrorCode::InvalidInputSyntax(
225                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
226            )
227            .into());
228        }
229
230        session.check_privilege_for_drop_alter(schema_name, &**source)?;
231        source.id
232    };
233
234    let catalog_writer = session.catalog_writer()?;
235    catalog_writer
236        .alter_name(
237            alter_name_request::Object::SourceId(source_id),
238            &new_source_name,
239        )
240        .await?;
241
242    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
243}
244
245pub async fn handle_rename_schema(
246    handler_args: HandlerArgs,
247    schema_name: ObjectName,
248    new_schema_name: ObjectName,
249) -> Result<RwPgResponse> {
250    let session = handler_args.session;
251    let db_name = &session.database();
252    let schema_name = Binder::resolve_schema_name(schema_name)?;
253    let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
254
255    let schema_id = {
256        let user_reader = session.env().user_info_reader().read_guard();
257        let catalog_reader = session.env().catalog_reader().read_guard();
258        let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
259        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
260
261        // The current one should not be system schema.
262        if is_system_schema(&schema.name()) {
263            return Err(ErrorCode::ProtocolError(format!(
264                "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
265                schema_name
266            )).into());
267        }
268
269        // The user should be super user or owner to alter the schema.
270        session.check_privilege_for_drop_alter_db_schema(schema)?;
271
272        // To rename a schema you must also have the CREATE privilege for the database.
273        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
274            if !user.is_super && !user.has_privilege(db_id, AclMode::Create) {
275                return Err(ErrorCode::PermissionDenied(
276                    "Do not have CREATE privilege on the current database".to_owned(),
277                )
278                .into());
279            }
280        } else {
281            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
282        }
283
284        schema.id()
285    };
286
287    let catalog_writer = session.catalog_writer()?;
288    catalog_writer
289        .alter_name(schema_id.into(), &new_schema_name)
290        .await?;
291
292    Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
293}
294
295pub async fn handle_rename_database(
296    handler_args: HandlerArgs,
297    database_name: ObjectName,
298    new_database_name: ObjectName,
299) -> Result<RwPgResponse> {
300    let session = handler_args.session;
301    let database_name = Binder::resolve_database_name(database_name)?;
302    let new_database_name = Binder::resolve_database_name(new_database_name)?;
303
304    let database_id = {
305        let user_reader = session.env().user_info_reader().read_guard();
306        let catalog_reader = session.env().catalog_reader().read_guard();
307        let database = catalog_reader.get_database_by_name(&database_name)?;
308
309        // The user should be super user or owner to alter the database.
310        session.check_privilege_for_drop_alter_db_schema(database)?;
311
312        // Non-superuser owners must also have the CREATEDB privilege.
313        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
314            if !user.is_super && !user.can_create_db {
315                return Err(ErrorCode::PermissionDenied(
316                    "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
317                )
318                .into());
319            }
320        } else {
321            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
322        }
323
324        // The current database cannot be renamed.
325        if database_name == session.database() {
326            return Err(ErrorCode::PermissionDenied(
327                "Current database cannot be renamed".to_owned(),
328            )
329            .into());
330        }
331
332        database.id()
333    };
334
335    let catalog_writer = session.catalog_writer()?;
336    catalog_writer
337        .alter_name(database_id.into(), &new_database_name)
338        .await?;
339
340    Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
341}
342
343#[cfg(test)]
344mod tests {
345    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
346
347    use crate::catalog::root_catalog::SchemaPath;
348    use crate::test_utils::LocalFrontend;
349
350    #[tokio::test]
351    async fn test_alter_table_name_handler() {
352        let frontend = LocalFrontend::new(Default::default()).await;
353        let session = frontend.session_ref();
354        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
355
356        let sql = "create table t (i int, r real);";
357        frontend.run_sql(sql).await.unwrap();
358
359        let table_id = {
360            let catalog_reader = session.env().catalog_reader().read_guard();
361            catalog_reader
362                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
363                .unwrap()
364                .0
365                .id
366        };
367
368        // Alter table name.
369        let sql = "alter table t rename to t1;";
370        frontend.run_sql(sql).await.unwrap();
371
372        let catalog_reader = session.env().catalog_reader().read_guard();
373        let altered_table_name = catalog_reader
374            .get_any_table_by_id(&table_id)
375            .unwrap()
376            .name()
377            .to_owned();
378        assert_eq!(altered_table_name, "t1");
379    }
380}