risingwave_connector/
allow_alter_on_fly_fields.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// THIS FILE IS AUTO_GENERATED. DO NOT EDIT
16// UPDATE WITH: ./risedev generate-with-options
17
18use std::collections::{HashMap, HashSet};
19use std::sync::LazyLock;
20use crate::error::ConnectorError;
21
/// Callback macro intended to be handed to `for_all_sources!`.
///
/// It accepts a braced, comma-separated list of `{ variant, property type, split type }`
/// triples and emits one `pub(super) use` per property type, so the property types become
/// nameable from the parent of the module in which the macro is expanded. The variant
/// identifier and the split type are matched but otherwise ignored.
macro_rules! use_source_properties {
    ({ $({ $variant:ident, $props:ty, $split_ty:ty }),* }) => {
        $(
            // Presumably not every imported property type ends up being referenced.
            #[allow(unused_imports)]
            pub(super) use $props;
        )*
    };
}
30
31mod source_properties {
32    use crate::for_all_sources;
33    use crate::source::base::SourceProperties;
34
35    for_all_sources!(use_source_properties);
36
37    /// Implements a function that maps a source name string to the Rust type name of the corresponding property type.
38    /// Usage: `impl_source_name_to_prop_type_name!();` will generate:
39    /// ```ignore
40    /// pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str>
41    /// ```
42    macro_rules! impl_source_name_to_prop_type_name_inner {
43        ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
44            pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
45                match source_name {
46                    $(
47                        <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
48                    )*
49                    _ => None,
50                }
51            }
52        };
53    }
54
55    macro_rules! impl_source_name_to_prop_type_name {
56        () => {
57            $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
58        };
59    }
60
61    impl_source_name_to_prop_type_name!();
62}
63
64mod sink_properties {
65    use crate::use_all_sink_configs;
66    use crate::sink::Sink;
67    use crate::sink::file_sink::fs::FsSink;
68
69    use_all_sink_configs!();
70
71    macro_rules! impl_sink_name_to_config_type_name_inner {
72        ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
73            pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
74                match sink_name {
75                $(
76                    <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
77                )*
78                    _ => None,
79                }
80            }
81        };
82    }
83
84    macro_rules! impl_sink_name_to_config_type_name {
85        () => {
86            $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
87        };
88    }
89
90    impl_sink_name_to_config_type_name!();
91}
92
93/// Map of source connector names to their `allow_alter_on_fly` field names
94pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
95    use source_properties::*;
96    let mut map = HashMap::new();
97    // CDC Properties - added for schema.change.failure.policy
98    map.try_insert(
99        std::any::type_name::<MysqlCdcProperties>().to_owned(),
100        ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
101    ).unwrap();
102    map.try_insert(
103        std::any::type_name::<PostgresCdcProperties>().to_owned(),
104        ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
105    ).unwrap();
106    map.try_insert(
107        std::any::type_name::<SqlServerCdcProperties>().to_owned(),
108        ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
109    ).unwrap();
110
111    map.try_insert(
112        std::any::type_name::<MongodbCdcProperties>().to_owned(),
113        ["cdc.source.wait.streaming.start.timeout".to_owned()].into_iter().collect(),
114    ).unwrap();
115    // KafkaProperties
116    map.try_insert(
117        std::any::type_name::<KafkaProperties>().to_owned(),
118        [
119            "group.id.prefix".to_owned(),
120            "properties.sync.call.timeout".to_owned(),
121            "properties.security.protocol".to_owned(),
122            "properties.ssl.endpoint.identification.algorithm".to_owned(),
123            "properties.sasl.mechanism".to_owned(),
124            "properties.sasl.username".to_owned(),
125            "properties.sasl.password".to_owned(),
126            "properties.message.max.bytes".to_owned(),
127            "properties.receive.message.max.bytes".to_owned(),
128            "properties.statistics.interval.ms".to_owned(),
129            "properties.client.id".to_owned(),
130            "properties.enable.ssl.certificate.verification".to_owned(),
131            "properties.queued.min.messages".to_owned(),
132            "properties.queued.max.messages.kbytes".to_owned(),
133            "properties.fetch.wait.max.ms".to_owned(),
134            "properties.fetch.queue.backoff.ms".to_owned(),
135            "properties.fetch.max.bytes".to_owned(),
136            "properties.enable.auto.commit".to_owned(),
137        ].into_iter().collect(),
138    ).unwrap();
139    map
140});
141
142/// Map of sink connector names to their `allow_alter_on_fly` field names
143pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
144    use sink_properties::*;
145    let mut map = HashMap::new();
146    // ClickHouseConfig
147    map.try_insert(
148        std::any::type_name::<ClickHouseConfig>().to_owned(),
149        [
150            "commit_checkpoint_interval".to_owned(),
151        ].into_iter().collect(),
152    ).unwrap();
153    // DeltaLakeConfig
154    map.try_insert(
155        std::any::type_name::<DeltaLakeConfig>().to_owned(),
156        [
157            "commit_checkpoint_interval".to_owned(),
158        ].into_iter().collect(),
159    ).unwrap();
160    // IcebergConfig
161    map.try_insert(
162        std::any::type_name::<IcebergConfig>().to_owned(),
163        [
164            "commit_checkpoint_interval".to_owned(),
165            "enable_compaction".to_owned(),
166            "compaction_interval_sec".to_owned(),
167            "enable_snapshot_expiration".to_owned(),
168        ].into_iter().collect(),
169    ).unwrap();
170    // KafkaConfig
171    map.try_insert(
172        std::any::type_name::<KafkaConfig>().to_owned(),
173        [
174            "properties.sync.call.timeout".to_owned(),
175            "properties.security.protocol".to_owned(),
176            "properties.ssl.endpoint.identification.algorithm".to_owned(),
177            "properties.sasl.mechanism".to_owned(),
178            "properties.sasl.username".to_owned(),
179            "properties.sasl.password".to_owned(),
180            "properties.message.max.bytes".to_owned(),
181            "properties.receive.message.max.bytes".to_owned(),
182            "properties.statistics.interval.ms".to_owned(),
183            "properties.client.id".to_owned(),
184            "properties.enable.ssl.certificate.verification".to_owned(),
185            "properties.allow.auto.create.topics".to_owned(),
186            "properties.queue.buffering.max.messages".to_owned(),
187            "properties.queue.buffering.max.kbytes".to_owned(),
188            "properties.queue.buffering.max.ms".to_owned(),
189            "properties.enable.idempotence".to_owned(),
190            "properties.message.send.max.retries".to_owned(),
191            "properties.retry.backoff.ms".to_owned(),
192            "properties.batch.num.messages".to_owned(),
193            "properties.batch.size".to_owned(),
194            "properties.message.timeout.ms".to_owned(),
195            "properties.max.in.flight.requests.per.connection".to_owned(),
196            "properties.request.required.acks".to_owned(),
197        ].into_iter().collect(),
198    ).unwrap();
199    // StarrocksConfig
200    map.try_insert(
201        std::any::type_name::<StarrocksConfig>().to_owned(),
202        [
203            "commit_checkpoint_interval".to_owned(),
204        ].into_iter().collect(),
205    ).unwrap();
206    map
207});
208
209/// Map of connection names to their `allow_alter_on_fly` field names
210pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
211    use crate::connector_common::*;
212    let mut map = HashMap::new();
213    // KafkaConnection
214    map.try_insert(
215        std::any::type_name::<KafkaConnection>().to_owned(),
216        [
217            "properties.security.protocol".to_owned(),
218            "properties.ssl.endpoint.identification.algorithm".to_owned(),
219            "properties.sasl.mechanism".to_owned(),
220            "properties.sasl.username".to_owned(),
221            "properties.sasl.password".to_owned(),
222        ].into_iter().collect(),
223    ).unwrap();
224    map
225});
226
227/// Get all source connector names that have `allow_alter_on_fly` fields
228pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
229    SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
230}
231
232/// Get all sink connector names that have `allow_alter_on_fly` fields
233pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
234    SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
235}
236
237/// Get all connection names that have `allow_alter_on_fly` fields
238pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
239    CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
240}
241
242/// Checks if all given fields are allowed to be altered on the fly for the specified source connector.
243/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.
244pub fn check_source_allow_alter_on_fly_fields(
245    connector_name: &str,
246    fields: &[String],
247) -> crate::error::ConnectorResult<()> {
248    // Convert connector name to the type name key
249    let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
250        return Err(ConnectorError::from(anyhow::anyhow!(
251            "Unknown source connector: {connector_name}"
252        )));
253    };
254    let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
255        return Err(ConnectorError::from(anyhow::anyhow!(
256            "No allow_alter_on_fly fields registered for connector: {connector_name}"
257        )));
258    };
259    for field in fields {
260        if !allowed_fields.contains(field) {
261            return Err(ConnectorError::from(anyhow::anyhow!(
262                "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
263            )));
264        }
265    }
266    Ok(())
267}
268
269pub fn check_connection_allow_alter_on_fly_fields(
270    connection_name: &str,
271    fields: &[String],
272) -> crate::error::ConnectorResult<()> {
273    use crate::source::connection_name_to_prop_type_name;
274
275    // Convert connection name to the type name key
276    let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
277        return Err(ConnectorError::from(anyhow::anyhow!(
278            "Unknown connection: {connection_name}"
279        )));
280    };
281    let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
282        return Err(ConnectorError::from(anyhow::anyhow!(
283            "No allow_alter_on_fly fields registered for connection: {connection_name}"
284        )));
285    };
286    for field in fields {
287        if !allowed_fields.contains(field) {
288            return Err(ConnectorError::from(anyhow::anyhow!(
289                "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
290            )));
291        }
292    }
293    Ok(())
294}
295
296/// Checks if all given fields are allowed to be altered on the fly for the specified sink connector.
297/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.
298pub fn check_sink_allow_alter_on_fly_fields(
299    sink_name: &str,
300    fields: &[String],
301) -> crate::error::ConnectorResult<()> {
302    // Convert sink name to the type name key
303    let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
304        return Err(ConnectorError::from(anyhow::anyhow!(
305            "Unknown sink connector: {sink_name}"
306        )));
307    };
308    let Some(allowed_fields) = SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
309        return Err(ConnectorError::from(anyhow::anyhow!(
310            "No allow_alter_on_fly fields registered for sink: {sink_name}"
311        )));
312    };
313    for field in fields {
314        if !allowed_fields.contains(field) {
315            return Err(ConnectorError::from(anyhow::anyhow!(
316                "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
317            )));
318        }
319    }
320    Ok(())
321}
322