risingwave_frontend/handler/
alter_set_schema.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::ddl_service::alter_set_schema_request::Object;
17use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg};
18
19use super::{HandlerArgs, RwPgResponse};
20use crate::catalog::root_catalog::SchemaPath;
21use crate::error::{ErrorCode, Result};
22use crate::{Binder, bind_data_type};
23
24pub async fn handle_alter_set_schema(
31 handler_args: HandlerArgs,
32 obj_name: ObjectName,
33 new_schema_name: ObjectName,
34 stmt_type: StatementType,
35 func_args: Option<Vec<OperateFunctionArg>>,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session;
38 let db_name = &session.database();
39 let (schema_name, real_obj_name) =
40 Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
41 let search_path = session.config().search_path();
42 let user_name = &session.user_name();
43 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
44
45 let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
46 let object = {
47 let catalog_reader = session.env().catalog_reader().read_guard();
48
49 match stmt_type {
50 StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
51 let (table, old_schema_name) = catalog_reader.get_created_table_by_name(
52 db_name,
53 schema_path,
54 &real_obj_name,
55 )?;
56 if old_schema_name == new_schema_name {
57 return Ok(RwPgResponse::empty_result(stmt_type));
58 }
59 session.check_privilege_for_drop_alter(old_schema_name, &**table)?;
60 catalog_reader.check_relation_name_duplicated(
61 db_name,
62 &new_schema_name,
63 table.name(),
64 )?;
65 Object::TableId(table.id.table_id)
66 }
67 StatementType::ALTER_VIEW => {
68 let (view, old_schema_name) =
69 catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
70 if old_schema_name == new_schema_name {
71 return Ok(RwPgResponse::empty_result(stmt_type));
72 }
73 session.check_privilege_for_drop_alter(old_schema_name, &**view)?;
74 catalog_reader.check_relation_name_duplicated(
75 db_name,
76 &new_schema_name,
77 view.name(),
78 )?;
79 Object::ViewId(view.id)
80 }
81 StatementType::ALTER_SOURCE => {
82 let (source, old_schema_name) =
83 catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
84 if old_schema_name == new_schema_name {
85 return Ok(RwPgResponse::empty_result(stmt_type));
86 }
87 session.check_privilege_for_drop_alter(old_schema_name, &**source)?;
88 catalog_reader.check_relation_name_duplicated(
89 db_name,
90 &new_schema_name,
91 &source.name,
92 )?;
93 Object::SourceId(source.id)
94 }
95 StatementType::ALTER_SINK => {
96 let (sink, old_schema_name) =
97 catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
98 if old_schema_name == new_schema_name {
99 return Ok(RwPgResponse::empty_result(stmt_type));
100 }
101 session.check_privilege_for_drop_alter(old_schema_name, &**sink)?;
102 catalog_reader.check_relation_name_duplicated(
103 db_name,
104 &new_schema_name,
105 &sink.name,
106 )?;
107 Object::SinkId(sink.id.sink_id)
108 }
109 StatementType::ALTER_SUBSCRIPTION => {
110 let (subscription, old_schema_name) = catalog_reader.get_subscription_by_name(
111 db_name,
112 schema_path,
113 &real_obj_name,
114 )?;
115 if old_schema_name == new_schema_name {
116 return Ok(RwPgResponse::empty_result(stmt_type));
117 }
118 session.check_privilege_for_drop_alter(old_schema_name, &**subscription)?;
119 catalog_reader.check_relation_name_duplicated(
120 db_name,
121 &new_schema_name,
122 &subscription.name,
123 )?;
124 Object::SubscriptionId(subscription.id.subscription_id)
125 }
126 StatementType::ALTER_CONNECTION => {
127 let (connection, old_schema_name) =
128 catalog_reader.get_connection_by_name(db_name, schema_path, &real_obj_name)?;
129 if old_schema_name == new_schema_name {
130 return Ok(RwPgResponse::empty_result(stmt_type));
131 }
132 session.check_privilege_for_drop_alter(old_schema_name, &**connection)?;
133 catalog_reader.check_connection_name_duplicated(
134 db_name,
135 &new_schema_name,
136 &connection.name,
137 )?;
138 Object::ConnectionId(connection.id)
139 }
140 StatementType::ALTER_FUNCTION => {
141 let (function, old_schema_name) = if let Some(args) = func_args {
142 let mut arg_types = Vec::with_capacity(args.len());
143 for arg in args {
144 arg_types.push(bind_data_type(&arg.data_type)?);
145 }
146 catalog_reader.get_function_by_name_args(
147 db_name,
148 schema_path,
149 &real_obj_name,
150 &arg_types,
151 )?
152 } else {
153 let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
154 db_name,
155 schema_path,
156 &real_obj_name,
157 )?;
158 if functions.len() > 1 {
159 return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
160 }
161 (
162 functions.into_iter().next().expect("no functions"),
163 old_schema_name,
164 )
165 };
166 if old_schema_name == new_schema_name {
167 return Ok(RwPgResponse::empty_result(stmt_type));
168 }
169 session.check_privilege_for_drop_alter(old_schema_name, &**function)?;
170 catalog_reader.check_function_name_duplicated(
171 db_name,
172 &new_schema_name,
173 &function.name,
174 &function.arg_types,
175 )?;
176 Object::FunctionId(function.id.function_id())
177 }
178 _ => unreachable!(),
179 }
180 };
181
182 let (_, new_schema_id) =
183 session.get_database_and_schema_id_for_create(Some(new_schema_name))?;
184
185 let catalog_writer = session.catalog_writer()?;
186 catalog_writer
187 .alter_set_schema(object, new_schema_id)
188 .await?;
189
190 Ok(RwPgResponse::empty_result(stmt_type))
191}