risingwave_connector/allow_alter_on_fly_fields.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// THIS FILE IS AUTO_GENERATED. DO NOT EDIT
// UPDATE WITH: ./risedev generate-with-options

#![rustfmt::skip]

use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;
use crate::error::ConnectorError;

macro_rules! use_source_properties {
    ({ $({ $variant_name:ident, $prop_name:ty, $split:ty }),* }) => {
        $(
            #[allow(unused_imports)]
            pub(super) use $prop_name;
        )*
    };
}

mod source_properties {
    use crate::for_all_sources;
    use crate::source::base::SourceProperties;

    for_all_sources!(use_source_properties);

    /// Implements a function that maps a source name string to the Rust type name of the corresponding property type.
    /// Usage: `impl_source_name_to_prop_type_name!();` will generate:
    /// ```ignore
    /// pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str>
    /// ```
    macro_rules! impl_source_name_to_prop_type_name_inner {
        ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
            pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
                match source_name {
                    $(
                        <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
                    )*
                    _ => None,
                }
            }
        };
    }

    macro_rules! impl_source_name_to_prop_type_name {
        () => {
            $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
        };
    }

    impl_source_name_to_prop_type_name!();
}

mod sink_properties {
    use crate::use_all_sink_configs;
    use crate::sink::Sink;
    use crate::sink::file_sink::fs::FsSink;

    use_all_sink_configs!();

    macro_rules! impl_sink_name_to_config_type_name_inner {
        ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
            pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
                match sink_name {
                    $(
                        <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
                    )*
                    _ => None,
                }
            }
        };
    }

    macro_rules! impl_sink_name_to_config_type_name {
        () => {
            $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
        };
    }

    impl_sink_name_to_config_type_name!();
}

/// Map of source connector property type names to their `allow_alter_on_fly` field names
pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
    use source_properties::*;
    let mut map = HashMap::new();
    // CDC Properties - added for schema.change.failure.policy
    map.try_insert(
        std::any::type_name::<MysqlCdcProperties>().to_owned(),
        [
            "cdc.source.wait.streaming.start.timeout".to_owned(),
            "debezium.max.queue.size".to_owned(),
            "debezium.queue.memory.ratio".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    map.try_insert(
        std::any::type_name::<PostgresCdcProperties>().to_owned(),
        [
            "cdc.source.wait.streaming.start.timeout".to_owned(),
            "debezium.max.queue.size".to_owned(),
            "debezium.queue.memory.ratio".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    map.try_insert(
        std::any::type_name::<SqlServerCdcProperties>().to_owned(),
        [
            "cdc.source.wait.streaming.start.timeout".to_owned(),
            "debezium.max.queue.size".to_owned(),
            "debezium.queue.memory.ratio".to_owned(),
        ].into_iter().collect(),
    ).unwrap();

    map.try_insert(
        std::any::type_name::<MongodbCdcProperties>().to_owned(),
        [
            "cdc.source.wait.streaming.start.timeout".to_owned(),
            "debezium.max.queue.size".to_owned(),
            "debezium.queue.memory.ratio".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // KafkaProperties
    map.try_insert(
        std::any::type_name::<KafkaProperties>().to_owned(),
        [
            "group.id.prefix".to_owned(),
            "properties.sync.call.timeout".to_owned(),
            "properties.security.protocol".to_owned(),
            "properties.ssl.endpoint.identification.algorithm".to_owned(),
            "properties.sasl.mechanism".to_owned(),
            "properties.sasl.username".to_owned(),
            "properties.sasl.password".to_owned(),
            "properties.message.max.bytes".to_owned(),
            "properties.receive.message.max.bytes".to_owned(),
            "properties.statistics.interval.ms".to_owned(),
            "properties.client.id".to_owned(),
            "properties.enable.ssl.certificate.verification".to_owned(),
            "properties.queued.min.messages".to_owned(),
            "properties.queued.max.messages.kbytes".to_owned(),
            "properties.fetch.wait.max.ms".to_owned(),
            "properties.fetch.queue.backoff.ms".to_owned(),
            "properties.fetch.max.bytes".to_owned(),
            "properties.enable.auto.commit".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    map
});

/// Map of sink connector config type names to their `allow_alter_on_fly` field names
pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
    use sink_properties::*;
    let mut map = HashMap::new();
    // ClickHouseConfig
    map.try_insert(
        std::any::type_name::<ClickHouseConfig>().to_owned(),
        [
            "commit_checkpoint_interval".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // DeltaLakeConfig
    map.try_insert(
        std::any::type_name::<DeltaLakeConfig>().to_owned(),
        [
            "commit_checkpoint_interval".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // DorisConfig
    map.try_insert(
        std::any::type_name::<DorisConfig>().to_owned(),
        [
            "doris.stream_load.http.timeout.ms".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // IcebergConfig
    map.try_insert(
        std::any::type_name::<IcebergConfig>().to_owned(),
        [
            "commit_checkpoint_interval".to_owned(),
            "enable_compaction".to_owned(),
            "compaction_interval_sec".to_owned(),
            "enable_snapshot_expiration".to_owned(),
            "snapshot_expiration_max_age_millis".to_owned(),
            "snapshot_expiration_retain_last".to_owned(),
            "snapshot_expiration_clear_expired_files".to_owned(),
            "snapshot_expiration_clear_expired_meta_data".to_owned(),
            "compaction.max_snapshots_num".to_owned(),
            "compaction.small_files_threshold_mb".to_owned(),
            "compaction.delete_files_count_threshold".to_owned(),
            "compaction.trigger_snapshot_count".to_owned(),
            "compaction.target_file_size_mb".to_owned(),
            "compaction.type".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // KafkaConfig
    map.try_insert(
        std::any::type_name::<KafkaConfig>().to_owned(),
        [
            "properties.sync.call.timeout".to_owned(),
            "properties.security.protocol".to_owned(),
            "properties.ssl.endpoint.identification.algorithm".to_owned(),
            "properties.sasl.mechanism".to_owned(),
            "properties.sasl.username".to_owned(),
            "properties.sasl.password".to_owned(),
            "properties.message.max.bytes".to_owned(),
            "properties.receive.message.max.bytes".to_owned(),
            "properties.statistics.interval.ms".to_owned(),
            "properties.client.id".to_owned(),
            "properties.enable.ssl.certificate.verification".to_owned(),
            "properties.allow.auto.create.topics".to_owned(),
            "properties.queue.buffering.max.messages".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned(),
            "properties.queue.buffering.max.ms".to_owned(),
            "properties.enable.idempotence".to_owned(),
            "properties.message.send.max.retries".to_owned(),
            "properties.retry.backoff.ms".to_owned(),
            "properties.batch.num.messages".to_owned(),
            "properties.batch.size".to_owned(),
            "properties.message.timeout.ms".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned(),
            "properties.request.required.acks".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // SnowflakeV2Config
    map.try_insert(
        std::any::type_name::<SnowflakeV2Config>().to_owned(),
        [
            "commit_checkpoint_interval".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    // StarrocksConfig
    map.try_insert(
        std::any::type_name::<StarrocksConfig>().to_owned(),
        [
            "starrocks.stream_load.http.timeout.ms".to_owned(),
            "commit_checkpoint_interval".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    map
});

/// Map of connection property type names to their `allow_alter_on_fly` field names
pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
    use crate::connector_common::*;
    let mut map = HashMap::new();
    // KafkaConnection
    map.try_insert(
        std::any::type_name::<KafkaConnection>().to_owned(),
        [
            "properties.security.protocol".to_owned(),
            "properties.ssl.endpoint.identification.algorithm".to_owned(),
            "properties.sasl.mechanism".to_owned(),
            "properties.sasl.username".to_owned(),
            "properties.sasl.password".to_owned(),
        ].into_iter().collect(),
    ).unwrap();
    map
});

/// Get the property type names of all source connectors that have `allow_alter_on_fly` fields
pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
    SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
}

/// Get the config type names of all sink connectors that have `allow_alter_on_fly` fields
pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
    SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
}

/// Get the property type names of all connections that have `allow_alter_on_fly` fields
pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
    CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
}

/// Checks if all given fields are allowed to be altered on the fly for the specified source connector.
/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.
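///
/// # Example
///
/// An illustrative sketch, not part of the generated content: it assumes the Kafka source
/// registers under the connector name `"kafka"` (its `SOURCE_NAME`) and uses a field that is
/// listed for `KafkaProperties` in `SOURCE_ALLOW_ALTER_ON_FLY_FIELDS` above.
/// ```ignore
/// // `properties.client.id` is in the Kafka allow-list, so the check passes.
/// let fields = vec!["properties.client.id".to_owned()];
/// check_source_allow_alter_on_fly_fields("kafka", &fields)?;
/// ```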
pub fn check_source_allow_alter_on_fly_fields(
    connector_name: &str,
    fields: &[String],
) -> crate::error::ConnectorResult<()> {
    // Convert connector name to the type name key
    let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
        return Err(ConnectorError::from(anyhow::anyhow!(
            "Unknown source connector: {connector_name}"
        )));
    };
    let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
        return Err(ConnectorError::from(anyhow::anyhow!(
            "No allow_alter_on_fly fields registered for connector: {connector_name}"
        )));
    };
    for field in fields {
        if !allowed_fields.contains(field) {
            return Err(ConnectorError::from(anyhow::anyhow!(
                "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
            )));
        }
    }
    Ok(())
}

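/// Checks if all given fields are allowed to be altered on the fly for the specified connection.
/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.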
pub fn check_connection_allow_alter_on_fly_fields(
    connection_name: &str,
    fields: &[String],
) -> crate::error::ConnectorResult<()> {
    use crate::source::connection_name_to_prop_type_name;

    // Convert connection name to the type name key
    let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
        return Err(ConnectorError::from(anyhow::anyhow!(
            "Unknown connection: {connection_name}"
        )));
    };
    let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
        return Err(ConnectorError::from(anyhow::anyhow!(
            "No allow_alter_on_fly fields registered for connection: {connection_name}"
        )));
    };
    for field in fields {
        if !allowed_fields.contains(field) {
            return Err(ConnectorError::from(anyhow::anyhow!(
                "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
            )));
        }
    }
    Ok(())
}

/// Checks if all given fields are allowed to be altered on the fly for the specified sink connector.
/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.
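///
/// # Example
///
/// An illustrative sketch, not part of the generated content: it assumes the Iceberg sink
/// registers under the sink name `"iceberg"` (its `SINK_NAME`); any field missing from the
/// `IcebergConfig` entry in `SINK_ALLOW_ALTER_ON_FLY_FIELDS` is rejected.
/// ```ignore
/// // `commit_checkpoint_interval` is allowed for Iceberg, but `warehouse.path` is not,
/// // so the whole check fails with a `ConnectorError`.
/// let fields = vec!["commit_checkpoint_interval".to_owned(), "warehouse.path".to_owned()];
/// assert!(check_sink_allow_alter_on_fly_fields("iceberg", &fields).is_err());
/// ```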
pub fn check_sink_allow_alter_on_fly_fields(
    sink_name: &str,
    fields: &[String],
) -> crate::error::ConnectorResult<()> {
    // Convert sink name to the type name key
    let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
        return Err(ConnectorError::from(anyhow::anyhow!(
            "Unknown sink connector: {sink_name}"
        )));
    };
    let Some(allowed_fields) = SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
        return Err(ConnectorError::from(anyhow::anyhow!(
            "No allow_alter_on_fly fields registered for sink: {sink_name}"
        )));
    };
    for field in fields {
        if !allowed_fields.contains(field) {
            return Err(ConnectorError::from(anyhow::anyhow!(
                "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
            )));
        }
    }
    Ok(())
}