risingwave_frontend/handler/
alter_swap_rename.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::{HandlerArgs, RwPgResponse};
26use crate::session::SessionImpl;
27use crate::user::UserId;
28
29fn check_swap_rename_privilege(
31 session_impl: &Arc<SessionImpl>,
32 src_owner: UserId,
33 target_owner: UserId,
34) -> Result<()> {
35 if !session_impl.is_super_user()
36 && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
37 {
38 return Err(ErrorCode::PermissionDenied(format!(
39 "{} is not super user and not the owner of the objects.",
40 session_impl.user_name()
41 ))
42 .into());
43 }
44 Ok(())
45}
46
47pub async fn handle_swap_rename(
48 handler_args: HandlerArgs,
49 source_object: ObjectName,
50 target_object: ObjectName,
51 stmt_type: StatementType,
52) -> Result<RwPgResponse> {
53 let session = handler_args.session;
54 let db_name = &session.database();
55 let (src_schema_name, src_obj_name) =
56 Binder::resolve_schema_qualified_name(db_name, &source_object)?;
57 let search_path = session.config().search_path();
58 let user_name = &session.user_name();
59 let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
60 let (target_schema_name, target_obj_name) =
61 Binder::resolve_schema_qualified_name(db_name, &target_object)?;
62 let target_schema_path =
63 SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
64
65 let obj = match stmt_type {
66 StatementType::ALTER_SCHEMA => {
67 bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
69 }
70 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
71 let catalog_reader = session.env().catalog_reader().read_guard();
72 let (src_table, _) = catalog_reader.get_created_table_by_name(
73 db_name,
74 src_schema_path,
75 &src_obj_name,
76 )?;
77 let (target_table, _) = catalog_reader.get_created_table_by_name(
78 db_name,
79 target_schema_path,
80 &target_obj_name,
81 )?;
82
83 if src_table.table_type != target_table.table_type {
84 return Err(ErrorCode::PermissionDenied(format!(
85 "cannot swap between {} and {}: type mismatch",
86 src_obj_name, target_obj_name
87 ))
88 .into());
89 }
90 if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
91 return Err(CatalogError::not_found("table", &src_obj_name).into());
92 } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
93 return Err(CatalogError::not_found("materialized view", &src_obj_name).into());
94 }
95
96 check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
97
98 (src_table.id, target_table.id).into()
99 }
100 StatementType::ALTER_VIEW => {
101 let catalog_reader = session.env().catalog_reader().read_guard();
102 let (src_view, _) =
103 catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
104 let (target_view, _) =
105 catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
106 check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
107
108 (src_view.id, target_view.id).into()
109 }
110 StatementType::ALTER_SOURCE => {
111 let catalog_reader = session.env().catalog_reader().read_guard();
112 let (src_source, _) =
113 catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
114 let (target_source, _) =
115 catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
116 check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
117
118 (src_source.id, target_source.id).into()
119 }
120 StatementType::ALTER_SINK => {
121 let catalog_reader = session.env().catalog_reader().read_guard();
122 let (src_sink, _) =
123 catalog_reader.get_created_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
124 let (target_sink, _) = catalog_reader.get_created_sink_by_name(
125 db_name,
126 target_schema_path,
127 &target_obj_name,
128 )?;
129 check_swap_rename_privilege(
130 &session,
131 src_sink.owner.user_id,
132 target_sink.owner.user_id,
133 )?;
134
135 (src_sink.id, target_sink.id).into()
136 }
137 StatementType::ALTER_SUBSCRIPTION => {
138 let catalog_reader = session.env().catalog_reader().read_guard();
139 let (src_subscription, _) =
140 catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
141 let (target_subscription, _) = catalog_reader.get_subscription_by_name(
142 db_name,
143 target_schema_path,
144 &target_obj_name,
145 )?;
146 check_swap_rename_privilege(
147 &session,
148 src_subscription.owner.user_id,
149 target_subscription.owner.user_id,
150 )?;
151
152 (src_subscription.id, target_subscription.id).into()
153 }
154 _ => {
155 unreachable!("handle_swap_rename: unsupported statement type")
156 }
157 };
158
159 let catalog_writer = session.catalog_writer()?;
160 catalog_writer.alter_swap_rename(obj).await?;
161
162 Ok(RwPgResponse::empty_result(stmt_type))
163}