1#![rustfmt::skip]
19
20use std::collections::{HashMap, HashSet};
21use std::sync::LazyLock;
22use crate::error::ConnectorError;
23
24macro_rules! use_source_properties {
25 ({ $({ $variant_name:ident, $prop_name:ty, $split:ty }),* }) => {
26 $(
27 #[allow(unused_imports)]
28 pub(super) use $prop_name;
29 )*
30 };
31}
32
33mod source_properties {
34 use crate::for_all_sources;
35 use crate::source::base::SourceProperties;
36
37 for_all_sources!(use_source_properties);
38
39 macro_rules! impl_source_name_to_prop_type_name_inner {
45 ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
46 pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
47 match source_name {
48 $(
49 <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
50 )*
51 _ => None,
52 }
53 }
54 };
55 }
56
57 macro_rules! impl_source_name_to_prop_type_name {
58 () => {
59 $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
60 };
61 }
62
63 impl_source_name_to_prop_type_name!();
64}
65
66mod sink_properties {
67 use crate::use_all_sink_configs;
68 use crate::sink::Sink;
69 use crate::sink::file_sink::fs::FsSink;
70
71 use_all_sink_configs!();
72
73 macro_rules! impl_sink_name_to_config_type_name_inner {
74 ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
75 pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
76 match sink_name {
77 $(
78 <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
79 )*
80 _ => None,
81 }
82 }
83 };
84 }
85
86 macro_rules! impl_sink_name_to_config_type_name {
87 () => {
88 $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
89 };
90 }
91
92 impl_sink_name_to_config_type_name!();
93}
94
95pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
97 use source_properties::*;
98 let mut map = HashMap::new();
99 map.try_insert(
101 std::any::type_name::<MysqlCdcProperties>().to_owned(),
102 [
103 "cdc.source.wait.streaming.start.timeout".to_owned(),
104 "debezium.max.queue.size".to_owned(),
105 "debezium.queue.memory.ratio".to_owned(),
106 ].into_iter().collect(),
107 ).unwrap();
108 map.try_insert(
109 std::any::type_name::<PostgresCdcProperties>().to_owned(),
110 [
111 "cdc.source.wait.streaming.start.timeout".to_owned(),
112 "debezium.max.queue.size".to_owned(),
113 "debezium.queue.memory.ratio".to_owned(),
114 ].into_iter().collect(),
115 ).unwrap();
116 map.try_insert(
117 std::any::type_name::<SqlServerCdcProperties>().to_owned(),
118 [
119 "cdc.source.wait.streaming.start.timeout".to_owned(),
120 "debezium.max.queue.size".to_owned(),
121 "debezium.queue.memory.ratio".to_owned(),
122
123 ].into_iter().collect(),
124 ).unwrap();
125
126 map.try_insert(
127 std::any::type_name::<MongodbCdcProperties>().to_owned(),
128 [
129 "cdc.source.wait.streaming.start.timeout".to_owned(),
130 "debezium.max.queue.size".to_owned(),
131 "debezium.queue.memory.ratio".to_owned(),
132 ].into_iter().collect(),
133 ).unwrap();
134 map.try_insert(
136 std::any::type_name::<KafkaProperties>().to_owned(),
137 [
138 "group.id.prefix".to_owned(),
139 "properties.sync.call.timeout".to_owned(),
140 "properties.security.protocol".to_owned(),
141 "properties.ssl.endpoint.identification.algorithm".to_owned(),
142 "properties.sasl.mechanism".to_owned(),
143 "properties.sasl.username".to_owned(),
144 "properties.sasl.password".to_owned(),
145 "properties.message.max.bytes".to_owned(),
146 "properties.receive.message.max.bytes".to_owned(),
147 "properties.statistics.interval.ms".to_owned(),
148 "properties.client.id".to_owned(),
149 "properties.enable.ssl.certificate.verification".to_owned(),
150 "properties.queued.min.messages".to_owned(),
151 "properties.queued.max.messages.kbytes".to_owned(),
152 "properties.fetch.wait.max.ms".to_owned(),
153 "properties.fetch.queue.backoff.ms".to_owned(),
154 "properties.fetch.max.bytes".to_owned(),
155 "properties.enable.auto.commit".to_owned(),
156 ].into_iter().collect(),
157 ).unwrap();
158 map
159});
160
161pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
163 use sink_properties::*;
164 let mut map = HashMap::new();
165 map.try_insert(
167 std::any::type_name::<ClickHouseConfig>().to_owned(),
168 [
169 "commit_checkpoint_interval".to_owned(),
170 ].into_iter().collect(),
171 ).unwrap();
172 map.try_insert(
174 std::any::type_name::<DeltaLakeConfig>().to_owned(),
175 [
176 "commit_checkpoint_interval".to_owned(),
177 ].into_iter().collect(),
178 ).unwrap();
179 map.try_insert(
181 std::any::type_name::<DorisConfig>().to_owned(),
182 [
183 "doris.stream_load.http.timeout.ms".to_owned(),
184 ].into_iter().collect(),
185 ).unwrap();
186 map.try_insert(
188 std::any::type_name::<IcebergConfig>().to_owned(),
189 [
190 "commit_checkpoint_interval".to_owned(),
191 "enable_compaction".to_owned(),
192 "compaction_interval_sec".to_owned(),
193 "enable_snapshot_expiration".to_owned(),
194 "snapshot_expiration_max_age_millis".to_owned(),
195 "snapshot_expiration_retain_last".to_owned(),
196 "snapshot_expiration_clear_expired_files".to_owned(),
197 "snapshot_expiration_clear_expired_meta_data".to_owned(),
198 "compaction.max_snapshots_num".to_owned(),
199 "compaction.small_files_threshold_mb".to_owned(),
200 "compaction.delete_files_count_threshold".to_owned(),
201 "compaction.trigger_snapshot_count".to_owned(),
202 "compaction.target_file_size_mb".to_owned(),
203 "compaction.type".to_owned(),
204 ].into_iter().collect(),
205 ).unwrap();
206 map.try_insert(
208 std::any::type_name::<KafkaConfig>().to_owned(),
209 [
210 "properties.sync.call.timeout".to_owned(),
211 "properties.security.protocol".to_owned(),
212 "properties.ssl.endpoint.identification.algorithm".to_owned(),
213 "properties.sasl.mechanism".to_owned(),
214 "properties.sasl.username".to_owned(),
215 "properties.sasl.password".to_owned(),
216 "properties.message.max.bytes".to_owned(),
217 "properties.receive.message.max.bytes".to_owned(),
218 "properties.statistics.interval.ms".to_owned(),
219 "properties.client.id".to_owned(),
220 "properties.enable.ssl.certificate.verification".to_owned(),
221 "properties.allow.auto.create.topics".to_owned(),
222 "properties.queue.buffering.max.messages".to_owned(),
223 "properties.queue.buffering.max.kbytes".to_owned(),
224 "properties.queue.buffering.max.ms".to_owned(),
225 "properties.enable.idempotence".to_owned(),
226 "properties.message.send.max.retries".to_owned(),
227 "properties.retry.backoff.ms".to_owned(),
228 "properties.batch.num.messages".to_owned(),
229 "properties.batch.size".to_owned(),
230 "properties.message.timeout.ms".to_owned(),
231 "properties.max.in.flight.requests.per.connection".to_owned(),
232 "properties.request.required.acks".to_owned(),
233 ].into_iter().collect(),
234 ).unwrap();
235 map.try_insert(
237 std::any::type_name::<SnowflakeV2Config>().to_owned(),
238 [
239 "commit_checkpoint_interval".to_owned(),
240 ].into_iter().collect(),
241 ).unwrap();
242 map.try_insert(
244 std::any::type_name::<StarrocksConfig>().to_owned(),
245 [
246 "starrocks.stream_load.http.timeout.ms".to_owned(),
247 "commit_checkpoint_interval".to_owned(),
248 ].into_iter().collect(),
249 ).unwrap();
250 map
251});
252
253pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
255 use crate::connector_common::*;
256 let mut map = HashMap::new();
257 map.try_insert(
259 std::any::type_name::<KafkaConnection>().to_owned(),
260 [
261 "properties.security.protocol".to_owned(),
262 "properties.ssl.endpoint.identification.algorithm".to_owned(),
263 "properties.sasl.mechanism".to_owned(),
264 "properties.sasl.username".to_owned(),
265 "properties.sasl.password".to_owned(),
266 ].into_iter().collect(),
267 ).unwrap();
268 map
269});
270
271pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
273 SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
274}
275
276pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
278 SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
279}
280
281pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
283 CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
284}
285
286pub fn check_source_allow_alter_on_fly_fields(
289 connector_name: &str,
290 fields: &[String],
291) -> crate::error::ConnectorResult<()> {
292 let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
294 return Err(ConnectorError::from(anyhow::anyhow!(
295 "Unknown source connector: {connector_name}"
296 )));
297 };
298 let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
299 return Err(ConnectorError::from(anyhow::anyhow!(
300 "No allow_alter_on_fly fields registered for connector: {connector_name}"
301 )));
302 };
303 for field in fields {
304 if !allowed_fields.contains(field) {
305 return Err(ConnectorError::from(anyhow::anyhow!(
306 "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
307 )));
308 }
309 }
310 Ok(())
311}
312
313pub fn check_connection_allow_alter_on_fly_fields(
314 connection_name: &str,
315 fields: &[String],
316) -> crate::error::ConnectorResult<()> {
317 use crate::source::connection_name_to_prop_type_name;
318
319 let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
321 return Err(ConnectorError::from(anyhow::anyhow!(
322 "Unknown connection: {connection_name}"
323 )));
324 };
325 let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
326 return Err(ConnectorError::from(anyhow::anyhow!(
327 "No allow_alter_on_fly fields registered for connection: {connection_name}"
328 )));
329 };
330 for field in fields {
331 if !allowed_fields.contains(field) {
332 return Err(ConnectorError::from(anyhow::anyhow!(
333 "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
334 )));
335 }
336 }
337 Ok(())
338}
339
340pub fn check_sink_allow_alter_on_fly_fields(
343 sink_name: &str,
344 fields: &[String],
345) -> crate::error::ConnectorResult<()> {
346 let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
348 return Err(ConnectorError::from(anyhow::anyhow!(
349 "Unknown sink connector: {sink_name}"
350 )));
351 };
352 let Some(allowed_fields) = SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
353 return Err(ConnectorError::from(anyhow::anyhow!(
354 "No allow_alter_on_fly fields registered for sink: {sink_name}"
355 )));
356 };
357 for field in fields {
358 if !allowed_fields.contains(field) {
359 return Err(ConnectorError::from(anyhow::anyhow!(
360 "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
361 )));
362 }
363 }
364 Ok(())
365}