risingwave_frontend/handler/
alter_source_props.rs1use risingwave_common::id::{ConnectionId, SourceId};
2use risingwave_pb::catalog::connection::Info::ConnectionParams;
16
17use super::RwPgResponse;
18use crate::catalog::catalog_service::CatalogReadGuard;
19use crate::catalog::root_catalog::SchemaPath;
20use crate::error::{ErrorCode, Result};
21use crate::handler::{HandlerArgs, ObjectName, SqlOption, StatementType};
22use crate::session::SessionImpl;
23use crate::utils::resolve_connection_ref_and_secret_ref;
24use crate::{Binder, WithOptions};
25
26pub async fn handle_alter_table_connector_props(
27 handler_args: HandlerArgs,
28 table_name: ObjectName,
29 alter_props: Vec<SqlOption>,
30) -> Result<RwPgResponse> {
31 let session = handler_args.session;
32 let db_name = &session.database();
33 let (schema_name, real_table_name) =
34 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
35 let search_path = session.config().search_path();
36 let user_name = &session.user_name();
37 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
38
39 let source_id = {
40 let reader = session.env().catalog_reader().read_guard();
41 let (table, schema_name) =
42 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
43 let Some(associate_source_id) = table.associated_source_id else {
44 return Err(ErrorCode::InvalidInputSyntax(
45 "Only table with connector can use ALTER TABLE CONNECTOR syntax.".to_owned(),
46 )
47 .into());
48 };
49
50 session.check_privilege_for_drop_alter(schema_name, &**table)?;
51 let (source_catalog, _) =
52 reader.get_source_by_id(db_name, schema_path, associate_source_id)?;
53
54 ensure_alter_props_not_set_by_connection(
55 &reader,
56 db_name,
57 source_catalog.connection_id,
58 &alter_props,
59 )?;
60
61 tracing::info!(
62 "handle_alter_table_connector_props: table_name: {}, table id: {}, source_id: {}",
63 real_table_name,
64 table.id,
65 associate_source_id
66 );
67
68 associate_source_id
69 };
70
71 handle_alter_source_props_inner(&session, alter_props, source_id).await?;
72
73 Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
74}
75
76async fn handle_alter_source_props_inner(
77 session: &SessionImpl,
78 alter_props: Vec<SqlOption>,
79 source_id: SourceId,
80) -> Result<()> {
81 let meta_client = session.env().meta_client();
82 let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
83 WithOptions::try_from(alter_props.as_ref() as &[SqlOption])?,
84 session,
85 None,
86 )?;
87 let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
88 if connector_conn_ref.is_some() {
89 return Err(ErrorCode::InvalidInputSyntax(
90 "ALTER SOURCE CONNECTOR does not support CONNECTION".to_owned(),
91 )
92 .into());
93 }
94
95 if let Some(timeout_value) = changed_props.get("cdc.source.wait.streaming.start.timeout")
97 && timeout_value.parse::<u32>().is_err()
98 {
99 return Err(ErrorCode::InvalidConfigValue {
100 config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
101 config_value: timeout_value.to_owned(),
102 }
103 .into());
104 }
105
106 if let Some(queue_size_value) = changed_props.get("debezium.max.queue.size")
108 && queue_size_value.parse::<u32>().is_err()
109 {
110 return Err(ErrorCode::InvalidConfigValue {
111 config_entry: "debezium.max.queue.size".to_owned(),
112 config_value: queue_size_value.to_owned(),
113 }
114 .into());
115 }
116
117 meta_client
118 .alter_source_connector_props(
119 source_id,
120 changed_props,
121 changed_secret_refs,
122 connector_conn_ref, )
124 .await?;
125 Ok(())
126}
127
128pub async fn handle_alter_source_connector_props(
129 handler_args: HandlerArgs,
130 source_name: ObjectName,
131 alter_props: Vec<SqlOption>,
132) -> Result<RwPgResponse> {
133 let session = handler_args.session;
134 let db_name = &session.database();
135 let (schema_name, real_source_name) =
136 Binder::resolve_schema_qualified_name(db_name, &source_name)?;
137 let search_path = session.config().search_path();
138 let user_name = &session.user_name();
139 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
140
141 let source_id = {
142 let reader = session.env().catalog_reader().read_guard();
143 let (source, schema_name) =
144 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
145
146 if source.associated_table_id.is_some() {
148 return Err(ErrorCode::InvalidInputSyntax(
149 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
150 )
151 .into());
152 }
153
154 session.check_privilege_for_drop_alter(schema_name, &**source)?;
155
156 ensure_alter_props_not_set_by_connection(
157 &reader,
158 db_name,
159 source.connection_id,
160 &alter_props,
161 )?;
162
163 source.id
164 };
165
166 handle_alter_source_props_inner(&session, alter_props, source_id).await?;
167
168 Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
169}
170
171pub(crate) fn ensure_alter_props_not_set_by_connection(
173 reader: &CatalogReadGuard,
174 db_name: &str,
175 connection_id: Option<ConnectionId>,
176 alter_props: &[SqlOption],
177) -> Result<()> {
178 if let Some(conn_id) = connection_id {
179 let conn = reader.get_connection_by_id(db_name, conn_id)?;
180 if let ConnectionParams(params) = &conn.info {
181 for prop in alter_props {
182 let prop_key = prop.name.real_value();
183 if params.properties.contains_key(&prop_key)
184 || params.secret_refs.contains_key(&prop_key)
185 {
186 return Err(ErrorCode::InvalidInputSyntax(
187 "Cannot alter connector properties that are set by CONNECTION..".to_owned(),
188 )
189 .into());
190 }
191 }
192 }
193 }
194 Ok(())
195}