risingwave_frontend/handler/
alter_swap_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_pb::ddl_service::alter_swap_rename_request;
20use risingwave_pb::ddl_service::alter_swap_rename_request::ObjectNameSwapPair;
21use risingwave_sqlparser::ast::ObjectName;
22
23use crate::Binder;
24use crate::catalog::CatalogError;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::error::{ErrorCode, Result};
27use crate::handler::{HandlerArgs, RwPgResponse};
28use crate::session::SessionImpl;
29use crate::user::UserId;
30
31/// Check if the session user has the privilege to swap and rename the objects.
32fn check_swap_rename_privilege(
33    session_impl: &Arc<SessionImpl>,
34    src_owner: UserId,
35    target_owner: UserId,
36) -> Result<()> {
37    if !session_impl.is_super_user()
38        && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
39    {
40        return Err(ErrorCode::PermissionDenied(format!(
41            "{} is not super user and not the owner of the objects.",
42            session_impl.user_name()
43        ))
44        .into());
45    }
46    Ok(())
47}
48
49pub async fn handle_swap_rename(
50    handler_args: HandlerArgs,
51    source_object: ObjectName,
52    target_object: ObjectName,
53    stmt_type: StatementType,
54) -> Result<RwPgResponse> {
55    let session = handler_args.session;
56    let db_name = &session.database();
57    let (src_schema_name, src_obj_name) =
58        Binder::resolve_schema_qualified_name(db_name, source_object)?;
59    let search_path = session.config().search_path();
60    let user_name = &session.user_name();
61    let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
62    let (target_schema_name, target_obj_name) =
63        Binder::resolve_schema_qualified_name(db_name, target_object)?;
64    let target_schema_path =
65        SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
66
67    let obj = match stmt_type {
68        StatementType::ALTER_SCHEMA => {
69            // TODO: support it until resolves https://github.com/risingwavelabs/risingwave/issues/19028
70            bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
71        }
72        StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
73            let catalog_reader = session.env().catalog_reader().read_guard();
74            let (src_table, _) = catalog_reader.get_created_table_by_name(
75                db_name,
76                src_schema_path,
77                &src_obj_name,
78            )?;
79            let (target_table, _) = catalog_reader.get_created_table_by_name(
80                db_name,
81                target_schema_path,
82                &target_obj_name,
83            )?;
84
85            if src_table.table_type != target_table.table_type {
86                return Err(ErrorCode::PermissionDenied(format!(
87                    "cannot swap between {} and {}: type mismatch",
88                    src_obj_name, target_obj_name
89                ))
90                .into());
91            }
92            if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
93                return Err(CatalogError::NotFound("table", src_obj_name.clone()).into());
94            } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
95                return Err(
96                    CatalogError::NotFound("materialized view", src_obj_name.clone()).into(),
97                );
98            }
99
100            check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
101
102            alter_swap_rename_request::Object::Table(ObjectNameSwapPair {
103                src_object_id: src_table.id.table_id,
104                dst_object_id: target_table.id.table_id,
105            })
106        }
107        StatementType::ALTER_VIEW => {
108            let catalog_reader = session.env().catalog_reader().read_guard();
109            let (src_view, _) =
110                catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
111            let (target_view, _) =
112                catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
113            check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
114
115            alter_swap_rename_request::Object::View(ObjectNameSwapPair {
116                src_object_id: src_view.id,
117                dst_object_id: target_view.id,
118            })
119        }
120        StatementType::ALTER_SOURCE => {
121            let catalog_reader = session.env().catalog_reader().read_guard();
122            let (src_source, _) =
123                catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
124            let (target_source, _) =
125                catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
126            check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
127
128            alter_swap_rename_request::Object::Source(ObjectNameSwapPair {
129                src_object_id: src_source.id,
130                dst_object_id: target_source.id,
131            })
132        }
133        StatementType::ALTER_SINK => {
134            let catalog_reader = session.env().catalog_reader().read_guard();
135            let (src_sink, _) =
136                catalog_reader.get_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
137            let (target_sink, _) =
138                catalog_reader.get_sink_by_name(db_name, target_schema_path, &target_obj_name)?;
139            check_swap_rename_privilege(
140                &session,
141                src_sink.owner.user_id,
142                target_sink.owner.user_id,
143            )?;
144
145            alter_swap_rename_request::Object::Sink(ObjectNameSwapPair {
146                src_object_id: src_sink.id.sink_id,
147                dst_object_id: target_sink.id.sink_id,
148            })
149        }
150        StatementType::ALTER_SUBSCRIPTION => {
151            let catalog_reader = session.env().catalog_reader().read_guard();
152            let (src_subscription, _) =
153                catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
154            let (target_subscription, _) = catalog_reader.get_subscription_by_name(
155                db_name,
156                target_schema_path,
157                &target_obj_name,
158            )?;
159            check_swap_rename_privilege(
160                &session,
161                src_subscription.owner.user_id,
162                target_subscription.owner.user_id,
163            )?;
164
165            alter_swap_rename_request::Object::Subscription(ObjectNameSwapPair {
166                src_object_id: src_subscription.id.subscription_id,
167                dst_object_id: target_subscription.id.subscription_id,
168            })
169        }
170        _ => {
171            unreachable!("handle_swap_rename: unsupported statement type")
172        }
173    };
174
175    let catalog_writer = session.catalog_writer()?;
176    catalog_writer.alter_swap_rename(obj).await?;
177
178    Ok(RwPgResponse::empty_result(stmt_type))
179}