risingwave_frontend/handler/
alter_source_props.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use risingwave_pb::catalog::connection::Info::ConnectionParams;
15
16use super::RwPgResponse;
17use crate::catalog::catalog_service::CatalogReadGuard;
18use crate::catalog::root_catalog::SchemaPath;
19use crate::error::{ErrorCode, Result};
20use crate::handler::{HandlerArgs, ObjectName, SqlOption, StatementType};
21use crate::session::SessionImpl;
22use crate::utils::resolve_connection_ref_and_secret_ref;
23use crate::{Binder, WithOptions};
24
25pub async fn handle_alter_table_connector_props(
26    handler_args: HandlerArgs,
27    table_name: ObjectName,
28    alter_props: Vec<SqlOption>,
29) -> Result<RwPgResponse> {
30    let session = handler_args.session;
31    let db_name = &session.database();
32    let (schema_name, real_table_name) =
33        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
34    let search_path = session.config().search_path();
35    let user_name = &session.user_name();
36    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
37
38    let source_id = {
39        let reader = session.env().catalog_reader().read_guard();
40        let (table, schema_name) =
41            reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
42        let Some(associate_source_id) = table.associated_source_id else {
43            return Err(ErrorCode::InvalidInputSyntax(
44                "Only table with connector can use ALTER TABLE CONNECTOR syntax.".to_owned(),
45            )
46            .into());
47        };
48
49        session.check_privilege_for_drop_alter(schema_name, &**table)?;
50        let (source_catalog, _) =
51            reader.get_source_by_id(db_name, schema_path, &associate_source_id.table_id)?;
52
53        ensure_alter_props_not_set_by_connection(
54            &reader,
55            db_name,
56            source_catalog.connection_id,
57            &alter_props,
58        )?;
59
60        tracing::info!(
61            "handle_alter_table_connector_props: table_name: {}, table id: {}, source_id: {}",
62            real_table_name,
63            table.id,
64            associate_source_id.table_id
65        );
66
67        associate_source_id.table_id
68    };
69
70    handle_alter_source_props_inner(&session, alter_props, source_id).await?;
71
72    Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
73}
74
75async fn handle_alter_source_props_inner(
76    session: &SessionImpl,
77    alter_props: Vec<SqlOption>,
78    source_id: u32,
79) -> Result<()> {
80    let meta_client = session.env().meta_client();
81    let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
82        WithOptions::try_from(alter_props.as_ref() as &[SqlOption])?,
83        session,
84        None,
85    )?;
86    let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
87    if connector_conn_ref.is_some() {
88        return Err(ErrorCode::InvalidInputSyntax(
89            "ALTER SOURCE CONNECTOR does not support CONNECTION".to_owned(),
90        )
91        .into());
92    }
93
94    meta_client
95        .alter_source_connector_props(
96            source_id,
97            changed_props,
98            changed_secret_refs,
99            connector_conn_ref, // always None, keep the interface for future extension
100        )
101        .await?;
102    Ok(())
103}
104
105pub async fn handle_alter_source_connector_props(
106    handler_args: HandlerArgs,
107    source_name: ObjectName,
108    alter_props: Vec<SqlOption>,
109) -> Result<RwPgResponse> {
110    let session = handler_args.session;
111    let db_name = &session.database();
112    let (schema_name, real_source_name) =
113        Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
114    let search_path = session.config().search_path();
115    let user_name = &session.user_name();
116    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
117
118    let source_id = {
119        let reader = session.env().catalog_reader().read_guard();
120        let (source, schema_name) =
121            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
122
123        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
124        if source.associated_table_id.is_some() {
125            return Err(ErrorCode::InvalidInputSyntax(
126                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
127            )
128            .into());
129        }
130
131        session.check_privilege_for_drop_alter(schema_name, &**source)?;
132
133        ensure_alter_props_not_set_by_connection(
134            &reader,
135            db_name,
136            source.connection_id,
137            &alter_props,
138        )?;
139
140        source.id
141    };
142
143    handle_alter_source_props_inner(&session, alter_props, source_id).await?;
144
145    Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
146}
147
148/// Validates that the properties being altered don't conflict with properties set by a CONNECTION.
149pub(crate) fn ensure_alter_props_not_set_by_connection(
150    reader: &CatalogReadGuard,
151    db_name: &str,
152    connection_id: Option<u32>,
153    alter_props: &[SqlOption],
154) -> Result<()> {
155    if let Some(conn_id) = connection_id {
156        let conn = reader.get_connection_by_id(db_name, conn_id)?;
157        if let ConnectionParams(params) = &conn.info {
158            for prop in alter_props {
159                let prop_key = prop.name.real_value();
160                if params.properties.contains_key(&prop_key)
161                    || params.secret_refs.contains_key(&prop_key)
162                {
163                    return Err(ErrorCode::InvalidInputSyntax(
164                        "Cannot alter connector properties that are set by CONNECTION..".to_owned(),
165                    )
166                    .into());
167                }
168            }
169        }
170    }
171    Ok(())
172}