risingwave_frontend/handler/
alter_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::acl::AclMode;
17use risingwave_common::catalog::is_system_schema;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25
26pub async fn handle_rename_table(
27    handler_args: HandlerArgs,
28    table_type: TableType,
29    table_name: ObjectName,
30    new_table_name: ObjectName,
31) -> Result<RwPgResponse> {
32    let session = handler_args.session;
33    let db_name = &session.database();
34    let (schema_name, real_table_name) =
35        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
36    let new_table_name = Binder::resolve_table_name(new_table_name)?;
37    let search_path = session.config().search_path();
38    let user_name = &session.user_name();
39
40    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
41
42    let table_id = {
43        let reader = session.env().catalog_reader().read_guard();
44        let (table, schema_name) =
45            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
46        if table_type != table.table_type {
47            return Err(ErrorCode::InvalidInputSyntax(format!(
48                "\"{table_name}\" is not a {}",
49                table_type.to_prost().as_str_name()
50            ))
51            .into());
52        }
53
54        session.check_privilege_for_drop_alter(schema_name, &**table)?;
55        table.id
56    };
57
58    let catalog_writer = session.catalog_writer()?;
59    catalog_writer
60        .alter_name(table_id.into(), &new_table_name)
61        .await?;
62
63    let stmt_type = match table_type {
64        TableType::Table => StatementType::ALTER_TABLE,
65        TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW,
66        _ => unreachable!(),
67    };
68    Ok(PgResponse::empty_result(stmt_type))
69}
70
71pub async fn handle_rename_index(
72    handler_args: HandlerArgs,
73    index_name: ObjectName,
74    new_index_name: ObjectName,
75) -> Result<RwPgResponse> {
76    let session = handler_args.session;
77    let db_name = &session.database();
78    let (schema_name, real_index_name) =
79        Binder::resolve_schema_qualified_name(db_name, &index_name)?;
80    let new_index_name = Binder::resolve_index_name(new_index_name)?;
81    let search_path = session.config().search_path();
82    let user_name = &session.user_name();
83
84    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
85
86    let index_id = {
87        let reader = session.env().catalog_reader().read_guard();
88        let (index, schema_name) =
89            reader.get_index_by_name(db_name, schema_path, &real_index_name)?;
90        session.check_privilege_for_drop_alter(schema_name, &**index)?;
91        index.id
92    };
93
94    let catalog_writer = session.catalog_writer()?;
95    catalog_writer
96        .alter_name(index_id.into(), &new_index_name)
97        .await?;
98
99    Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
100}
101
102pub async fn handle_rename_view(
103    handler_args: HandlerArgs,
104    view_name: ObjectName,
105    new_view_name: ObjectName,
106) -> Result<RwPgResponse> {
107    let session = handler_args.session;
108    let db_name = &session.database();
109    let (schema_name, real_view_name) = Binder::resolve_schema_qualified_name(db_name, &view_name)?;
110    let new_view_name = Binder::resolve_view_name(new_view_name)?;
111    let search_path = session.config().search_path();
112    let user_name = &session.user_name();
113
114    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
115
116    let view_id = {
117        let reader = session.env().catalog_reader().read_guard();
118        let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?;
119        session.check_privilege_for_drop_alter(schema_name, &**view)?;
120        view.id
121    };
122
123    let catalog_writer = session.catalog_writer()?;
124    catalog_writer
125        .alter_name(view_id.into(), &new_view_name)
126        .await?;
127
128    Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
129}
130
131pub async fn handle_rename_sink(
132    handler_args: HandlerArgs,
133    sink_name: ObjectName,
134    new_sink_name: ObjectName,
135) -> Result<RwPgResponse> {
136    let session = handler_args.session;
137    let db_name = &session.database();
138    let (schema_name, real_sink_name) = Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
139    let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
140    let search_path = session.config().search_path();
141    let user_name = &session.user_name();
142
143    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
144
145    let sink_id = {
146        let reader = session.env().catalog_reader().read_guard();
147        let (sink, schema_name) =
148            reader.get_created_sink_by_name(db_name, schema_path, &real_sink_name)?;
149        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
150        sink.id
151    };
152
153    let catalog_writer = session.catalog_writer()?;
154    catalog_writer
155        .alter_name(sink_id.into(), &new_sink_name)
156        .await?;
157
158    Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
159}
160
161pub async fn handle_rename_subscription(
162    handler_args: HandlerArgs,
163    subscription_name: ObjectName,
164    new_subscription_name: ObjectName,
165) -> Result<RwPgResponse> {
166    let session = handler_args.session;
167    let db_name = &session.database();
168    let (schema_name, real_subscription_name) =
169        Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
170    let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
171    let search_path = session.config().search_path();
172    let user_name = &session.user_name();
173
174    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
175
176    let subscription_id = {
177        let reader = session.env().catalog_reader().read_guard();
178        let (subscription, schema_name) =
179            reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
180        session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
181        subscription.id
182    };
183
184    let catalog_writer = session.catalog_writer()?;
185    catalog_writer
186        .alter_name(subscription_id.into(), &new_subscription_name)
187        .await?;
188
189    Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
190}
191
192pub async fn handle_rename_source(
193    handler_args: HandlerArgs,
194    source_name: ObjectName,
195    new_source_name: ObjectName,
196) -> Result<RwPgResponse> {
197    let session = handler_args.session;
198    let db_name = &session.database();
199    let (schema_name, real_source_name) =
200        Binder::resolve_schema_qualified_name(db_name, &source_name)?;
201    let new_source_name = Binder::resolve_source_name(new_source_name)?;
202    let search_path = session.config().search_path();
203    let user_name = &session.user_name();
204
205    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
206
207    let source_id = {
208        let reader = session.env().catalog_reader().read_guard();
209        let (source, schema_name) =
210            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
211
212        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
213        if source.associated_table_id.is_some() {
214            return Err(ErrorCode::InvalidInputSyntax(
215                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
216            )
217            .into());
218        }
219
220        session.check_privilege_for_drop_alter(schema_name, &**source)?;
221        source.id
222    };
223
224    let catalog_writer = session.catalog_writer()?;
225    catalog_writer
226        .alter_name(source_id.into(), &new_source_name)
227        .await?;
228
229    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
230}
231
232pub async fn handle_rename_schema(
233    handler_args: HandlerArgs,
234    schema_name: ObjectName,
235    new_schema_name: ObjectName,
236) -> Result<RwPgResponse> {
237    let session = handler_args.session;
238    let db_name = &session.database();
239    let schema_name = Binder::resolve_schema_name(schema_name)?;
240    let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
241
242    let schema_id = {
243        let user_reader = session.env().user_info_reader().read_guard();
244        let catalog_reader = session.env().catalog_reader().read_guard();
245        let schema = catalog_reader.get_schema_by_name(db_name, &schema_name)?;
246        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
247
248        // The current one should not be system schema.
249        if is_system_schema(&schema.name()) {
250            return Err(ErrorCode::ProtocolError(format!(
251                "permission denied to rename on \"{}\", System catalog modifications are currently disallowed.",
252                schema_name
253            )).into());
254        }
255
256        // The user should be super user or owner to alter the schema.
257        session.check_privilege_for_drop_alter_db_schema(schema)?;
258
259        // To rename a schema you must also have the CREATE privilege for the database.
260        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
261            if !user.is_super && !user.has_privilege(db_id, AclMode::Create) {
262                return Err(ErrorCode::PermissionDenied(
263                    "Do not have CREATE privilege on the current database".to_owned(),
264                )
265                .into());
266            }
267        } else {
268            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
269        }
270
271        schema.id()
272    };
273
274    let catalog_writer = session.catalog_writer()?;
275    catalog_writer
276        .alter_name(schema_id.into(), &new_schema_name)
277        .await?;
278
279    Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
280}
281
282pub async fn handle_rename_database(
283    handler_args: HandlerArgs,
284    database_name: ObjectName,
285    new_database_name: ObjectName,
286) -> Result<RwPgResponse> {
287    let session = handler_args.session;
288    let database_name = Binder::resolve_database_name(database_name)?;
289    let new_database_name = Binder::resolve_database_name(new_database_name)?;
290
291    let database_id = {
292        let user_reader = session.env().user_info_reader().read_guard();
293        let catalog_reader = session.env().catalog_reader().read_guard();
294        let database = catalog_reader.get_database_by_name(&database_name)?;
295
296        // The user should be super user or owner to alter the database.
297        session.check_privilege_for_drop_alter_db_schema(database)?;
298
299        // Non-superuser owners must also have the CREATEDB privilege.
300        if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
301            if !user.is_super && !user.can_create_db {
302                return Err(ErrorCode::PermissionDenied(
303                    "Non-superuser owners must also have the CREATEDB privilege".to_owned(),
304                )
305                .into());
306            }
307        } else {
308            return Err(ErrorCode::PermissionDenied("Session user is invalid".to_owned()).into());
309        }
310
311        // The current database cannot be renamed.
312        if database_name == session.database() {
313            return Err(ErrorCode::PermissionDenied(
314                "Current database cannot be renamed".to_owned(),
315            )
316            .into());
317        }
318
319        database.id()
320    };
321
322    let catalog_writer = session.catalog_writer()?;
323    catalog_writer
324        .alter_name(database_id.into(), &new_database_name)
325        .await?;
326
327    Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
328}
329
330#[cfg(test)]
331mod tests {
332    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
333
334    use crate::catalog::root_catalog::SchemaPath;
335    use crate::test_utils::LocalFrontend;
336
337    #[tokio::test]
338    async fn test_alter_table_name_handler() {
339        let frontend = LocalFrontend::new(Default::default()).await;
340        let session = frontend.session_ref();
341        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
342
343        let sql = "create table t (i int, r real);";
344        frontend.run_sql(sql).await.unwrap();
345
346        let table_id = {
347            let catalog_reader = session.env().catalog_reader().read_guard();
348            catalog_reader
349                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
350                .unwrap()
351                .0
352                .id
353        };
354
355        // Alter table name.
356        let sql = "alter table t rename to t1;";
357        frontend.run_sql(sql).await.unwrap();
358
359        let catalog_reader = session.env().catalog_reader().read_guard();
360        let altered_table_name = catalog_reader
361            .get_any_table_by_id(table_id)
362            .unwrap()
363            .name()
364            .to_owned();
365        assert_eq!(altered_table_name, "t1");
366    }
367}