1#![rustfmt::skip]
26
27use std::collections::{HashMap, HashSet};
28use std::sync::LazyLock;
29use crate::error::ConnectorError;
30use crate::sink::remote::JdbcSink;
31use crate::sink::Sink;
32
/// Expands, when driven by `for_all_sources!`, into a `pub(super) use` for every
/// source's property type so that `source_properties::*` re-exports all of them.
///
/// The matcher mirrors the `{ variant, prop_type, split_type }` tuple list that
/// `for_all_sources!` passes to its callback macro; only `$prop_name` is used here,
/// the other captures exist to match the shared tuple shape.
macro_rules! use_source_properties {
    ({ $({ $variant_name:ident, $prop_name:ty, $split:ty }),* }) => {
        $(
            // Some property types may already be in scope elsewhere; silence that.
            #[allow(unused_imports)]
            pub(super) use $prop_name;
        )*
    };
}
41
/// Scope that re-exports every source property type (via `use_source_properties`)
/// and defines [`source_name_to_prop_type_name`], which maps a connector name
/// (`SourceProperties::SOURCE_NAME`) to the `std::any::type_name` of its property
/// struct — the key format used by `SOURCE_ALLOW_ALTER_ON_FLY_FIELDS`.
mod source_properties {
    use crate::for_all_sources;
    use crate::source::base::SourceProperties;

    // Bring every source's property type into this module's scope.
    for_all_sources!(use_source_properties);

    // Inner callback: receives the full `{ variant, prop_type, split }` tuple
    // list from `for_all_sources!` and expands to one match arm per source.
    macro_rules! impl_source_name_to_prop_type_name_inner {
        ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
            /// Returns the fully-qualified type name of the property struct for
            /// `source_name`, or `None` if the name matches no known source.
            pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
                match source_name {
                    $(
                        <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
                    )*
                    _ => None,
                }
            }
        };
    }

    // Thin wrapper so the `for_all_sources!` invocation can use `$crate` paths.
    macro_rules! impl_source_name_to_prop_type_name {
        () => {
            $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
        };
    }

    impl_source_name_to_prop_type_name!();
}
74
/// Scope that imports every sink config type (via `use_all_sink_configs!`) and
/// defines [`sink_name_to_config_type_name`], which maps a sink name
/// (`Sink::SINK_NAME`) to the `std::any::type_name` of its config struct — the
/// key format used by `SINK_ALLOW_ALTER_ON_FLY_FIELDS`.
mod sink_properties {
    use crate::use_all_sink_configs;
    use crate::sink::Sink;
    use crate::sink::file_sink::fs::FsSink;

    // Bring every sink's config type into this module's scope.
    use_all_sink_configs!();

