risingwave_frontend/handler/
alter_swap_rename.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_pb::ddl_service::alter_swap_rename_request;
20use risingwave_pb::ddl_service::alter_swap_rename_request::ObjectNameSwapPair;
21use risingwave_sqlparser::ast::ObjectName;
22
23use crate::Binder;
24use crate::catalog::CatalogError;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::error::{ErrorCode, Result};
27use crate::handler::{HandlerArgs, RwPgResponse};
28use crate::session::SessionImpl;
29use crate::user::UserId;
30
31fn check_swap_rename_privilege(
33 session_impl: &Arc<SessionImpl>,
34 src_owner: UserId,
35 target_owner: UserId,
36) -> Result<()> {
37 if !session_impl.is_super_user()
38 && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
39 {
40 return Err(ErrorCode::PermissionDenied(format!(
41 "{} is not super user and not the owner of the objects.",
42 session_impl.user_name()
43 ))
44 .into());
45 }
46 Ok(())
47}
48
49pub async fn handle_swap_rename(
50 handler_args: HandlerArgs,
51 source_object: ObjectName,
52 target_object: ObjectName,
53 stmt_type: StatementType,
54) -> Result<RwPgResponse> {
55 let session = handler_args.session;
56 let db_name = &session.database();
57 let (src_schema_name, src_obj_name) =
58 Binder::resolve_schema_qualified_name(db_name, source_object)?;
59 let search_path = session.config().search_path();
60 let user_name = &session.user_name();
61 let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
62 let (target_schema_name, target_obj_name) =
63 Binder::resolve_schema_qualified_name(db_name, target_object)?;
64 let target_schema_path =
65 SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
66
67 let obj = match stmt_type {
68 StatementType::ALTER_SCHEMA => {
69 bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
71 }
72 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
73 let catalog_reader = session.env().catalog_reader().read_guard();
74 let (src_table, _) = catalog_reader.get_created_table_by_name(
75 db_name,
76 src_schema_path,
77 &src_obj_name,
78 )?;
79 let (target_table, _) = catalog_reader.get_created_table_by_name(
80 db_name,
81 target_schema_path,
82 &target_obj_name,
83 )?;
84
85 if src_table.table_type != target_table.table_type {
86 return Err(ErrorCode::PermissionDenied(format!(
87 "cannot swap between {} and {}: type mismatch",
88 src_obj_name, target_obj_name
89 ))
90 .into());
91 }
92 if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
93 return Err(CatalogError::NotFound("table", src_obj_name.clone()).into());
94 } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
95 return Err(
96 CatalogError::NotFound("materialized view", src_obj_name.clone()).into(),
97 );
98 }
99
100 check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
101
102 alter_swap_rename_request::Object::Table(ObjectNameSwapPair {
103 src_object_id: src_table.id.table_id,
104 dst_object_id: target_table.id.table_id,
105 })
106 }
107 StatementType::ALTER_VIEW => {
108 let catalog_reader = session.env().catalog_reader().read_guard();
109 let (src_view, _) =
110 catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
111 let (target_view, _) =
112 catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
113 check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
114
115 alter_swap_rename_request::Object::View(ObjectNameSwapPair {
116 src_object_id: src_view.id,
117 dst_object_id: target_view.id,
118 })
119 }
120 StatementType::ALTER_SOURCE => {
121 let catalog_reader = session.env().catalog_reader().read_guard();
122 let (src_source, _) =
123 catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
124 let (target_source, _) =
125 catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
126 check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
127
128 alter_swap_rename_request::Object::Source(ObjectNameSwapPair {
129 src_object_id: src_source.id,
130 dst_object_id: target_source.id,
131 })
132 }
133 StatementType::ALTER_SINK => {
134 let catalog_reader = session.env().catalog_reader().read_guard();
135 let (src_sink, _) =
136 catalog_reader.get_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
137 let (target_sink, _) =
138 catalog_reader.get_sink_by_name(db_name, target_schema_path, &target_obj_name)?;
139 check_swap_rename_privilege(
140 &session,
141 src_sink.owner.user_id,
142 target_sink.owner.user_id,
143 )?;
144
145 alter_swap_rename_request::Object::Sink(ObjectNameSwapPair {
146 src_object_id: src_sink.id.sink_id,
147 dst_object_id: target_sink.id.sink_id,
148 })
149 }
150 StatementType::ALTER_SUBSCRIPTION => {
151 let catalog_reader = session.env().catalog_reader().read_guard();
152 let (src_subscription, _) =
153 catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
154 let (target_subscription, _) = catalog_reader.get_subscription_by_name(
155 db_name,
156 target_schema_path,
157 &target_obj_name,
158 )?;
159 check_swap_rename_privilege(
160 &session,
161 src_subscription.owner.user_id,
162 target_subscription.owner.user_id,
163 )?;
164
165 alter_swap_rename_request::Object::Subscription(ObjectNameSwapPair {
166 src_object_id: src_subscription.id.subscription_id,
167 dst_object_id: target_subscription.id.subscription_id,
168 })
169 }
170 _ => {
171 unreachable!("handle_swap_rename: unsupported statement type")
172 }
173 };
174
175 let catalog_writer = session.catalog_writer()?;
176 catalog_writer.alter_swap_rename(obj).await?;
177
178 Ok(RwPgResponse::empty_result(stmt_type))
179}