risingwave_frontend/handler/
alter_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_pb::ddl_service::alter_name_request;
19use risingwave_pb::user::grant_privilege;
20use risingwave_sqlparser::ast::ObjectName;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::Binder;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::{ErrorCode, Result};
27
28pub async fn handle_rename_table(
29    handler_args: HandlerArgs,
30    table_type: TableType,
31    table_name: ObjectName,
32    new_table_name: ObjectName,
33) -> Result<RwPgResponse> {
34    let session = handler_args.session;
35    let db_name = &session.database();
36    let (schema_name, real_table_name) =
37        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
38    let new_table_name = Binder::resolve_table_name(new_table_name)?;
39    let search_path = session.config().search_path();
40    let user_name = &session.user_name();
41
42    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
43
44    let table_id = {
45        let reader = session.env().catalog_reader().read_guard();
46        let (table, schema_name) =
47            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
48        if table_type != table.table_type {
49            return Err(ErrorCode::InvalidInputSyntax(format!(
50                "\"{table_name}\" is not a {}",
51                table_type.to_prost().as_str_name()
52            ))
53            .into());
54        }
55
56        session.check_privilege_for_drop_alter(schema_name, &**table)?;
57        table.id
58    };
59
60    let catalog_writer = session.catalog_writer()?;
61    catalog_writer
62        .alter_name(
63            alter_name_request::Object::TableId(table_id.table_id),
64            &new_table_name,
65        )
66        .await?;
67
68    let stmt_type = match table_type {
69        TableType::Table => StatementType::ALTER_TABLE,
70        TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
71        _ => unreachable!(),
72    };
73    Ok(PgResponse::empty_result(stmt_type))
74}
75
76pub async fn handle_rename_index(
77    handler_args: HandlerArgs,
78    index_name: ObjectName,
79    new_index_name: ObjectName,
80) -> Result<RwPgResponse> {
81    let session = handler_args.session;
82    let db_name = &session.database();
83    let (schema_name, real_index_name) =
84        Binder::resolve_schema_qualified_name(db_name, index_name.clone())?;
85    let new_index_name = Binder::resolve_index_name(new_index_name)?;
86    let search_path = session.config().search_path();
87    let user_name = &session.user_name();
88
89    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
90
91    let index_id = {
92        let reader = session.env().catalog_reader().read_guard();
93        let (index, schema_name) =
94            reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
95        session.check_privilege_for_drop_alter(schema_name, &**index)?;
96        index.id
97    };
98
99    let catalog_writer = session.catalog_writer()?;
100    catalog_writer
101        .alter_name(
102            alter_name_request::Object::IndexId(index_id.index_id),
103            &new_index_name,
104        )
105        .await?;
106
107    Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
108}
109
110pub async fn handle_rename_view(
111    handler_args: HandlerArgs,
112    view_name: ObjectName,
113    new_view_name: ObjectName,
114) -> Result<RwPgResponse> {
115    let session = handler_args.session;
116    let db_name = &session.database();
117    let (schema_name, real_view_name) =
118        Binder::resolve_schema_qualified_name(db_name, view_name.clone())?;
119    let new_view_name = Binder::resolve_view_name(new_view_name)?;
120    let search_path = session.config().search_path();
121    let user_name = &session.user_name();
122
123    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
124
125    let view_id = {
126        let reader = session.env().catalog_reader().read_guard();
127        let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
128        session.check_privilege_for_drop_alter(schema_name, &**view)?;
129        view.id
130    };
131
132    let catalog_writer = session.catalog_writer()?;
133    catalog_writer
134        .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
135        .await?;
136
137    Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
138}
139
140pub async fn handle_rename_sink(
141    handler_args: HandlerArgs,
142    sink_name: ObjectName,
143    new_sink_name: ObjectName,
144) -> Result<RwPgResponse> {
145    let session = handler_args.session;
146    let db_name = &session.database();
147    let (schema_name, real_sink_name) =
148        Binder::resolve_schema_qualified_name(db_name, sink_name.clone())?;
149    let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
150    let search_path = session.config().search_path();
151    let user_name = &session.user_name();
152
153    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
154
155    let sink_id = {
156        let reader = session.env().catalog_reader().read_guard();
157        let (sink, schema_name) = reader.get_sink_by_name(db_name, schema_path, &real_sink_name)?;
158        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
159        sink.id
160    };
161
162    let catalog_writer = session.catalog_writer()?;
163    catalog_writer
164        .alter_name(
165            alter_name_request::Object::SinkId(sink_id.sink_id),
166            &new_sink_name,
167        )
168        .await?;
169
170    Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
171}
172
173pub async fn handle_rename_subscription(
174    handler_args: HandlerArgs,
175    subscription_name: ObjectName,
176    new_subscription_name: ObjectName,
177) -> Result<RwPgResponse> {
178    let session = handler_args.session;
179    let db_name = &session.database();
180    let (schema_name, real_subscription_name) =
181        Binder::resolve_schema_qualified_name(db_name, subscription_name.clone())?;
182    let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
183    let search_path = session.config().search_path();
184    let user_name = &session.user_name();
185
186    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
187
188    let subscription_id = {
189        let reader = session.env().catalog_reader().read_guard();
190        let (subscription, schema_name) =
191            reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
192        session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
193        subscription.id
194    };
195
196    let catalog_writer = session.catalog_writer()?;
197    catalog_writer
198        .alter_name(
199            alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
200            &new_subscription_name,
201        )
202        .await?;
203
204    Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
205}
206
207pub async fn handle_rename_source(
208    handler_args: HandlerArgs,
209    source_name: ObjectName,
210    new_source_name: ObjectName,
211) -> Result<RwPgResponse> {
212    let session = handler_args.session;
213    let db_name = &session.database();
214    let (schema_name, real_source_name) =
215        Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
216    let new_source_name = Binder::resolve_source_name(new_source_name)?;
217    let search_path = session.config().search_path();
218    let user_name = &session.user_name();
219
220    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
221
222    let source_id = {
223        let reader = session.env().catalog_reader().read_guard();
224        let (source, schema_name) =
225            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
226
227        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
228        if source.associated_table_id.is_some() {
229            return Err(ErrorCode::InvalidInputSyntax(
230                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
231            )
232            .into());
233        }
234
235        session.check_privilege_for_drop_alter(schema_name, &**source)?;
236        source.id
237    };
238
239    let catalog_writer = session.catalog_writer()?;
240    catalog_writer
241        .alter_name(
242            alter_name_request::Object::SourceId(source_id),
243            &new_source_name,
244        )
245        .await?;
246
247    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
248}
249
250pub async fn handle_rename_schema(
251    handler_args: HandlerArgs,
252    schema_name: ObjectName,
253    new_schema_name: ObjectName,
254) -> Result<RwPgResponse> {
255    let session = handler_args.session;
256    let db_name = &session.database();
257    let schema_name = Binder::resolve_schema_name(schema_name)?;
258    let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
259
260    let schema_id = {
261        let user_reader = session.env().user_info_reader().read_guard();
262        let catalog_reader = session.env().catalog_reader().read_guard();
263        let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
264        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
265
266        // The current one should not be system schema.
267        if is_system_schema(&schema.name()) {
268            return Err(ErrorCode::ProtocolError(format!(
269                "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
270                schema_name
271            )).into());
272        }
273
274        // The user should be super user or owner to alter the schema.
275        session.check_privilege_for_drop_alter_db_schema(schema)?;
276
277        // To rename a schema you must also have the CREATE privilege for the database.
278        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
279            if !user.is_super
280                && !user.has_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create)
281            {
282                return Err(ErrorCode::PermissionDenied(
283                    "Do not have create privilege on the current database".to_owned(),
284                )
285                .into());
286            }
287        } else {
288            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
289        }
290
291        schema.id()
292    };
293
294    let catalog_writer = session.catalog_writer()?;
295    catalog_writer
296        .alter_name(
297            alter_name_request::Object::SchemaId(schema_id),
298            &new_schema_name,
299        )
300        .await?;
301
302    Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
303}
304
305pub async fn handle_rename_database(
306    handler_args: HandlerArgs,
307    database_name: ObjectName,
308    new_database_name: ObjectName,
309) -> Result<RwPgResponse> {
310    let session = handler_args.session;
311    let database_name = Binder::resolve_database_name(database_name)?;
312    let new_database_name = Binder::resolve_database_name(new_database_name)?;
313
314    let database_id = {
315        let user_reader = session.env().user_info_reader().read_guard();
316        let catalog_reader = session.env().catalog_reader().read_guard();
317        let database = catalog_reader.get_database_by_name(&database_name)?;
318
319        // The user should be super user or owner to alter the database.
320        session.check_privilege_for_drop_alter_db_schema(database)?;
321
322        // Non-superuser owners must also have the CREATEDB privilege.
323        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
324            if !user.is_super && !user.can_create_db {
325                return Err(ErrorCode::PermissionDenied(
326                    "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
327                )
328                .into());
329            }
330        } else {
331            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
332        }
333
334        // The current database cannot be renamed.
335        if database_name == session.database() {
336            return Err(ErrorCode::PermissionDenied(
337                "Current database cannot be renamed".to_owned(),
338            )
339            .into());
340        }
341
342        database.id()
343    };
344
345    let catalog_writer = session.catalog_writer()?;
346    catalog_writer
347        .alter_name(
348            alter_name_request::Object::DatabaseId(database_id),
349            &new_database_name,
350        )
351        .await?;
352
353    Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
354}
355
356#[cfg(test)]
357mod tests {
358    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
359
360    use crate::catalog::root_catalog::SchemaPath;
361    use crate::test_utils::LocalFrontend;
362
363    #[tokio::test]
364    async fn test_alter_table_name_handler() {
365        let frontend = LocalFrontend::new(Default::default()).await;
366        let session = frontend.session_ref();
367        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
368
369        let sql = "create table t (i int, r real);";
370        frontend.run_sql(sql).await.unwrap();
371
372        let table_id = {
373            let catalog_reader = session.env().catalog_reader().read_guard();
374            catalog_reader
375                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
376                .unwrap()
377                .0
378                .id
379        };
380
381        // Alter table name.
382        let sql = "alter table t rename to t1;";
383        frontend.run_sql(sql).await.unwrap();
384
385        let catalog_reader = session.env().catalog_reader().read_guard();
386        let altered_table_name = catalog_reader
387            .get_any_table_by_id(&table_id)
388            .unwrap()
389            .name()
390            .to_owned();
391        assert_eq!(altered_table_name, "t1");
392    }
393}