risingwave_frontend/handler/
alter_set_schema.rs1use pgwire::pg_response::StatementType;
16use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg};
17
18use super::{HandlerArgs, RwPgResponse};
19use crate::catalog::root_catalog::SchemaPath;
20use crate::error::{ErrorCode, Result};
21use crate::{Binder, bind_data_type};
22
23pub async fn handle_alter_set_schema(
30 handler_args: HandlerArgs,
31 obj_name: ObjectName,
32 new_schema_name: ObjectName,
33 stmt_type: StatementType,
34 func_args: Option<Vec<OperateFunctionArg>>,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session;
37 let db_name = &session.database();
38 let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
39 let search_path = session.config().search_path();
40 let user_name = &session.user_name();
41 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
44 let object = {
45 let catalog_reader = session.env().catalog_reader().read_guard();
46
47 match stmt_type {
48 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
49 let (table, old_schema_name) = catalog_reader.get_created_table_by_name(
50 db_name,
51 schema_path,
52 &real_obj_name,
53 )?;
54 if old_schema_name == new_schema_name {
55 return Ok(RwPgResponse::empty_result(stmt_type));
56 }
57 session.check_privilege_for_drop_alter(old_schema_name, &**table)?;
58 catalog_reader.check_relation_name_duplicated(
59 db_name,
60 &new_schema_name,
61 table.name(),
62 )?;
63 table.id.into()
64 }
65 StatementType::ALTER_VIEW => {
66 let (view, old_schema_name) =
67 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
68 if old_schema_name == new_schema_name {
69 return Ok(RwPgResponse::empty_result(stmt_type));
70 }
71 session.check_privilege_for_drop_alter(old_schema_name, &**view)?;
72 catalog_reader.check_relation_name_duplicated(
73 db_name,
74 &new_schema_name,
75 view.name(),
76 )?;
77 view.id.into()
78 }
79 StatementType::ALTER_SOURCE => {
80 let (source, old_schema_name) =
81 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
82 if old_schema_name == new_schema_name {
83 return Ok(RwPgResponse::empty_result(stmt_type));
84 }
85 session.check_privilege_for_drop_alter(old_schema_name, &**source)?;
86 catalog_reader.check_relation_name_duplicated(
87 db_name,
88 &new_schema_name,
89 &source.name,
90 )?;
91 source.id.into()
92 }
93 StatementType::ALTER_SINK => {
94 let (sink, old_schema_name) = catalog_reader.get_created_sink_by_name(
95 db_name,
96 schema_path,
97 &real_obj_name,
98 )?;
99 if old_schema_name == new_schema_name {
100 return Ok(RwPgResponse::empty_result(stmt_type));
101 }
102 session.check_privilege_for_drop_alter(old_schema_name, &**sink)?;
103 catalog_reader.check_relation_name_duplicated(
104 db_name,
105 &new_schema_name,
106 &sink.name,
107 )?;
108 sink.id.into()
109 }
110 StatementType::ALTER_SUBSCRIPTION => {
111 let (subscription, old_schema_name) = catalog_reader.get_subscription_by_name(
112 db_name,
113 schema_path,
114 &real_obj_name,
115 )?;
116 if old_schema_name == new_schema_name {
117 return Ok(RwPgResponse::empty_result(stmt_type));
118 }
119 session.check_privilege_for_drop_alter(old_schema_name, &**subscription)?;
120 catalog_reader.check_relation_name_duplicated(
121 db_name,
122 &new_schema_name,
123 &subscription.name,
124 )?;
125 subscription.id.into()
126 }
127 StatementType::ALTER_CONNECTION => {
128 let (connection, old_schema_name) =
129 catalog_reader.get_connection_by_name(db_name, schema_path, &real_obj_name)?;
130 if old_schema_name == new_schema_name {
131 return Ok(RwPgResponse::empty_result(stmt_type));
132 }
133 session.check_privilege_for_drop_alter(old_schema_name, &**connection)?;
134 catalog_reader.check_connection_name_duplicated(
135 db_name,
136 &new_schema_name,
137 &connection.name,
138 )?;
139 connection.id.into()
140 }
141 StatementType::ALTER_FUNCTION => {
142 let (function, old_schema_name) = if let Some(args) = func_args {
143 let mut arg_types = Vec::with_capacity(args.len());
144 for arg in args {
145 arg_types.push(bind_data_type(&arg.data_type)?);
146 }
147 catalog_reader.get_function_by_name_args(
148 db_name,
149 schema_path,
150 &real_obj_name,
151 &arg_types,
152 )?
153 } else {
154 let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
155 db_name,
156 schema_path,
157 &real_obj_name,
158 )?;
159 if functions.len() > 1 {
160 return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
161 }
162 (
163 functions.into_iter().next().expect("no functions"),
164 old_schema_name,
165 )
166 };
167 if old_schema_name == new_schema_name {
168 return Ok(RwPgResponse::empty_result(stmt_type));
169 }
170 session.check_privilege_for_drop_alter(old_schema_name, &**function)?;
171 catalog_reader.check_function_name_duplicated(
172 db_name,
173 &new_schema_name,
174 &function.name,
175 &function.arg_types,
176 )?;
177 function.id.into()
178 }
179 _ => unreachable!(),
180 }
181 };
182
183 let (_, new_schema_id) =
184 session.get_database_and_schema_id_for_create(Some(new_schema_name))?;
185
186 let catalog_writer = session.catalog_writer()?;
187 catalog_writer
188 .alter_set_schema(object, new_schema_id)
189 .await?;
190
191 Ok(RwPgResponse::empty_result(stmt_type))
192}