1use std::collections::{HashMap, HashSet};
19use std::sync::LazyLock;
20use crate::error::ConnectorError;
21
22macro_rules! use_source_properties {
23 ({ $({ $variant_name:ident, $prop_name:ty, $split:ty }),* }) => {
24 $(
25 #[allow(unused_imports)]
26 pub(super) use $prop_name;
27 )*
28 };
29}
30
31mod source_properties {
32 use crate::for_all_sources;
33 use crate::source::base::SourceProperties;
34
35 for_all_sources!(use_source_properties);
36
37 macro_rules! impl_source_name_to_prop_type_name_inner {
43 ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
44 pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
45 match source_name {
46 $(
47 <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
48 )*
49 _ => None,
50 }
51 }
52 };
53 }
54
55 macro_rules! impl_source_name_to_prop_type_name {
56 () => {
57 $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
58 };
59 }
60
61 impl_source_name_to_prop_type_name!();
62}
63
64mod sink_properties {
65 use crate::use_all_sink_configs;
66 use crate::sink::Sink;
67 use crate::sink::file_sink::fs::FsSink;
68
69 use_all_sink_configs!();
70
71 macro_rules! impl_sink_name_to_config_type_name_inner {
72 ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
73 pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
74 match sink_name {
75 $(
76 <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
77 )*
78 _ => None,
79 }
80 }
81 };
82 }
83
84 macro_rules! impl_sink_name_to_config_type_name {
85 () => {
86 $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
87 };
88 }
89
90 impl_sink_name_to_config_type_name!();
91}
92
93pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
95 use source_properties::*;
96 let mut map = HashMap::new();
97 map.try_insert(
99 std::any::type_name::<MysqlCdcProperties>().to_owned(),
100 ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
101 ).unwrap();
102 map.try_insert(
103 std::any::type_name::<PostgresCdcProperties>().to_owned(),
104 ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
105 ).unwrap();
106 map.try_insert(
107 std::any::type_name::<SqlServerCdcProperties>().to_owned(),
108 ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
109 ).unwrap();
110
111 map.try_insert(
112 std::any::type_name::<MongodbCdcProperties>().to_owned(),
113 ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
114 ).unwrap();
115 map.try_insert(
117 std::any::type_name::<KafkaProperties>().to_owned(),
118 [
119 "group.id.prefix".to_owned(),
120 "properties.sync.call.timeout".to_owned(),
121 "properties.security.protocol".to_owned(),
122 "properties.ssl.endpoint.identification.algorithm".to_owned(),
123 "properties.sasl.mechanism".to_owned(),
124 "properties.sasl.username".to_owned(),
125 "properties.sasl.password".to_owned(),
126 "properties.message.max.bytes".to_owned(),
127 "properties.receive.message.max.bytes".to_owned(),
128 "properties.statistics.interval.ms".to_owned(),
129 "properties.client.id".to_owned(),
130 "properties.enable.ssl.certificate.verification".to_owned(),
131 "properties.queued.min.messages".to_owned(),
132 "properties.queued.max.messages.kbytes".to_owned(),
133 "properties.fetch.wait.max.ms".to_owned(),
134 "properties.fetch.queue.backoff.ms".to_owned(),
135 "properties.fetch.max.bytes".to_owned(),
136 "properties.enable.auto.commit".to_owned(),
137 ].into_iter().collect(),
138 ).unwrap();
139 map
140});
141
142pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
144 use sink_properties::*;
145 let mut map = HashMap::new();
146 map.try_insert(
148 std::any::type_name::<ClickHouseConfig>().to_owned(),
149 [
150 "commit_checkpoint_interval".to_owned(),
151 ].into_iter().collect(),
152 ).unwrap();
153 map.try_insert(
155 std::any::type_name::<DeltaLakeConfig>().to_owned(),
156 [
157 "commit_checkpoint_interval".to_owned(),
158 ].into_iter().collect(),
159 ).unwrap();
160 map.try_insert(
162 std::any::type_name::<IcebergConfig>().to_owned(),
163 [
164 "commit_checkpoint_interval".to_owned(),
165 "enable_compaction".to_owned(),
166 "compaction_interval_sec".to_owned(),
167 "enable_snapshot_expiration".to_owned(),
168 ].into_iter().collect(),
169 ).unwrap();
170 map.try_insert(
172 std::any::type_name::<KafkaConfig>().to_owned(),
173 [
174 "properties.sync.call.timeout".to_owned(),
175 "properties.security.protocol".to_owned(),
176 "properties.ssl.endpoint.identification.algorithm".to_owned(),
177 "properties.sasl.mechanism".to_owned(),
178 "properties.sasl.username".to_owned(),
179 "properties.sasl.password".to_owned(),
180 "properties.message.max.bytes".to_owned(),
181 "properties.receive.message.max.bytes".to_owned(),
182 "properties.statistics.interval.ms".to_owned(),
183 "properties.client.id".to_owned(),
184 "properties.enable.ssl.certificate.verification".to_owned(),
185 "properties.allow.auto.create.topics".to_owned(),
186 "properties.queue.buffering.max.messages".to_owned(),
187 "properties.queue.buffering.max.kbytes".to_owned(),
188 "properties.queue.buffering.max.ms".to_owned(),
189 "properties.enable.idempotence".to_owned(),
190 "properties.message.send.max.retries".to_owned(),
191 "properties.retry.backoff.ms".to_owned(),
192 "properties.batch.num.messages".to_owned(),
193 "properties.batch.size".to_owned(),
194 "properties.message.timeout.ms".to_owned(),
195 "properties.max.in.flight.requests.per.connection".to_owned(),
196 "properties.request.required.acks".to_owned(),
197 ].into_iter().collect(),
198 ).unwrap();
199 map.try_insert(
201 std::any::type_name::<StarrocksConfig>().to_owned(),
202 [
203 "commit_checkpoint_interval".to_owned(),
204 ].into_iter().collect(),
205 ).unwrap();
206 map
207});
208
209pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
211 use crate::connector_common::*;
212 let mut map = HashMap::new();
213 map.try_insert(
215 std::any::type_name::<KafkaConnection>().to_owned(),
216 [
217 "properties.security.protocol".to_owned(),
218 "properties.ssl.endpoint.identification.algorithm".to_owned(),
219 "properties.sasl.mechanism".to_owned(),
220 "properties.sasl.username".to_owned(),
221 "properties.sasl.password".to_owned(),
222 ].into_iter().collect(),
223 ).unwrap();
224 map
225});
226
227pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
229 SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
230}
231
232pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
234 SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
235}
236
237pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
239 CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
240}
241
242pub fn check_source_allow_alter_on_fly_fields(
245 connector_name: &str,
246 fields: &[String],
247) -> crate::error::ConnectorResult<()> {
248 let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
250 return Err(ConnectorError::from(anyhow::anyhow!(
251 "Unknown source connector: {connector_name}"
252 )));
253 };
254 let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
255 return Err(ConnectorError::from(anyhow::anyhow!(
256 "No allow_alter_on_fly fields registered for connector: {connector_name}"
257 )));
258 };
259 for field in fields {
260 if !allowed_fields.contains(field) {
261 return Err(ConnectorError::from(anyhow::anyhow!(
262 "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
263 )));
264 }
265 }
266 Ok(())
267}
268
269pub fn check_connection_allow_alter_on_fly_fields(
270 connection_name: &str,
271 fields: &[String],
272) -> crate::error::ConnectorResult<()> {
273 use crate::source::connection_name_to_prop_type_name;
274
275 let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
277 return Err(ConnectorError::from(anyhow::anyhow!(
278 "Unknown connection: {connection_name}"
279 )));
280 };
281 let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
282 return Err(ConnectorError::from(anyhow::anyhow!(
283 "No allow_alter_on_fly fields registered for connection: {connection_name}"
284 )));
285 };
286 for field in fields {
287 if !allowed_fields.contains(field) {
288 return Err(ConnectorError::from(anyhow::anyhow!(
289 "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
290 )));
291 }
292 }
293 Ok(())
294}
295
296pub fn check_sink_allow_alter_on_fly_fields(
299 sink_name: &str,
300 fields: &[String],
301) -> crate::error::ConnectorResult<()> {
302 let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
304 return Err(ConnectorError::from(anyhow::anyhow!(
305 "Unknown sink connector: {sink_name}"
306 )));
307 };
308 let Some(allowed_fields) = SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
309 return Err(ConnectorError::from(anyhow::anyhow!(
310 "No allow_alter_on_fly fields registered for sink: {sink_name}"
311 )));
312 };
313 for field in fields {
314 if !allowed_fields.contains(field) {
315 return Err(ConnectorError::from(anyhow::anyhow!(
316 "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
317 )));
318 }
319 }
320 Ok(())
321}
322