risingwave_frontend/handler/
alter_source_props.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::id::{ConnectionId, SourceId};
16use risingwave_pb::catalog::connection::Info::ConnectionParams;
17
18use super::RwPgResponse;
19use crate::catalog::catalog_service::CatalogReadGuard;
20use crate::catalog::root_catalog::SchemaPath;
21use crate::error::{ErrorCode, Result};
22use crate::handler::{HandlerArgs, ObjectName, SqlOption, StatementType};
23use crate::session::SessionImpl;
24use crate::utils::resolve_connection_ref_and_secret_ref;
25use crate::{Binder, WithOptions};
26
27pub async fn handle_alter_table_connector_props(
28    handler_args: HandlerArgs,
29    table_name: ObjectName,
30    alter_props: Vec<SqlOption>,
31) -> Result<RwPgResponse> {
32    let session = handler_args.session;
33    let db_name = &session.database();
34    let (schema_name, real_table_name) =
35        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
36    let search_path = session.config().search_path();
37    let user_name = &session.user_name();
38    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
39
40    let source_id = {
41        let reader = session.env().catalog_reader().read_guard();
42        let (table, schema_name) =
43            reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
44        let Some(associate_source_id) = table.associated_source_id else {
45            return Err(ErrorCode::InvalidInputSyntax(
46                "Only table with connector can use ALTER TABLE CONNECTOR syntax.".to_owned(),
47            )
48            .into());
49        };
50
51        session.check_privilege_for_drop_alter(schema_name, &**table)?;
52        let (source_catalog, _) =
53            reader.get_source_by_id(db_name, schema_path, associate_source_id)?;
54
55        ensure_alter_props_not_set_by_connection(
56            &reader,
57            db_name,
58            source_catalog.connection_id,
59            &alter_props,
60        )?;
61
62        tracing::info!(
63            "handle_alter_table_connector_props: table_name: {}, table id: {}, source_id: {}",
64            real_table_name,
65            table.id,
66            associate_source_id
67        );
68
69        associate_source_id
70    };
71
72    handle_alter_source_props_inner(&session, alter_props, source_id).await?;
73
74    Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
75}
76
77async fn handle_alter_source_props_inner(
78    session: &SessionImpl,
79    alter_props: Vec<SqlOption>,
80    source_id: SourceId,
81) -> Result<()> {
82    let meta_client = session.env().meta_client();
83    let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
84        WithOptions::try_from(alter_props.as_ref() as &[SqlOption])?,
85        session,
86        None,
87    )?;
88    let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
89    if connector_conn_ref.is_some() {
90        return Err(ErrorCode::InvalidInputSyntax(
91            "ALTER SOURCE CONNECTOR does not support CONNECTION".to_owned(),
92        )
93        .into());
94    }
95
96    // Validate cdc.source.wait.streaming.start.timeout if present
97    if let Some(timeout_value) = changed_props.get("cdc.source.wait.streaming.start.timeout")
98        && timeout_value.parse::<u32>().is_err()
99    {
100        return Err(ErrorCode::InvalidConfigValue {
101            config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
102            config_value: timeout_value.to_owned(),
103        }
104        .into());
105    }
106
107    // Validate debezium.max.queue.size if present
108    if let Some(queue_size_value) = changed_props.get("debezium.max.queue.size")
109        && queue_size_value.parse::<u32>().is_err()
110    {
111        return Err(ErrorCode::InvalidConfigValue {
112            config_entry: "debezium.max.queue.size".to_owned(),
113            config_value: queue_size_value.to_owned(),
114        }
115        .into());
116    }
117
118    meta_client
119        .alter_source_connector_props(
120            source_id,
121            changed_props,
122            changed_secret_refs,
123            connector_conn_ref, // always None, keep the interface for future extension
124        )
125        .await?;
126    Ok(())
127}
128
129pub async fn handle_alter_source_connector_props(
130    handler_args: HandlerArgs,
131    source_name: ObjectName,
132    alter_props: Vec<SqlOption>,
133) -> Result<RwPgResponse> {
134    let session = handler_args.session;
135    let db_name = &session.database();
136    let (schema_name, real_source_name) =
137        Binder::resolve_schema_qualified_name(db_name, &source_name)?;
138    let search_path = session.config().search_path();
139    let user_name = &session.user_name();
140    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
141
142    let source_id = {
143        let reader = session.env().catalog_reader().read_guard();
144        let (source, schema_name) =
145            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
146
147        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
148        if source.associated_table_id.is_some() {
149            return Err(ErrorCode::InvalidInputSyntax(
150                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
151            )
152            .into());
153        }
154
155        session.check_privilege_for_drop_alter(schema_name, &**source)?;
156
157        ensure_alter_props_not_set_by_connection(
158            &reader,
159            db_name,
160            source.connection_id,
161            &alter_props,
162        )?;
163
164        source.id
165    };
166
167    handle_alter_source_props_inner(&session, alter_props, source_id).await?;
168
169    Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
170}
171
172/// Validates that the properties being altered don't conflict with properties set by a CONNECTION.
173pub(crate) fn ensure_alter_props_not_set_by_connection(
174    reader: &CatalogReadGuard,
175    db_name: &str,
176    connection_id: Option<ConnectionId>,
177    alter_props: &[SqlOption],
178) -> Result<()> {
179    if let Some(conn_id) = connection_id {
180        let conn = reader.get_connection_by_id(db_name, conn_id)?;
181        if let ConnectionParams(params) = &conn.info {
182            for prop in alter_props {
183                let prop_key = prop.name.real_value();
184                if params.properties.contains_key(&prop_key)
185                    || params.secret_refs.contains_key(&prop_key)
186                {
187                    return Err(ErrorCode::InvalidInputSyntax(
188                        "Cannot alter connector properties that are set by CONNECTION..".to_owned(),
189                    )
190                    .into());
191                }
192            }
193        }
194    }
195    Ok(())
196}