risingwave_frontend/handler/
alter_source_props.rs1use risingwave_common::id::{ConnectionId, SourceId};
16use risingwave_pb::catalog::connection::Info::ConnectionParams;
17
18use super::RwPgResponse;
19use crate::catalog::catalog_service::CatalogReadGuard;
20use crate::catalog::root_catalog::SchemaPath;
21use crate::error::{ErrorCode, Result};
22use crate::handler::{HandlerArgs, ObjectName, SqlOption, StatementType};
23use crate::session::SessionImpl;
24use crate::utils::resolve_connection_ref_and_secret_ref;
25use crate::{Binder, WithOptions};
26
27pub async fn handle_alter_table_connector_props(
28 handler_args: HandlerArgs,
29 table_name: ObjectName,
30 alter_props: Vec<SqlOption>,
31) -> Result<RwPgResponse> {
32 let session = handler_args.session;
33 let db_name = &session.database();
34 let (schema_name, real_table_name) =
35 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
36 let search_path = session.config().search_path();
37 let user_name = &session.user_name();
38 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
39
40 let source_id = {
41 let reader = session.env().catalog_reader().read_guard();
42 let (table, schema_name) =
43 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
44 let Some(associate_source_id) = table.associated_source_id else {
45 return Err(ErrorCode::InvalidInputSyntax(
46 "Only table with connector can use ALTER TABLE CONNECTOR syntax.".to_owned(),
47 )
48 .into());
49 };
50
51 session.check_privilege_for_drop_alter(schema_name, &**table)?;
52 let (source_catalog, _) =
53 reader.get_source_by_id(db_name, schema_path, associate_source_id)?;
54
55 ensure_alter_props_not_set_by_connection(
56 &reader,
57 db_name,
58 source_catalog.connection_id,
59 &alter_props,
60 )?;
61
62 tracing::info!(
63 "handle_alter_table_connector_props: table_name: {}, table id: {}, source_id: {}",
64 real_table_name,
65 table.id,
66 associate_source_id
67 );
68
69 associate_source_id
70 };
71
72 handle_alter_source_props_inner(&session, alter_props, source_id).await?;
73
74 Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
75}
76
77async fn handle_alter_source_props_inner(
78 session: &SessionImpl,
79 alter_props: Vec<SqlOption>,
80 source_id: SourceId,
81) -> Result<()> {
82 let meta_client = session.env().meta_client();
83 let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
84 WithOptions::try_from(alter_props.as_ref() as &[SqlOption])?,
85 session,
86 None,
87 )?;
88 let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
89 if connector_conn_ref.is_some() {
90 return Err(ErrorCode::InvalidInputSyntax(
91 "ALTER SOURCE CONNECTOR does not support CONNECTION".to_owned(),
92 )
93 .into());
94 }
95
96 if let Some(timeout_value) = changed_props.get("cdc.source.wait.streaming.start.timeout")
98 && timeout_value.parse::<u32>().is_err()
99 {
100 return Err(ErrorCode::InvalidConfigValue {
101 config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
102 config_value: timeout_value.to_owned(),
103 }
104 .into());
105 }
106
107 if let Some(queue_size_value) = changed_props.get("debezium.max.queue.size")
109 && queue_size_value.parse::<u32>().is_err()
110 {
111 return Err(ErrorCode::InvalidConfigValue {
112 config_entry: "debezium.max.queue.size".to_owned(),
113 config_value: queue_size_value.to_owned(),
114 }
115 .into());
116 }
117
118 meta_client
119 .alter_source_connector_props(
120 source_id,
121 changed_props,
122 changed_secret_refs,
123 connector_conn_ref, )
125 .await?;
126 Ok(())
127}
128
129pub async fn handle_alter_source_connector_props(
130 handler_args: HandlerArgs,
131 source_name: ObjectName,
132 alter_props: Vec<SqlOption>,
133) -> Result<RwPgResponse> {
134 let session = handler_args.session;
135 let db_name = &session.database();
136 let (schema_name, real_source_name) =
137 Binder::resolve_schema_qualified_name(db_name, &source_name)?;
138 let search_path = session.config().search_path();
139 let user_name = &session.user_name();
140 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
141
142 let source_id = {
143 let reader = session.env().catalog_reader().read_guard();
144 let (source, schema_name) =
145 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
146
147 if source.associated_table_id.is_some() {
149 return Err(ErrorCode::InvalidInputSyntax(
150 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
151 )
152 .into());
153 }
154
155 session.check_privilege_for_drop_alter(schema_name, &**source)?;
156
157 ensure_alter_props_not_set_by_connection(
158 &reader,
159 db_name,
160 source.connection_id,
161 &alter_props,
162 )?;
163
164 source.id
165 };
166
167 handle_alter_source_props_inner(&session, alter_props, source_id).await?;
168
169 Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
170}
171
172pub(crate) fn ensure_alter_props_not_set_by_connection(
174 reader: &CatalogReadGuard,
175 db_name: &str,
176 connection_id: Option<ConnectionId>,
177 alter_props: &[SqlOption],
178) -> Result<()> {
179 if let Some(conn_id) = connection_id {
180 let conn = reader.get_connection_by_id(db_name, conn_id)?;
181 if let ConnectionParams(params) = &conn.info {
182 for prop in alter_props {
183 let prop_key = prop.name.real_value();
184 if params.properties.contains_key(&prop_key)
185 || params.secret_refs.contains_key(&prop_key)
186 {
187 return Err(ErrorCode::InvalidInputSyntax(
188 "Cannot alter connector properties that are set by CONNECTION..".to_owned(),
189 )
190 .into());
191 }
192 }
193 }
194 }
195 Ok(())
196}