risingwave_frontend/handler/
alter_swap_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::{HandlerArgs, RwPgResponse};
26use crate::session::SessionImpl;
27use crate::user::UserId;
28
29/// Check if the session user has the privilege to swap and rename the objects.
30fn check_swap_rename_privilege(
31    session_impl: &Arc<SessionImpl>,
32    src_owner: UserId,
33    target_owner: UserId,
34) -> Result<()> {
35    if !session_impl.is_super_user()
36        && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
37    {
38        return Err(ErrorCode::PermissionDenied(format!(
39            "{} is not super user and not the owner of the objects.",
40            session_impl.user_name()
41        ))
42        .into());
43    }
44    Ok(())
45}
46
47pub async fn handle_swap_rename(
48    handler_args: HandlerArgs,
49    source_object: ObjectName,
50    target_object: ObjectName,
51    stmt_type: StatementType,
52) -> Result<RwPgResponse> {
53    let session = handler_args.session;
54    let db_name = &session.database();
55    let (src_schema_name, src_obj_name) =
56        Binder::resolve_schema_qualified_name(db_name, &source_object)?;
57    let search_path = session.config().search_path();
58    let user_name = &session.user_name();
59    let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
60    let (target_schema_name, target_obj_name) =
61        Binder::resolve_schema_qualified_name(db_name, &target_object)?;
62    let target_schema_path =
63        SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
64
65    let obj = match stmt_type {
66        StatementType::ALTER_SCHEMA => {
67            // TODO: support it until resolves https://github.com/risingwavelabs/risingwave/issues/19028
68            bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
69        }
70        StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
71            let catalog_reader = session.env().catalog_reader().read_guard();
72            let (src_table, _) = catalog_reader.get_created_table_by_name(
73                db_name,
74                src_schema_path,
75                &src_obj_name,
76            )?;
77            let (target_table, _) = catalog_reader.get_created_table_by_name(
78                db_name,
79                target_schema_path,
80                &target_obj_name,
81            )?;
82
83            if src_table.table_type != target_table.table_type {
84                return Err(ErrorCode::PermissionDenied(format!(
85                    "cannot swap between {} and {}: type mismatch",
86                    src_obj_name, target_obj_name
87                ))
88                .into());
89            }
90            if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
91                return Err(CatalogError::not_found("table", &src_obj_name).into());
92            } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
93                return Err(CatalogError::not_found("materialized view", &src_obj_name).into());
94            }
95
96            check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
97
98            (src_table.id, target_table.id).into()
99        }
100        StatementType::ALTER_VIEW => {
101            let catalog_reader = session.env().catalog_reader().read_guard();
102            let (src_view, _) =
103                catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
104            let (target_view, _) =
105                catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
106            check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
107
108            (src_view.id, target_view.id).into()
109        }
110        StatementType::ALTER_SOURCE => {
111            let catalog_reader = session.env().catalog_reader().read_guard();
112            let (src_source, _) =
113                catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
114            let (target_source, _) =
115                catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
116            check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
117
118            (src_source.id, target_source.id).into()
119        }
120        StatementType::ALTER_SINK => {
121            let catalog_reader = session.env().catalog_reader().read_guard();
122            let (src_sink, _) =
123                catalog_reader.get_created_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
124            let (target_sink, _) = catalog_reader.get_created_sink_by_name(
125                db_name,
126                target_schema_path,
127                &target_obj_name,
128            )?;
129            check_swap_rename_privilege(
130                &session,
131                src_sink.owner.user_id,
132                target_sink.owner.user_id,
133            )?;
134
135            (src_sink.id, target_sink.id).into()
136        }
137        StatementType::ALTER_SUBSCRIPTION => {
138            let catalog_reader = session.env().catalog_reader().read_guard();
139            let (src_subscription, _) =
140                catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
141            let (target_subscription, _) = catalog_reader.get_subscription_by_name(
142                db_name,
143                target_schema_path,
144                &target_obj_name,
145            )?;
146            check_swap_rename_privilege(
147                &session,
148                src_subscription.owner.user_id,
149                target_subscription.owner.user_id,
150            )?;
151
152            (src_subscription.id, target_subscription.id).into()
153        }
154        _ => {
155            unreachable!("handle_swap_rename: unsupported statement type")
156        }
157    };
158
159    let catalog_writer = session.catalog_writer()?;
160    catalog_writer.alter_swap_rename(obj).await?;
161
162    Ok(RwPgResponse::empty_result(stmt_type))
163}