risingwave_frontend/handler/
alter_set_schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_pb::ddl_service::alter_set_schema_request::Object;
17use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg};
18
19use super::{HandlerArgs, RwPgResponse};
20use crate::catalog::root_catalog::SchemaPath;
21use crate::error::{ErrorCode, Result};
22use crate::{Binder, bind_data_type};
23
24// Steps for validation:
25// 1. Check permission to alter original object.
26// 2. Check duplicate name in the new schema.
27// 3. Check permission to create in the new schema.
28
29/// Handle `ALTER [TABLE | [MATERIALIZED] VIEW | SOURCE | SINK | CONNECTION | FUNCTION] <name> SET SCHEMA <schema_name>` statements.
30pub async fn handle_alter_set_schema(
31    handler_args: HandlerArgs,
32    obj_name: ObjectName,
33    new_schema_name: ObjectName,
34    stmt_type: StatementType,
35    func_args: Option<Vec<OperateFunctionArg>>,
36) -> Result<RwPgResponse> {
37    let session = handler_args.session;
38    let db_name = &session.database();
39    let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
40    let search_path = session.config().search_path();
41    let user_name = &session.user_name();
42    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
43
44    let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
45    let object = {
46        let catalog_reader = session.env().catalog_reader().read_guard();
47
48        match stmt_type {
49            StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
50                let (table, old_schema_name) = catalog_reader.get_created_table_by_name(
51                    db_name,
52                    schema_path,
53                    &real_obj_name,
54                )?;
55                if old_schema_name == new_schema_name {
56                    return Ok(RwPgResponse::empty_result(stmt_type));
57                }
58                session.check_privilege_for_drop_alter(old_schema_name, &**table)?;
59                catalog_reader.check_relation_name_duplicated(
60                    db_name,
61                    &new_schema_name,
62                    table.name(),
63                )?;
64                Object::TableId(table.id.table_id)
65            }
66            StatementType::ALTER_VIEW => {
67                let (view, old_schema_name) =
68                    catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
69                if old_schema_name == new_schema_name {
70                    return Ok(RwPgResponse::empty_result(stmt_type));
71                }
72                session.check_privilege_for_drop_alter(old_schema_name, &**view)?;
73                catalog_reader.check_relation_name_duplicated(
74                    db_name,
75                    &new_schema_name,
76                    view.name(),
77                )?;
78                Object::ViewId(view.id)
79            }
80            StatementType::ALTER_SOURCE => {
81                let (source, old_schema_name) =
82                    catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
83                if old_schema_name == new_schema_name {
84                    return Ok(RwPgResponse::empty_result(stmt_type));
85                }
86                session.check_privilege_for_drop_alter(old_schema_name, &**source)?;
87                catalog_reader.check_relation_name_duplicated(
88                    db_name,
89                    &new_schema_name,
90                    &source.name,
91                )?;
92                Object::SourceId(source.id)
93            }
94            StatementType::ALTER_SINK => {
95                let (sink, old_schema_name) = catalog_reader.get_created_sink_by_name(
96                    db_name,
97                    schema_path,
98                    &real_obj_name,
99                )?;
100                if old_schema_name == new_schema_name {
101                    return Ok(RwPgResponse::empty_result(stmt_type));
102                }
103                session.check_privilege_for_drop_alter(old_schema_name, &**sink)?;
104                catalog_reader.check_relation_name_duplicated(
105                    db_name,
106                    &new_schema_name,
107                    &sink.name,
108                )?;
109                Object::SinkId(sink.id.sink_id)
110            }
111            StatementType::ALTER_SUBSCRIPTION => {
112                let (subscription, old_schema_name) = catalog_reader.get_subscription_by_name(
113                    db_name,
114                    schema_path,
115                    &real_obj_name,
116                )?;
117                if old_schema_name == new_schema_name {
118                    return Ok(RwPgResponse::empty_result(stmt_type));
119                }
120                session.check_privilege_for_drop_alter(old_schema_name, &**subscription)?;
121                catalog_reader.check_relation_name_duplicated(
122                    db_name,
123                    &new_schema_name,
124                    &subscription.name,
125                )?;
126                Object::SubscriptionId(subscription.id.subscription_id)
127            }
128            StatementType::ALTER_CONNECTION => {
129                let (connection, old_schema_name) =
130                    catalog_reader.get_connection_by_name(db_name, schema_path, &real_obj_name)?;
131                if old_schema_name == new_schema_name {
132                    return Ok(RwPgResponse::empty_result(stmt_type));
133                }
134                session.check_privilege_for_drop_alter(old_schema_name, &**connection)?;
135                catalog_reader.check_connection_name_duplicated(
136                    db_name,
137                    &new_schema_name,
138                    &connection.name,
139                )?;
140                Object::ConnectionId(connection.id)
141            }
142            StatementType::ALTER_FUNCTION => {
143                let (function, old_schema_name) = if let Some(args) = func_args {
144                    let mut arg_types = Vec::with_capacity(args.len());
145                    for arg in args {
146                        arg_types.push(bind_data_type(&arg.data_type)?);
147                    }
148                    catalog_reader.get_function_by_name_args(
149                        db_name,
150                        schema_path,
151                        &real_obj_name,
152                        &arg_types,
153                    )?
154                } else {
155                    let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
156                        db_name,
157                        schema_path,
158                        &real_obj_name,
159                    )?;
160                    if functions.len() > 1 {
161                        return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
162                    }
163                    (
164                        functions.into_iter().next().expect("no functions"),
165                        old_schema_name,
166                    )
167                };
168                if old_schema_name == new_schema_name {
169                    return Ok(RwPgResponse::empty_result(stmt_type));
170                }
171                session.check_privilege_for_drop_alter(old_schema_name, &**function)?;
172                catalog_reader.check_function_name_duplicated(
173                    db_name,
174                    &new_schema_name,
175                    &function.name,
176                    &function.arg_types,
177                )?;
178                Object::FunctionId(function.id.function_id())
179            }
180            _ => unreachable!(),
181        }
182    };
183
184    let (_, new_schema_id) =
185        session.get_database_and_schema_id_for_create(Some(new_schema_name))?;
186
187    let catalog_writer = session.catalog_writer()?;
188    catalog_writer
189        .alter_set_schema(object, new_schema_id)
190        .await?;
191
192    Ok(RwPgResponse::empty_result(stmt_type))
193}