risingwave_frontend/handler/
alter_swap_rename.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::{HandlerArgs, RwPgResponse};
26use crate::session::SessionImpl;
27use crate::user::UserId;
28
29fn check_swap_rename_privilege(
31 session_impl: &Arc<SessionImpl>,
32 src_owner: UserId,
33 target_owner: UserId,
34) -> Result<()> {
35 if !session_impl.is_super_user()
36 && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
37 {
38 return Err(ErrorCode::PermissionDenied(format!(
39 "{} is not super user and not the owner of the objects.",
40 session_impl.user_name()
41 ))
42 .into());
43 }
44 Ok(())
45}
46
47pub async fn handle_swap_rename(
48 handler_args: HandlerArgs,
49 source_object: ObjectName,
50 target_object: ObjectName,
51 stmt_type: StatementType,
52) -> Result<RwPgResponse> {
53 let session = handler_args.session;
54 let db_name = &session.database();
55 let (src_schema_name, src_obj_name) =
56 Binder::resolve_schema_qualified_name(db_name, &source_object)?;
57 let search_path = session.config().search_path();
58 let user_name = &session.user_name();
59 let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
60 let (target_schema_name, target_obj_name) =
61 Binder::resolve_schema_qualified_name(db_name, &target_object)?;
62 let target_schema_path =
63 SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
64
65 let obj = match stmt_type {
66 StatementType::ALTER_SCHEMA => {
67 bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
69 }
70 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
71 let catalog_reader = session.env().catalog_reader().read_guard();
72 let (src_table, _) = catalog_reader.get_created_table_by_name(
73 db_name,
74 src_schema_path,
75 &src_obj_name,
76 )?;
77 let (target_table, _) = catalog_reader.get_created_table_by_name(
78 db_name,
79 target_schema_path,
80 &target_obj_name,
81 )?;
82
83 if src_table.table_type != target_table.table_type {
84 return Err(ErrorCode::PermissionDenied(format!(
85 "cannot swap between {} and {}: type mismatch",
86 src_obj_name, target_obj_name
87 ))
88 .into());
89 }
90 if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
91 return Err(CatalogError::not_found("table", &src_obj_name).into());
92 } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
93 return Err(CatalogError::not_found("materialized view", &src_obj_name).into());
94 }
95
96 check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
97
98 (src_table.id, target_table.id).into()
99 }
100 StatementType::ALTER_VIEW => {
101 let catalog_reader = session.env().catalog_reader().read_guard();
102 let (src_view, _) =
103 catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
104 let (target_view, _) =
105 catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
106 check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
107
108 (src_view.id, target_view.id).into()
109 }
110 StatementType::ALTER_SOURCE => {
111 let catalog_reader = session.env().catalog_reader().read_guard();
112 let (src_source, _) =
113 catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
114 let (target_source, _) =
115 catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
116 check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
117
118 (src_source.id, target_source.id).into()
119 }
120 StatementType::ALTER_SINK => {
121 let catalog_reader = session.env().catalog_reader().read_guard();
122 let (src_sink, _) =
123 catalog_reader.get_created_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
124 let (target_sink, _) = catalog_reader.get_created_sink_by_name(
125 db_name,
126 target_schema_path,
127 &target_obj_name,
128 )?;
129 check_swap_rename_privilege(&session, src_sink.owner, target_sink.owner)?;
130
131 (src_sink.id, target_sink.id).into()
132 }
133 StatementType::ALTER_SUBSCRIPTION => {
134 let catalog_reader = session.env().catalog_reader().read_guard();
135 let (src_subscription, _) =
136 catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
137 let (target_subscription, _) = catalog_reader.get_subscription_by_name(
138 db_name,
139 target_schema_path,
140 &target_obj_name,
141 )?;
142 check_swap_rename_privilege(
143 &session,
144 src_subscription.owner,
145 target_subscription.owner,
146 )?;
147
148 (src_subscription.id, target_subscription.id).into()
149 }
150 _ => {
151 unreachable!("handle_swap_rename: unsupported statement type")
152 }
153 };
154
155 let catalog_writer = session.catalog_writer()?;
156 catalog_writer.alter_swap_rename(obj).await?;
157
158 Ok(RwPgResponse::empty_result(stmt_type))
159}