risingwave_frontend/handler/
alter_source_props.rs

1use risingwave_common::id::{ConnectionId, SourceId};
2// Copyright 2025 RisingWave Labs
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15use risingwave_pb::catalog::connection::Info::ConnectionParams;
16
17use super::RwPgResponse;
18use crate::catalog::catalog_service::CatalogReadGuard;
19use crate::catalog::root_catalog::SchemaPath;
20use crate::error::{ErrorCode, Result};
21use crate::handler::{HandlerArgs, ObjectName, SqlOption, StatementType};
22use crate::session::SessionImpl;
23use crate::utils::resolve_connection_ref_and_secret_ref;
24use crate::{Binder, WithOptions};
25
26pub async fn handle_alter_table_connector_props(
27    handler_args: HandlerArgs,
28    table_name: ObjectName,
29    alter_props: Vec<SqlOption>,
30) -> Result<RwPgResponse> {
31    let session = handler_args.session;
32    let db_name = &session.database();
33    let (schema_name, real_table_name) =
34        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
35    let search_path = session.config().search_path();
36    let user_name = &session.user_name();
37    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
38
39    let source_id = {
40        let reader = session.env().catalog_reader().read_guard();
41        let (table, schema_name) =
42            reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
43        let Some(associate_source_id) = table.associated_source_id else {
44            return Err(ErrorCode::InvalidInputSyntax(
45                "Only table with connector can use ALTER TABLE CONNECTOR syntax.".to_owned(),
46            )
47            .into());
48        };
49
50        session.check_privilege_for_drop_alter(schema_name, &**table)?;
51        let (source_catalog, _) =
52            reader.get_source_by_id(db_name, schema_path, associate_source_id)?;
53
54        ensure_alter_props_not_set_by_connection(
55            &reader,
56            db_name,
57            source_catalog.connection_id,
58            &alter_props,
59        )?;
60
61        tracing::info!(
62            "handle_alter_table_connector_props: table_name: {}, table id: {}, source_id: {}",
63            real_table_name,
64            table.id,
65            associate_source_id
66        );
67
68        associate_source_id
69    };
70
71    handle_alter_source_props_inner(&session, alter_props, source_id).await?;
72
73    Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
74}
75
76async fn handle_alter_source_props_inner(
77    session: &SessionImpl,
78    alter_props: Vec<SqlOption>,
79    source_id: SourceId,
80) -> Result<()> {
81    let meta_client = session.env().meta_client();
82    let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
83        WithOptions::try_from(alter_props.as_ref() as &[SqlOption])?,
84        session,
85        None,
86    )?;
87    let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
88    if connector_conn_ref.is_some() {
89        return Err(ErrorCode::InvalidInputSyntax(
90            "ALTER SOURCE CONNECTOR does not support CONNECTION".to_owned(),
91        )
92        .into());
93    }
94
95    // Validate cdc.source.wait.streaming.start.timeout if present
96    if let Some(timeout_value) = changed_props.get("cdc.source.wait.streaming.start.timeout")
97        && timeout_value.parse::<u32>().is_err()
98    {
99        return Err(ErrorCode::InvalidConfigValue {
100            config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
101            config_value: timeout_value.to_owned(),
102        }
103        .into());
104    }
105
106    // Validate debezium.max.queue.size if present
107    if let Some(queue_size_value) = changed_props.get("debezium.max.queue.size")
108        && queue_size_value.parse::<u32>().is_err()
109    {
110        return Err(ErrorCode::InvalidConfigValue {
111            config_entry: "debezium.max.queue.size".to_owned(),
112            config_value: queue_size_value.to_owned(),
113        }
114        .into());
115    }
116
117    meta_client
118        .alter_source_connector_props(
119            source_id,
120            changed_props,
121            changed_secret_refs,
122            connector_conn_ref, // always None, keep the interface for future extension
123        )
124        .await?;
125    Ok(())
126}
127
128pub async fn handle_alter_source_connector_props(
129    handler_args: HandlerArgs,
130    source_name: ObjectName,
131    alter_props: Vec<SqlOption>,
132) -> Result<RwPgResponse> {
133    let session = handler_args.session;
134    let db_name = &session.database();
135    let (schema_name, real_source_name) =
136        Binder::resolve_schema_qualified_name(db_name, &source_name)?;
137    let search_path = session.config().search_path();
138    let user_name = &session.user_name();
139    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
140
141    let source_id = {
142        let reader = session.env().catalog_reader().read_guard();
143        let (source, schema_name) =
144            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
145
146        // For `CREATE TABLE WITH (connector = '...')`, users should call `ALTER TABLE` instead.
147        if source.associated_table_id.is_some() {
148            return Err(ErrorCode::InvalidInputSyntax(
149                "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
150            )
151            .into());
152        }
153
154        session.check_privilege_for_drop_alter(schema_name, &**source)?;
155
156        ensure_alter_props_not_set_by_connection(
157            &reader,
158            db_name,
159            source.connection_id,
160            &alter_props,
161        )?;
162
163        source.id
164    };
165
166    handle_alter_source_props_inner(&session, alter_props, source_id).await?;
167
168    Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
169}
170
171/// Validates that the properties being altered don't conflict with properties set by a CONNECTION.
172pub(crate) fn ensure_alter_props_not_set_by_connection(
173    reader: &CatalogReadGuard,
174    db_name: &str,
175    connection_id: Option<ConnectionId>,
176    alter_props: &[SqlOption],
177) -> Result<()> {
178    if let Some(conn_id) = connection_id {
179        let conn = reader.get_connection_by_id(db_name, conn_id)?;
180        if let ConnectionParams(params) = &conn.info {
181            for prop in alter_props {
182                let prop_key = prop.name.real_value();
183                if params.properties.contains_key(&prop_key)
184                    || params.secret_refs.contains_key(&prop_key)
185                {
186                    return Err(ErrorCode::InvalidInputSyntax(
187                        "Cannot alter connector properties that are set by CONNECTION..".to_owned(),
188                    )
189                    .into());
190                }
191            }
192        }
193    }
194    Ok(())
195}