risingwave_frontend/handler/
alter_table_props.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_expr::bail;
17use risingwave_sqlparser::ast::{Ident, ObjectName, SqlOption};
18
19use super::alter_table_column::fetch_table_catalog_for_alter;
20use super::{HandlerArgs, RwPgResponse};
21use crate::catalog::root_catalog::SchemaPath;
22use crate::error::{ErrorCode, Result};
23use crate::handler::alter_source_props::handle_alter_table_connector_props;
24use crate::utils::resolve_connection_ref_and_secret_ref;
25use crate::{Binder, WithOptions};
26
27pub async fn handle_alter_table_props(
28 handler_args: HandlerArgs,
29 table_name: ObjectName,
30 changed_props: Vec<SqlOption>,
31) -> Result<RwPgResponse> {
32 let session = handler_args.session.clone();
33 let (original_table, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
34 match original_table.engine() {
35 risingwave_common::catalog::Engine::Hummock => {
36 handle_alter_table_connector_props(handler_args, table_name, changed_props).await
37 }
38 risingwave_common::catalog::Engine::Iceberg => {
39 handle_alter_iceberg_table_props(handler_args, table_name, changed_props).await
40 }
41 }
42}
43
44pub async fn handle_alter_iceberg_table_props(
45 handler_args: HandlerArgs,
46 table_name: ObjectName,
47 changed_props: Vec<SqlOption>,
48) -> Result<RwPgResponse> {
49 let session = handler_args.session.clone();
50 let db_name = &session.database();
51 let search_path = session.config().search_path();
52 let user_name = &session.user_name();
53 let (original_table, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
54 let (sink_id, source_id, table_id) = if let Some(sink_name) = original_table.iceberg_sink_name()
55 && let Some(source_name) = original_table.iceberg_source_name()
56 {
57 let mut source_names = table_name.0.clone();
58 let mut sink_names = table_name.0.clone();
59 source_names.pop();
60 sink_names.pop();
61 source_names.push(Ident::new_unchecked(source_name));
62 sink_names.push(Ident::new_unchecked(sink_name));
63 let reader = session.env().catalog_reader().read_guard();
64 let (schema_name, real_table_name) =
65 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
66 let (_schema_name, real_sink_name) =
67 Binder::resolve_schema_qualified_name(db_name, &ObjectName(sink_names))?;
68 let (_schema_name, real_source_name) =
69 Binder::resolve_schema_qualified_name(db_name, &ObjectName(source_names))?;
70 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
71 let (sink, _schema_name) =
72 reader.get_sink_by_name(db_name, schema_path, &real_sink_name, false)?;
73 let (source, _schema_name) =
74 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
75 let (table, schema_name) =
76 reader.get_table_by_name(db_name, schema_path, &real_table_name, false)?;
77 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
78 session.check_privilege_for_drop_alter(schema_name, &**source)?;
79 session.check_privilege_for_drop_alter(schema_name, &**table)?;
80 (sink.id.sink_id, source.id, table.id.table_id)
81 } else {
82 return Err(ErrorCode::NotSupported(
83 "ALTER TABLE With is only supported for iceberg tables".to_owned(),
84 "Try `ALTER TABLE .. ADD/DROP COLUMN ...`".to_owned(),
85 )
86 .into());
87 };
88
89 let meta_client = session.env().meta_client();
90 let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
91 WithOptions::try_from(changed_props.as_ref() as &[SqlOption])?,
92 &session,
93 None,
94 )?;
95 let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
96 if !changed_secret_refs.is_empty() || connector_conn_ref.is_some() {
97 bail!("ALTER ICEBERG TABLE does not support SECRET or CONNECTION now")
98 }
99 meta_client
100 .alter_iceberg_table_props(
101 table_id,
102 sink_id,
103 source_id,
104 changed_props,
105 changed_secret_refs,
106 connector_conn_ref,
107 )
108 .await?;
109
110 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
111}