1#![rustfmt::skip]
26
27use std::collections::{HashMap, HashSet};
28use std::sync::LazyLock;
29use crate::error::ConnectorError;
30use crate::sink::remote::JdbcSink;
31use crate::sink::Sink;
32
33macro_rules! use_source_properties {
34 ({ $({ $variant_name:ident, $prop_name:ty, $split:ty }),* }) => {
35 $(
36 #[allow(unused_imports)]
37 pub(super) use $prop_name;
38 )*
39 };
40}
41
42mod source_properties {
43 use crate::for_all_sources;
44 use crate::source::base::SourceProperties;
45
46 for_all_sources!(use_source_properties);
47
48 macro_rules! impl_source_name_to_prop_type_name_inner {
54 ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
55 pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
56 match source_name {
57 $(
58 <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
59 )*
60 _ => None,
61 }
62 }
63 };
64 }
65
66 macro_rules! impl_source_name_to_prop_type_name {
67 () => {
68 $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
69 };
70 }
71
72 impl_source_name_to_prop_type_name!();
73}
74
75mod sink_properties {
76 use crate::use_all_sink_configs;
77 use crate::sink::Sink;
78 use crate::sink::file_sink::fs::FsSink;
79
80 use_all_sink_configs!();
81
82 macro_rules! impl_sink_name_to_config_type_name_inner {
83 ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
84 pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
85 match sink_name {
86 $(
87 <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
88 )*
89 _ => None,
90 }
91 }
92 };
93 }
94
95 macro_rules! impl_sink_name_to_config_type_name {
96 () => {
97 $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
98 };
99 }
100
101 impl_sink_name_to_config_type_name!();
102}
103
104pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
106 use source_properties::*;
107 let mut map = HashMap::new();
108 map.try_insert(
110 std::any::type_name::<MysqlCdcProperties>().to_owned(),
111 [
112 "cdc.source.wait.streaming.start.timeout".to_owned(),
113 "debezium.max.queue.size".to_owned(),
114 "debezium.queue.memory.ratio".to_owned(),
115 "password".to_owned(),
116 ].into_iter().collect(),
117 ).unwrap();
118 map.try_insert(
119 std::any::type_name::<PostgresCdcProperties>().to_owned(),
120 [
121 "cdc.source.wait.streaming.start.timeout".to_owned(),
122 "debezium.max.queue.size".to_owned(),
123 "debezium.queue.memory.ratio".to_owned(),
124 "debezium.heartbeat.interval.ms".to_owned(),
125 "password".to_owned(),
126 ].into_iter().collect(),
127 ).unwrap();
128 map.try_insert(
129 std::any::type_name::<SqlServerCdcProperties>().to_owned(),
130 [
131 "cdc.source.wait.streaming.start.timeout".to_owned(),
132 "debezium.max.queue.size".to_owned(),
133 "debezium.queue.memory.ratio".to_owned(),
134 "password".to_owned(),
135 ].into_iter().collect(),
136 ).unwrap();
137
138 map.try_insert(
139 std::any::type_name::<MongodbCdcProperties>().to_owned(),
140 [
141 "cdc.source.wait.streaming.start.timeout".to_owned(),
142 "debezium.max.queue.size".to_owned(),
143 "debezium.queue.memory.ratio".to_owned(),
144 ].into_iter().collect(),
145 ).unwrap();
146 map.try_insert(
148 std::any::type_name::<KafkaProperties>().to_owned(),
149 [
150 "group.id.prefix".to_owned(),
151 "properties.sync.call.timeout".to_owned(),
152 "properties.security.protocol".to_owned(),
153 "properties.ssl.endpoint.identification.algorithm".to_owned(),
154 "properties.ssl.ca.location".to_owned(),
155 "properties.ssl.ca.pem".to_owned(),
156 "properties.ssl.certificate.location".to_owned(),
157 "properties.ssl.certificate.pem".to_owned(),
158 "properties.ssl.key.location".to_owned(),
159 "properties.ssl.key.pem".to_owned(),
160 "properties.ssl.key.password".to_owned(),
161 "properties.sasl.mechanism".to_owned(),
162 "properties.sasl.username".to_owned(),
163 "properties.sasl.password".to_owned(),
164 "properties.message.max.bytes".to_owned(),
165 "properties.receive.message.max.bytes".to_owned(),
166 "properties.statistics.interval.ms".to_owned(),
167 "properties.client.id".to_owned(),
168 "properties.enable.ssl.certificate.verification".to_owned(),
169 "properties.queued.min.messages".to_owned(),
170 "properties.queued.max.messages.kbytes".to_owned(),
171 "properties.fetch.wait.max.ms".to_owned(),
172 "properties.fetch.queue.backoff.ms".to_owned(),
173 "properties.fetch.max.bytes".to_owned(),
174 "properties.enable.auto.commit".to_owned(),
175 ].into_iter().collect(),
176 ).unwrap();
177 map.try_insert(
179 std::any::type_name::<PubsubProperties>().to_owned(),
180 [
181 "pubsub.ack_deadline_seconds".to_owned(),
182 ].into_iter().collect(),
183 ).unwrap();
184 map.try_insert(
186 std::any::type_name::<PulsarProperties>().to_owned(),
187 [
188 "pulsar.operation.retry.max.retries".to_owned(),
189 "pulsar.operation.retry.delay".to_owned(),
190 ].into_iter().collect(),
191 ).unwrap();
192 map
193});
194
195pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
197 use sink_properties::*;
198 let mut map = HashMap::new();
199 map.try_insert(
201 std::any::type_name::<ClickHouseConfig>().to_owned(),
202 [
203 "commit_checkpoint_interval".to_owned(),
204 ].into_iter().collect(),
205 ).unwrap();
206 map.try_insert(
208 std::any::type_name::<DeltaLakeConfig>().to_owned(),
209 [
210 "commit_checkpoint_interval".to_owned(),
211 ].into_iter().collect(),
212 ).unwrap();
213 map.try_insert(
215 std::any::type_name::<DorisConfig>().to_owned(),
216 [
217 "doris.stream_load.http.timeout.ms".to_owned(),
218 ].into_iter().collect(),
219 ).unwrap();
220 map.try_insert(
222 std::any::type_name::<IcebergConfig>().to_owned(),
223 [
224 "commit_checkpoint_interval".to_owned(),
225 "enable_compaction".to_owned(),
226 "compaction_interval_sec".to_owned(),
227 "enable_snapshot_expiration".to_owned(),
228 "snapshot_expiration_max_age_millis".to_owned(),
229 "snapshot_expiration_retain_last".to_owned(),
230 "snapshot_expiration_clear_expired_files".to_owned(),
231 "snapshot_expiration_clear_expired_meta_data".to_owned(),
232 "compaction.max_snapshots_num".to_owned(),
233 "compaction.small_files_threshold_mb".to_owned(),
234 "compaction.delete_files_count_threshold".to_owned(),
235 "compaction.trigger_snapshot_count".to_owned(),
236 "compaction.target_file_size_mb".to_owned(),
237 "compaction.type".to_owned(),
238 "compaction.write_parquet_compression".to_owned(),
239 "compaction.write_parquet_max_row_group_rows".to_owned(),
240 "compaction.write_parquet_max_row_group_bytes".to_owned(),
241 ].into_iter().collect(),
242 ).unwrap();
243 map.try_insert(
245 std::any::type_name::<KafkaConfig>().to_owned(),
246 [
247 "properties.sync.call.timeout".to_owned(),
248 "properties.security.protocol".to_owned(),
249 "properties.ssl.endpoint.identification.algorithm".to_owned(),
250 "properties.ssl.ca.location".to_owned(),
251 "properties.ssl.ca.pem".to_owned(),
252 "properties.ssl.certificate.location".to_owned(),
253 "properties.ssl.certificate.pem".to_owned(),
254 "properties.ssl.key.location".to_owned(),
255 "properties.ssl.key.pem".to_owned(),
256 "properties.ssl.key.password".to_owned(),
257 "properties.sasl.mechanism".to_owned(),
258 "properties.sasl.username".to_owned(),
259 "properties.sasl.password".to_owned(),
260 "properties.message.max.bytes".to_owned(),
261 "properties.receive.message.max.bytes".to_owned(),
262 "properties.statistics.interval.ms".to_owned(),
263 "properties.client.id".to_owned(),
264 "properties.enable.ssl.certificate.verification".to_owned(),
265 "properties.allow.auto.create.topics".to_owned(),
266 "properties.queue.buffering.max.messages".to_owned(),
267 "properties.queue.buffering.max.kbytes".to_owned(),
268 "properties.queue.buffering.max.ms".to_owned(),
269 "properties.enable.idempotence".to_owned(),
270 "properties.message.send.max.retries".to_owned(),
271 "properties.retry.backoff.ms".to_owned(),
272 "properties.batch.num.messages".to_owned(),
273 "properties.batch.size".to_owned(),
274 "properties.message.timeout.ms".to_owned(),
275 "properties.max.in.flight.requests.per.connection".to_owned(),
276 "properties.request.required.acks".to_owned(),
277 ].into_iter().collect(),
278 ).unwrap();
279 map.try_insert(
281 std::any::type_name::<SnowflakeV2Config>().to_owned(),
282 [
283 "commit_checkpoint_interval".to_owned(),
284 ].into_iter().collect(),
285 ).unwrap();
286 map.try_insert(
288 std::any::type_name::<StarrocksConfig>().to_owned(),
289 [
290 "starrocks.stream_load.http.timeout.ms".to_owned(),
291 "commit_checkpoint_interval".to_owned(),
292 ].into_iter().collect(),
293 ).unwrap();
294 map.try_insert(
296 JdbcSink::SINK_NAME.to_owned(),
297 [
298 "jdbc.url".to_owned(),
299 "user".to_owned(),
300 "password".to_owned(),
301 ].into_iter().collect(),
302 ).unwrap();
303 map
304});
305
306pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
308 use crate::connector_common::*;
309 let mut map = HashMap::new();
310 map.try_insert(
312 std::any::type_name::<KafkaConnection>().to_owned(),
313 [
314 "properties.security.protocol".to_owned(),
315 "properties.ssl.endpoint.identification.algorithm".to_owned(),
316 "properties.ssl.ca.location".to_owned(),
317 "properties.ssl.ca.pem".to_owned(),
318 "properties.ssl.certificate.location".to_owned(),
319 "properties.ssl.certificate.pem".to_owned(),
320 "properties.ssl.key.location".to_owned(),
321 "properties.ssl.key.pem".to_owned(),
322 "properties.ssl.key.password".to_owned(),
323 "properties.sasl.mechanism".to_owned(),
324 "properties.sasl.username".to_owned(),
325 "properties.sasl.password".to_owned(),
326 ].into_iter().collect(),
327 ).unwrap();
328 map.try_insert(
330 JdbcSink::SINK_NAME.to_owned(),
331 [
332 "jdbc.url".to_owned(),
333 "user".to_owned(),
334 "password".to_owned(),
335 ].into_iter().collect(),
336 ).unwrap();
337 map
338});
339
340pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
342 SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
343}
344
345pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
347 SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
348}
349
350pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
352 CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
353}
354
355pub fn check_source_allow_alter_on_fly_fields(
358 connector_name: &str,
359 fields: &[String],
360) -> crate::error::ConnectorResult<()> {
361 let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
363 return Err(ConnectorError::from(anyhow::anyhow!(
364 "Unknown source connector: {connector_name}"
365 )));
366 };
367 let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
368 return Err(ConnectorError::from(anyhow::anyhow!(
369 "No allow_alter_on_fly fields registered for connector: {connector_name}"
370 )));
371 };
372 for field in fields {
373 if !allowed_fields.contains(field) {
374 return Err(ConnectorError::from(anyhow::anyhow!(
375 "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
376 )));
377 }
378 }
379 Ok(())
380}
381
382pub fn check_connection_allow_alter_on_fly_fields(
383 connection_name: &str,
384 fields: &[String],
385) -> crate::error::ConnectorResult<()> {
386 use crate::source::connection_name_to_prop_type_name;
387
388 let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
390 return Err(ConnectorError::from(anyhow::anyhow!(
391 "Unknown connection: {connection_name}"
392 )));
393 };
394 let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
395 return Err(ConnectorError::from(anyhow::anyhow!(
396 "No allow_alter_on_fly fields registered for connection: {connection_name}"
397 )));
398 };
399 for field in fields {
400 if !allowed_fields.contains(field) {
401 return Err(ConnectorError::from(anyhow::anyhow!(
402 "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
403 )));
404 }
405 }
406 Ok(())
407}
408
409pub fn check_sink_allow_alter_on_fly_fields(
412 sink_name: &str,
413 fields: &[String],
414) -> crate::error::ConnectorResult<()> {
415 let allowed_fields = if sink_name == JdbcSink::SINK_NAME {
421 CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(JdbcSink::SINK_NAME)
422 } else {
423 let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
425 return Err(ConnectorError::from(anyhow::anyhow!(
426 "Unknown sink connector: {sink_name}"
427 )));
428 };
429 SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name)
430 };
431 let Some(allowed_fields) = allowed_fields else {
432 return Err(ConnectorError::from(anyhow::anyhow!(
433 "No allow_alter_on_fly fields registered for sink: {sink_name}"
434 )));
435 };
436 for field in fields {
437 if !allowed_fields.contains(field) {
438 return Err(ConnectorError::from(anyhow::anyhow!(
439 "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
440 )));
441 }
442 }
443 Ok(())
444}