risingwave_frontend/handler/
alter_swap_rename.rs1use std::sync::Arc;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::{HandlerArgs, RwPgResponse};
26use crate::session::SessionImpl;
27use crate::user::UserId;
28
29fn check_swap_rename_privilege(
31 session_impl: &Arc<SessionImpl>,
32 src_owner: UserId,
33 target_owner: UserId,
34) -> Result<()> {
35 if !session_impl.is_super_user()
36 && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
37 {
38 return Err(ErrorCode::PermissionDenied(format!(
39 "{} is not super user and not the owner of the objects.",
40 session_impl.user_name()
41 ))
42 .into());
43 }
44 Ok(())
45}
46
47pub async fn handle_swap_rename(
48 handler_args: HandlerArgs,
49 source_object: ObjectName,
50 target_object: ObjectName,
51 stmt_type: StatementType,
52) -> Result<RwPgResponse> {
53 let session = handler_args.session;
54 let db_name = &session.database();
55 let (src_schema_name, src_obj_name) =
56 Binder::resolve_schema_qualified_name(db_name, &source_object)?;
57 let search_path = session.config().search_path();
58 let user_name = &session.user_name();
59 let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
60 let (target_schema_name, target_obj_name) =
61 Binder::resolve_schema_qualified_name(db_name, &target_object)?;
62 let target_schema_path =
63 SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);
64
65 let obj = match stmt_type {
66 StatementType::ALTER_SCHEMA => {
67 bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
69 }
70 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
71 let catalog_reader = session.env().catalog_reader().read_guard();
72 let (src_table, _) = catalog_reader.get_created_table_by_name(
73 db_name,
74 src_schema_path,
75 &src_obj_name,
76 )?;
77 let (target_table, _) = catalog_reader.get_created_table_by_name(
78 db_name,
79 target_schema_path,
80 &target_obj_name,
81 )?;
82
83 if src_table.table_type != target_table.table_type {
84 return Err(ErrorCode::PermissionDenied(format!(
85 "cannot swap between {} and {}: type mismatch",
86 src_obj_name, target_obj_name
87 ))
88 .into());
89 }
90 if stmt_type == StatementType::ALTER_TABLE && !src_table.is_user_table() {
91 return Err(CatalogError::NotFound("table", src_obj_name.clone()).into());
92 } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
93 return Err(
94 CatalogError::NotFound("materialized view", src_obj_name.clone()).into(),
95 );
96 }
97
98 check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;
99
100 (src_table.id, target_table.id).into()
101 }
102 StatementType::ALTER_VIEW => {
103 let catalog_reader = session.env().catalog_reader().read_guard();
104 let (src_view, _) =
105 catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
106 let (target_view, _) =
107 catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
108 check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;
109
110 (src_view.id, target_view.id).into()
111 }
112 StatementType::ALTER_SOURCE => {
113 let catalog_reader = session.env().catalog_reader().read_guard();
114 let (src_source, _) =
115 catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
116 let (target_source, _) =
117 catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
118 check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;
119
120 (src_source.id, target_source.id).into()
121 }
122 StatementType::ALTER_SINK => {
123 let catalog_reader = session.env().catalog_reader().read_guard();
124 let (src_sink, _) =
125 catalog_reader.get_created_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
126 let (target_sink, _) = catalog_reader.get_created_sink_by_name(
127 db_name,
128 target_schema_path,
129 &target_obj_name,
130 )?;
131 check_swap_rename_privilege(
132 &session,
133 src_sink.owner.user_id,
134 target_sink.owner.user_id,
135 )?;
136
137 (src_sink.id, target_sink.id).into()
138 }
139 StatementType::ALTER_SUBSCRIPTION => {
140 let catalog_reader = session.env().catalog_reader().read_guard();
141 let (src_subscription, _) =
142 catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
143 let (target_subscription, _) = catalog_reader.get_subscription_by_name(
144 db_name,
145 target_schema_path,
146 &target_obj_name,
147 )?;
148 check_swap_rename_privilege(
149 &session,
150 src_subscription.owner.user_id,
151 target_subscription.owner.user_id,
152 )?;
153
154 (src_subscription.id, target_subscription.id).into()
155 }
156 _ => {
157 unreachable!("handle_swap_rename: unsupported statement type")
158 }
159 };
160
161 let catalog_writer = session.catalog_writer()?;
162 catalog_writer.alter_swap_rename(obj).await?;
163
164 Ok(RwPgResponse::empty_result(stmt_type))
165}