risingwave_frontend/handler/
alter_set_schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg};
17
18use super::{HandlerArgs, RwPgResponse};
19use crate::catalog::root_catalog::SchemaPath;
20use crate::error::{ErrorCode, Result};
21use crate::{Binder, bind_data_type};
22
23// Steps for validation:
24// 1. Check permission to alter original object.
25// 2. Check duplicate name in the new schema.
26// 3. Check permission to create in the new schema.
27
28/// Handle `ALTER [TABLE | [MATERIALIZED] VIEW | SOURCE | SINK | CONNECTION | FUNCTION] <name> SET SCHEMA <schema_name>` statements.
29pub async fn handle_alter_set_schema(
30    handler_args: HandlerArgs,
31    obj_name: ObjectName,
32    new_schema_name: ObjectName,
33    stmt_type: StatementType,
34    func_args: Option<Vec<OperateFunctionArg>>,
35) -> Result<RwPgResponse> {
36    let session = handler_args.session;
37    let db_name = &session.database();
38    let (schema_name, real_obj_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
39    let search_path = session.config().search_path();
40    let user_name = &session.user_name();
41    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43    let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
44    let object = {
45        let catalog_reader = session.env().catalog_reader().read_guard();
46
47        match stmt_type {
48            StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
49                let (table, old_schema_name) = catalog_reader.get_created_table_by_name(
50                    db_name,
51                    schema_path,
52                    &real_obj_name,
53                )?;
54                if old_schema_name == new_schema_name {
55                    return Ok(RwPgResponse::empty_result(stmt_type));
56                }
57                session.check_privilege_for_drop_alter(old_schema_name, &**table)?;
58                catalog_reader.check_relation_name_duplicated(
59                    db_name,
60                    &new_schema_name,
61                    table.name(),
62                )?;
63                table.id.into()
64            }
65            StatementType::ALTER_VIEW => {
66                let (view, old_schema_name) =
67                    catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
68                if old_schema_name == new_schema_name {
69                    return Ok(RwPgResponse::empty_result(stmt_type));
70                }
71                session.check_privilege_for_drop_alter(old_schema_name, &**view)?;
72                catalog_reader.check_relation_name_duplicated(
73                    db_name,
74                    &new_schema_name,
75                    view.name(),
76                )?;
77                view.id.into()
78            }
79            StatementType::ALTER_SOURCE => {
80                let (source, old_schema_name) =
81                    catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
82                if old_schema_name == new_schema_name {
83                    return Ok(RwPgResponse::empty_result(stmt_type));
84                }
85                session.check_privilege_for_drop_alter(old_schema_name, &**source)?;
86                catalog_reader.check_relation_name_duplicated(
87                    db_name,
88                    &new_schema_name,
89                    &source.name,
90                )?;
91                source.id.into()
92            }
93            StatementType::ALTER_SINK => {
94                let (sink, old_schema_name) = catalog_reader.get_created_sink_by_name(
95                    db_name,
96                    schema_path,
97                    &real_obj_name,
98                )?;
99                if old_schema_name == new_schema_name {
100                    return Ok(RwPgResponse::empty_result(stmt_type));
101                }
102                session.check_privilege_for_drop_alter(old_schema_name, &**sink)?;
103                catalog_reader.check_relation_name_duplicated(
104                    db_name,
105                    &new_schema_name,
106                    &sink.name,
107                )?;
108                sink.id.into()
109            }
110            StatementType::ALTER_SUBSCRIPTION => {
111                let (subscription, old_schema_name) = catalog_reader.get_subscription_by_name(
112                    db_name,
113                    schema_path,
114                    &real_obj_name,
115                )?;
116                if old_schema_name == new_schema_name {
117                    return Ok(RwPgResponse::empty_result(stmt_type));
118                }
119                session.check_privilege_for_drop_alter(old_schema_name, &**subscription)?;
120                catalog_reader.check_relation_name_duplicated(
121                    db_name,
122                    &new_schema_name,
123                    &subscription.name,
124                )?;
125                subscription.id.into()
126            }
127            StatementType::ALTER_CONNECTION => {
128                let (connection, old_schema_name) =
129                    catalog_reader.get_connection_by_name(db_name, schema_path, &real_obj_name)?;
130                if old_schema_name == new_schema_name {
131                    return Ok(RwPgResponse::empty_result(stmt_type));
132                }
133                session.check_privilege_for_drop_alter(old_schema_name, &**connection)?;
134                catalog_reader.check_connection_name_duplicated(
135                    db_name,
136                    &new_schema_name,
137                    &connection.name,
138                )?;
139                connection.id.into()
140            }
141            StatementType::ALTER_FUNCTION => {
142                let (function, old_schema_name) = if let Some(args) = func_args {
143                    let mut arg_types = Vec::with_capacity(args.len());
144                    for arg in args {
145                        arg_types.push(bind_data_type(&arg.data_type)?);
146                    }
147                    catalog_reader.get_function_by_name_args(
148                        db_name,
149                        schema_path,
150                        &real_obj_name,
151                        &arg_types,
152                    )?
153                } else {
154                    let (functions, old_schema_name) = catalog_reader.get_functions_by_name(
155                        db_name,
156                        schema_path,
157                        &real_obj_name,
158                    )?;
159                    if functions.len() > 1 {
160                        return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into());
161                    }
162                    (
163                        functions.into_iter().next().expect("no functions"),
164                        old_schema_name,
165                    )
166                };
167                if old_schema_name == new_schema_name {
168                    return Ok(RwPgResponse::empty_result(stmt_type));
169                }
170                session.check_privilege_for_drop_alter(old_schema_name, &**function)?;
171                catalog_reader.check_function_name_duplicated(
172                    db_name,
173                    &new_schema_name,
174                    &function.name,
175                    &function.arg_types,
176                )?;
177                function.id.into()
178            }
179            _ => unreachable!(),
180        }
181    };
182
183    let (_, new_schema_id) =
184        session.get_database_and_schema_id_for_create(Some(new_schema_name))?;
185
186    let catalog_writer = session.catalog_writer()?;
187    catalog_writer
188        .alter_set_schema(object, new_schema_id)
189        .await?;
190
191    Ok(RwPgResponse::empty_result(stmt_type))
192}