    // Inner callback: receives the `{ variant, sink_type, config_type }` tuple
    // list from `for_all_sinks!` and expands to one match arm per sink.
    macro_rules! impl_sink_name_to_config_type_name_inner {
        ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
            /// Returns the fully-qualified type name of the config struct for
            /// `sink_name`, or `None` if the name matches no known sink.
            pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
                match sink_name {
                    $(
                        <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
                    )*
                    _ => None,
                }
            }
        };
    }

    // Thin wrapper so the `for_all_sinks!` invocation can use `$crate` paths.
    macro_rules! impl_sink_name_to_config_type_name {
        () => {
            $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
        };
    }

    impl_sink_name_to_config_type_name!();
}
103
104pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
106 use source_properties::*;
107 let mut map = HashMap::new();
108 map.try_insert(
110 std::any::type_name::<MysqlCdcProperties>().to_owned(),
111 [
112 "cdc.source.wait.streaming.start.timeout".to_owned(),
113 "debezium.max.queue.size".to_owned(),
114 "debezium.queue.memory.ratio".to_owned(),
115 "password".to_owned(),
116 ].into_iter().collect(),
117 ).unwrap();
118 map.try_insert(
119 std::any::type_name::<PostgresCdcProperties>().to_owned(),
120 [
121 "cdc.source.wait.streaming.start.timeout".to_owned(),
122 "debezium.max.queue.size".to_owned(),
123 "debezium.queue.memory.ratio".to_owned(),
124 "password".to_owned(),
125 ].into_iter().collect(),
126 ).unwrap();
127 map.try_insert(
128 std::any::type_name::<SqlServerCdcProperties>().to_owned(),
129 [
130 "cdc.source.wait.streaming.start.timeout".to_owned(),
131 "debezium.max.queue.size".to_owned(),
132 "debezium.queue.memory.ratio".to_owned(),
133 "password".to_owned(),
134 ].into_iter().collect(),
135 ).unwrap();
136
137 map.try_insert(
138 std::any::type_name::<MongodbCdcProperties>().to_owned(),
139 [
140 "cdc.source.wait.streaming.start.timeout".to_owned(),
141 "debezium.max.queue.size".to_owned(),
142 "debezium.queue.memory.ratio".to_owned(),
143 ].into_iter().collect(),
144 ).unwrap();
145 map.try_insert(
147 std::any::type_name::<KafkaProperties>().to_owned(),
148 [
149 "group.id.prefix".to_owned(),
150 "properties.sync.call.timeout".to_owned(),
151 "properties.security.protocol".to_owned(),
152 "properties.ssl.endpoint.identification.algorithm".to_owned(),
153 "properties.sasl.mechanism".to_owned(),
154 "properties.sasl.username".to_owned(),
155 "properties.sasl.password".to_owned(),
156 "properties.message.max.bytes".to_owned(),
157 "properties.receive.message.max.bytes".to_owned(),
158 "properties.statistics.interval.ms".to_owned(),
159 "properties.client.id".to_owned(),
160 "properties.enable.ssl.certificate.verification".to_owned(),
161 "properties.queued.min.messages".to_owned(),
162 "properties.queued.max.messages.kbytes".to_owned(),
163 "properties.fetch.wait.max.ms".to_owned(),
164 "properties.fetch.queue.backoff.ms".to_owned(),
165 "properties.fetch.max.bytes".to_owned(),
166 "properties.enable.auto.commit".to_owned(),
167 ].into_iter().collect(),
168 ).unwrap();
169 map.try_insert(
171 std::any::type_name::<PubsubProperties>().to_owned(),
172 [
173 "pubsub.ack_deadline_seconds".to_owned(),
174 ].into_iter().collect(),
175 ).unwrap();
176 map.try_insert(
178 std::any::type_name::<PulsarProperties>().to_owned(),
179 [
180 "pulsar.operation.retry.max.retries".to_owned(),
181 "pulsar.operation.retry.delay".to_owned(),
182 ].into_iter().collect(),
183 ).unwrap();
184 map
185});
186
187pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
189 use sink_properties::*;
190 let mut map = HashMap::new();
191 map.try_insert(
193 std::any::type_name::<ClickHouseConfig>().to_owned(),
194 [
195 "commit_checkpoint_interval".to_owned(),
196 ].into_iter().collect(),
197 ).unwrap();
198 map.try_insert(
200 std::any::type_name::<DeltaLakeConfig>().to_owned(),
201 [
202 "commit_checkpoint_interval".to_owned(),
203 ].into_iter().collect(),
204 ).unwrap();
205 map.try_insert(
207 std::any::type_name::<DorisConfig>().to_owned(),
208 [
209 "doris.stream_load.http.timeout.ms".to_owned(),
210 ].into_iter().collect(),
211 ).unwrap();
212 map.try_insert(
214 std::any::type_name::<IcebergConfig>().to_owned(),
215 [
216 "commit_checkpoint_interval".to_owned(),
217 "enable_compaction".to_owned(),
218 "compaction_interval_sec".to_owned(),
219 "enable_snapshot_expiration".to_owned(),
220 "snapshot_expiration_max_age_millis".to_owned(),
221 "snapshot_expiration_retain_last".to_owned(),
222 "snapshot_expiration_clear_expired_files".to_owned(),
223 "snapshot_expiration_clear_expired_meta_data".to_owned(),
224 "compaction.max_snapshots_num".to_owned(),
225 "compaction.small_files_threshold_mb".to_owned(),
226 "compaction.delete_files_count_threshold".to_owned(),
227 "compaction.trigger_snapshot_count".to_owned(),
228 "compaction.target_file_size_mb".to_owned(),
229 "compaction.type".to_owned(),
230 "compaction.write_parquet_compression".to_owned(),
231 "compaction.write_parquet_max_row_group_rows".to_owned(),
232 "compaction.write_parquet_max_row_group_bytes".to_owned(),
233 ].into_iter().collect(),
234 ).unwrap();
235 map.try_insert(
237 std::any::type_name::<KafkaConfig>().to_owned(),
238 [
239 "properties.sync.call.timeout".to_owned(),
240 "properties.security.protocol".to_owned(),
241 "properties.ssl.endpoint.identification.algorithm".to_owned(),
242 "properties.sasl.mechanism".to_owned(),
243 "properties.sasl.username".to_owned(),
244 "properties.sasl.password".to_owned(),
245 "properties.message.max.bytes".to_owned(),
246 "properties.receive.message.max.bytes".to_owned(),
247 "properties.statistics.interval.ms".to_owned(),
248 "properties.client.id".to_owned(),
249 "properties.enable.ssl.certificate.verification".to_owned(),
250 "properties.allow.auto.create.topics".to_owned(),
251 "properties.queue.buffering.max.messages".to_owned(),
252 "properties.queue.buffering.max.kbytes".to_owned(),
253 "properties.queue.buffering.max.ms".to_owned(),
254 "properties.enable.idempotence".to_owned(),
255 "properties.message.send.max.retries".to_owned(),
256 "properties.retry.backoff.ms".to_owned(),
257 "properties.batch.num.messages".to_owned(),
258 "properties.batch.size".to_owned(),
259 "properties.message.timeout.ms".to_owned(),
260 "properties.max.in.flight.requests.per.connection".to_owned(),
261 "properties.request.required.acks".to_owned(),
262 ].into_iter().collect(),
263 ).unwrap();
264 map.try_insert(
266 std::any::type_name::<SnowflakeV2Config>().to_owned(),
267 [
268 "commit_checkpoint_interval".to_owned(),
269 ].into_iter().collect(),
270 ).unwrap();
271 map.try_insert(
273 std::any::type_name::<StarrocksConfig>().to_owned(),
274 [
275 "starrocks.stream_load.http.timeout.ms".to_owned(),
276 "commit_checkpoint_interval".to_owned(),
277 ].into_iter().collect(),
278 ).unwrap();
279 map.try_insert(
281 JdbcSink::SINK_NAME.to_owned(),
282 [
283 "jdbc.url".to_owned(),
284 "user".to_owned(),
285 "password".to_owned(),
286 ].into_iter().collect(),
287 ).unwrap();
288 map
289});
290
291pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
293 use crate::connector_common::*;
294 let mut map = HashMap::new();
295 map.try_insert(
297 std::any::type_name::<KafkaConnection>().to_owned(),
298 [
299 "properties.security.protocol".to_owned(),
300 "properties.ssl.endpoint.identification.algorithm".to_owned(),
301 "properties.sasl.mechanism".to_owned(),
302 "properties.sasl.username".to_owned(),
303 "properties.sasl.password".to_owned(),
304 ].into_iter().collect(),
305 ).unwrap();
306 map.try_insert(
308 JdbcSink::SINK_NAME.to_owned(),
309 [
310 "jdbc.url".to_owned(),
311 "user".to_owned(),
312 "password".to_owned(),
313 ].into_iter().collect(),
314 ).unwrap();
315 map
316});
317
318pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
320 SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
321}
322
323pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
325 SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
326}
327
328pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
330 CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
331}
332
333pub fn check_source_allow_alter_on_fly_fields(
336 connector_name: &str,
337 fields: &[String],
338) -> crate::error::ConnectorResult<()> {
339 let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
341 return Err(ConnectorError::from(anyhow::anyhow!(
342 "Unknown source connector: {connector_name}"
343 )));
344 };
345 let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
346 return Err(ConnectorError::from(anyhow::anyhow!(
347 "No allow_alter_on_fly fields registered for connector: {connector_name}"
348 )));
349 };
350 for field in fields {
351 if !allowed_fields.contains(field) {
352 return Err(ConnectorError::from(anyhow::anyhow!(
353 "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
354 )));
355 }
356 }
357 Ok(())
358}
359
360pub fn check_connection_allow_alter_on_fly_fields(
361 connection_name: &str,
362 fields: &[String],
363) -> crate::error::ConnectorResult<()> {
364 use crate::source::connection_name_to_prop_type_name;
365
366 let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
368 return Err(ConnectorError::from(anyhow::anyhow!(
369 "Unknown connection: {connection_name}"
370 )));
371 };
372 let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
373 return Err(ConnectorError::from(anyhow::anyhow!(
374 "No allow_alter_on_fly fields registered for connection: {connection_name}"
375 )));
376 };
377 for field in fields {
378 if !allowed_fields.contains(field) {
379 return Err(ConnectorError::from(anyhow::anyhow!(
380 "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
381 )));
382 }
383 }
384 Ok(())
385}
386
387pub fn check_sink_allow_alter_on_fly_fields(
390 sink_name: &str,
391 fields: &[String],
392) -> crate::error::ConnectorResult<()> {
393 let allowed_fields = if sink_name == JdbcSink::SINK_NAME {
399 CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(JdbcSink::SINK_NAME)
400 } else {
401 let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
403 return Err(ConnectorError::from(anyhow::anyhow!(
404 "Unknown sink connector: {sink_name}"
405 )));
406 };
407 SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name)
408 };
409 let Some(allowed_fields) = allowed_fields else {
410 return Err(ConnectorError::from(anyhow::anyhow!(
411 "No allow_alter_on_fly fields registered for sink: {sink_name}"
412 )));
413 };
414 for field in fields {
415 if !allowed_fields.contains(field) {
416 return Err(ConnectorError::from(anyhow::anyhow!(
417 "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
418 )));
419 }
420 }
421 Ok(())
422}