risingwave_frontend/handler/
alter_swap_rename.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::{HandlerArgs, RwPgResponse};
26use crate::session::SessionImpl;
27use crate::user::UserId;
28
29/// Check if the session user has the privilege to swap and rename the objects.
30fn check_swap_rename_privilege(
31    session_impl: &Arc<SessionImpl>,
32    src_owner: UserId,
33    target_owner: UserId,
34) -> Result<()> {
35    if !session_impl.is_super_user()
36        && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
37    {
38        return Err(ErrorCode::PermissionDenied(format!(
39            "{} is not super user and not the owner of the objects.",
40            session_impl.user_name()
41        ))
42        .into());
43    }
44    Ok(())
45}
46
47pub async fn handle_swap_rename(
48    handler_args: HandlerArgs,
49    source_object: ObjectName,
50    target_object: ObjectName,
51    stmt_type: StatementType,
52) -> Result<RwPgResponse> {
53    let session = handler_args.session;
54    let db_name = &session.database();
55    let (src_schema_name, src_obj_name) =
56        Binder::resolve_schema_qualified_name(db_name, &source_object)?;
57    let search_path = session.config().search_path();
58    let user_name = &session.user_name();
59    let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
60    let (target_schema_name, target_obj_name) =
61        Binder::resolve_schema_qualified_name(db_name, &target_object)?;
62    let target_schema_path =
63        SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
64
65    let obj = match stmt_type {
66        StatementType::ALTER_SCHEMA => {
67            // TODO: support it until resolves https://github.com/risingwavelabs/risingwave/issues/19028
68            bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
69        }
70        StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
71            let catalog_reader = session.env().catalog_reader().read_guard();
72            let (src_table, _) = catalog_reader.get_created_table_by_name(
73                db_name,
74                src_schema_path,
75                &src_obj_name,
76            )?;
77            let (target_table, _) = catalog_reader.get_created_table_by_name(
78                db_name,
79                target_schema_path,
80                &target_obj_name,
81            )?;
82
83            if src_table.table_type != target_table.table_type {
84                return Err(ErrorCode::PermissionDenied(format!(
85                    "cannot swap between {} and {}: type mismatch",
86                    src_obj_name, target_obj_name
87                ))
88                .into());
89            }
90            if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
91                return Err(CatalogError::NotFound("table", src_obj_name.clone()).into());
92            } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
93                return Err(
94                    CatalogError::NotFound("materialized view", src_obj_name.clone()).into(),
95                );
96            }
97
98            check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
99
100            (src_table.id, target_table.id).into()
101        }
102        StatementType::ALTER_VIEW => {
103            let catalog_reader = session.env().catalog_reader().read_guard();
104            let (src_view, _) =
105                catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
106            let (target_view, _) =
107                catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
108            check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
109
110            (src_view.id, target_view.id).into()
111        }
112        StatementType::ALTER_SOURCE => {
113            let catalog_reader = session.env().catalog_reader().read_guard();
114            let (src_source, _) =
115                catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
116            let (target_source, _) =
117                catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
118            check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
119
120            (src_source.id, target_source.id).into()
121        }
122        StatementType::ALTER_SINK => {
123            let catalog_reader = session.env().catalog_reader().read_guard();
124            let (src_sink, _) =
125                catalog_reader.get_created_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
126            let (target_sink, _) = catalog_reader.get_created_sink_by_name(
127                db_name,
128                target_schema_path,
129                &target_obj_name,
130            )?;
131            check_swap_rename_privilege(
132                &session,
133                src_sink.owner.user_id,
134                target_sink.owner.user_id,
135            )?;
136
137            (src_sink.id, target_sink.id).into()
138        }
139        StatementType::ALTER_SUBSCRIPTION => {
140            let catalog_reader = session.env().catalog_reader().read_guard();
141            let (src_subscription, _) =
142                catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
143            let (target_subscription, _) = catalog_reader.get_subscription_by_name(
144                db_name,
145                target_schema_path,
146                &target_obj_name,
147            )?;
148            check_swap_rename_privilege(
149                &session,
150                src_subscription.owner.user_id,
151                target_subscription.owner.user_id,
152            )?;
153
154            (src_subscription.id, target_subscription.id).into()
155        }
156        _ => {
157            unreachable!("handle_swap_rename: unsupported statement type")
158        }
159    };
160
161    let catalog_writer = session.catalog_writer()?;
162    catalog_writer.alter_swap_rename(obj).await?;
163
164    Ok(RwPgResponse::empty_result(stmt_type))
165}