risingwave_frontend/handler/
alter_table_props.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_expr::bail;
17use risingwave_sqlparser::ast::{Ident, ObjectName, SqlOption};
18
19use super::alter_table_column::fetch_table_catalog_for_alter;
20use super::{HandlerArgs, RwPgResponse};
21use crate::catalog::root_catalog::SchemaPath;
22use crate::error::{ErrorCode, Result};
23use crate::handler::alter_source_props::handle_alter_table_connector_props;
24use crate::utils::resolve_connection_ref_and_secret_ref;
25use crate::{Binder, WithOptions};
26
27pub async fn handle_alter_table_props(
28    handler_args: HandlerArgs,
29    table_name: ObjectName,
30    changed_props: Vec<SqlOption>,
31) -> Result<RwPgResponse> {
32    let session = handler_args.session.clone();
33    let (original_table, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
34    match original_table.engine() {
35        risingwave_common::catalog::Engine::Hummock => {
36            handle_alter_table_connector_props(handler_args, table_name, changed_props).await
37        }
38        risingwave_common::catalog::Engine::Iceberg => {
39            handle_alter_iceberg_table_props(handler_args, table_name, changed_props).await
40        }
41    }
42}
43
44pub async fn handle_alter_iceberg_table_props(
45    handler_args: HandlerArgs,
46    table_name: ObjectName,
47    changed_props: Vec<SqlOption>,
48) -> Result<RwPgResponse> {
49    let session = handler_args.session.clone();
50    let db_name = &session.database();
51    let search_path = session.config().search_path();
52    let user_name = &session.user_name();
53    let (original_table, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
54    let (sink_id, source_id, table_id) = if let Some(sink_name) = original_table.iceberg_sink_name()
55        && let Some(source_name) = original_table.iceberg_source_name()
56    {
57        let mut source_names = table_name.0.clone();
58        let mut sink_names = table_name.0.clone();
59        source_names.pop();
60        sink_names.pop();
61        source_names.push(Ident::new_unchecked(source_name));
62        sink_names.push(Ident::new_unchecked(sink_name));
63        let reader = session.env().catalog_reader().read_guard();
64        let (schema_name, real_table_name) =
65            Binder::resolve_schema_qualified_name(db_name, &table_name)?;
66        let (_schema_name, real_sink_name) =
67            Binder::resolve_schema_qualified_name(db_name, &ObjectName(sink_names))?;
68        let (_schema_name, real_source_name) =
69            Binder::resolve_schema_qualified_name(db_name, &ObjectName(source_names))?;
70        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
71        let (sink, _schema_name) =
72            reader.get_sink_by_name(db_name, schema_path, &real_sink_name, false)?;
73        let (source, _schema_name) =
74            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
75        let (table, schema_name) =
76            reader.get_table_by_name(db_name, schema_path, &real_table_name, false)?;
77        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
78        session.check_privilege_for_drop_alter(schema_name, &**source)?;
79        session.check_privilege_for_drop_alter(schema_name, &**table)?;
80        (sink.id.sink_id, source.id, table.id.table_id)
81    } else {
82        return Err(ErrorCode::NotSupported(
83            "ALTER TABLE With is only supported for iceberg tables".to_owned(),
84            "Try `ALTER TABLE .. ADD/DROP COLUMN ...`".to_owned(),
85        )
86        .into());
87    };
88
89    let meta_client = session.env().meta_client();
90    let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
91        WithOptions::try_from(changed_props.as_ref() as &[SqlOption])?,
92        &session,
93        None,
94    )?;
95    let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
96    if !changed_secret_refs.is_empty() || connector_conn_ref.is_some() {
97        bail!("ALTER ICEBERG TABLE does not support SECRET or CONNECTION now")
98    }
99    meta_client
100        .alter_iceberg_table_props(
101            table_id,
102            sink_id,
103            source_id,
104            changed_props,
105            changed_secret_refs,
106            connector_conn_ref,
107        )
108        .await?;
109
110    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
111}