risingwave_frontend/handler/
alter_set_schema.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::ddl_service::alter_set_schema_request::Object;
17use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg};
18
19use super::{HandlerArgs, RwPgResponse};
20use crate::catalog::root_catalog::SchemaPath;
21use crate::error::{ErrorCode, Result};
22use crate::{Binder, bind_data_type};
23
24pub async fn handle_alter_set_schema(
31 handler_args: HandlerArgs,
32 obj_name: ObjectName,
33 new_schema_name: ObjectName,
34 stmt_type: StatementType,
35 func_args: Option<Vec<OperateFunctionArg>>,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session;
38 let db_name = &session.database();
39 let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
40 let search_path = session.config().search_path();
41 let user_name = &session.user_name();
42 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
43
44 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
45 let object = {
46 let catalog_reader = session.env().catalog_reader().read_guard();
47
48 match stmt_type {
49 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
50 let (table, old_schema_name) = catalog_reader.get_created_table_by_name(
51 db_name,
52 schema_path,
53 &real_obj_name,
54 )?;
55 if old_schema_name == new_schema_name {
56 return Ok(RwPgResponse::empty_result(stmt_type));
57 }
58 session.check_privilege_for_drop_alter(old_schema_name, &**table)?;
59 catalog_reader.check_relation_name_duplicated(
60 db_name,
61 &new_schema_name,
62 table.name(),
63 )?;
64 Object::TableId(table.id.table_id)
65 }
66 StatementType::ALTER_VIEW => {
67 let (view, old_schema_name) =
68 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
69 if old_schema_name == new_schema_name {
70 return Ok(RwPgResponse::empty_result(stmt_type));
71 }
72 session.check_privilege_for_drop_alter(old_schema_name, &**view)?;
73 catalog_reader.check_relation_name_duplicated(
74 db_name,
75 &new_schema_name,
76 view.name(),
77 )?;
78 Object::ViewId(view.id)
79 }
80 StatementType::ALTER_SOURCE => {
81 let (source, old_schema_name) =
82 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
83 if old_schema_name == new_schema_name {
84 return Ok(RwPgResponse::empty_result(stmt_type));
85 }
86 session.check_privilege_for_drop_alter(old_schema_name, &**source)?;
87 catalog_reader.check_relation_name_duplicated(
88 db_name,
89 &new_schema_name,
90 &source.name,
91 )?;
92 Object::SourceId(source.id)
93 }
94 StatementType::ALTER_SINK => {
95 let (sink, old_schema_name) = catalog_reader.get_created_sink_by_name(
96 db_name,
97 schema_path,
98 &real_obj_name,
99 )?;
100 if old_schema_name == new_schema_name {
101 return Ok(RwPgResponse::empty_result(stmt_type));
102 }
103 session.check_privilege_for_drop_alter(old_schema_name, &**sink)?;
104 catalog_reader.check_relation_name_duplicated(
105 db_name,
106 &new_schema_name,
107 &sink.name,
108 )?;
109 Object::SinkId(sink.id.sink_id)
110 }
111 StatementType::ALTER_SUBSCRIPTION => {
112 let (subscription, old_schema_name) = catalog_reader.get_subscription_by_name(
113 db_name,
114 schema_path,
115 &real_obj_name,
116 )?;
117 if old_schema_name == new_schema_name {
118 return Ok(RwPgResponse::empty_result(stmt_type));
119 }
120 session.check_privilege_for_drop_alter(old_schema_name, &**subscription)?;
121 catalog_reader.check_relation_name_duplicated(
122 db_name,
123 &new_schema_name,
124 &subscription.name,
125 )?;
126 Object::SubscriptionId(subscription.id.subscription_id)
127 }
128 StatementType::ALTER_CONNECTION => {
129 let (connection, old_schema_name) =
130 catalog_reader.get_connection_by_name(db_name, schema_path, &real_obj_name)?;
131 if old_schema_name == new_schema_name {
132 return Ok(RwPgResponse::empty_result(stmt_type));
133 }
134 session.check_privilege_for_drop_alter(old_schema_name, &**connection)?;
135 catalog_reader.check_connection_name_duplicated(
136 db_name,
137 &new_schema_name,
138 &connection.name,
139 )?;
140 Object::ConnectionId(connection.id)
141 }
142 StatementType::ALTER_FUNCTION => {
143 let (function, old_schema_name) = if let Some(args) = func_args {
144 let mut arg_types = Vec::with_capacity(args.len());
145 for arg in args {
146 arg_types.push(bind_data_type(&arg.data_type)?);
147 }
148 catalog_reader.get_function_by_name_args(
149 db_name,
150 schema_path,
151 &real_obj_name,
152 &arg_types,
153 )?
154 } else {
155 let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
156 db_name,
157 schema_path,
158 &real_obj_name,
159 )?;
160 if functions.len() > 1 {
161 return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
162 }
163 (
164 functions.into_iter().next().expect("no functions"),
165 old_schema_name,
166 )
167 };
168 if old_schema_name == new_schema_name {
169 return Ok(RwPgResponse::empty_result(stmt_type));
170 }
171 session.check_privilege_for_drop_alter(old_schema_name, &**function)?;
172 catalog_reader.check_function_name_duplicated(
173 db_name,
174 &new_schema_name,
175 &function.name,
176 &function.arg_types,
177 )?;
178 Object::FunctionId(function.id.function_id())
179 }
180 _ => unreachable!(),
181 }
182 };
183
184 let (_, new_schema_id) =
185 session.get_database_and_schema_id_for_create(Some(new_schema_name))?;
186
187 let catalog_writer = session.catalog_writer()?;
188 catalog_writer
189 .alter_set_schema(object, new_schema_id)
190 .await?;
191
192 Ok(RwPgResponse::empty_result(stmt_type))
193}