risingwave_connector/
allow_alter_on_fly_fields.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// THIS FILE IS AUTO_GENERATED. DO NOT EDIT
16// UPDATE WITH: ./risedev generate-with-options
17// This file is rewritten by `tests::test_allow_alter_on_fly_fields_rust_up_to_date` with
18// `UPDATE_EXPECT=1`.
19// To update content, change source/sink/connection WITH options definitions (for example,
20// `#[with_option(allow_alter_on_fly)]` on struct fields), then run `./risedev generate-with-options`.
21// `./risedev generate-with-options` runs two UPDATE_EXPECT tests:
22// 1) refresh `with_options_{source,sink,connection}.yaml`;
23// 2) regenerate this file from those YAML files.
24
25#![rustfmt::skip]
26
27use std::collections::{HashMap, HashSet};
28use std::sync::LazyLock;
29use crate::error::ConnectorError;
30use crate::sink::remote::JdbcSink;
31use crate::sink::Sink;
32
/// Callback macro intended to be passed to `for_all_sources!`.
///
/// For every `{ variant, property type, split type }` entry it receives, it emits a
/// `pub(super) use` of the property type. The variant and split type are matched but not used.
macro_rules! use_source_properties {
    ({ $({ $variant:ident, $props:ty, $split_ty:ty }),* }) => {
        $(
            // Not every imported property type is referenced by the generated maps.
            #[allow(unused_imports)]
            pub(super) use $props;
        )*
    };
}
41
42mod source_properties {
43    use crate::for_all_sources;
44    use crate::source::base::SourceProperties;
45
46    for_all_sources!(use_source_properties);
47
48    /// Implements a function that maps a source name string to the Rust type name of the corresponding property type.
49    /// Usage: `impl_source_name_to_prop_type_name!();` will generate:
50    /// ```ignore
51    /// pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str>
52    /// ```
53    macro_rules! impl_source_name_to_prop_type_name_inner {
54        ({ $({$variant:ident, $prop_name:ty, $split:ty}),* }) => {
55            pub fn source_name_to_prop_type_name(source_name: &str) -> Option<&'static str> {
56                match source_name {
57                    $(
58                        <$prop_name>::SOURCE_NAME => Some(std::any::type_name::<$prop_name>()),
59                    )*
60                    _ => None,
61                }
62            }
63        };
64    }
65
66    macro_rules! impl_source_name_to_prop_type_name {
67        () => {
68            $crate::for_all_sources! { impl_source_name_to_prop_type_name_inner }
69        };
70    }
71
72    impl_source_name_to_prop_type_name!();
73}
74
75mod sink_properties {
76    use crate::use_all_sink_configs;
77    use crate::sink::Sink;
78    use crate::sink::file_sink::fs::FsSink;
79
80    use_all_sink_configs!();
81
82    macro_rules! impl_sink_name_to_config_type_name_inner {
83        ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
84            pub fn sink_name_to_config_type_name(sink_name: &str) -> Option<&'static str> {
85                match sink_name {
86                $(
87                    <$sink_type>::SINK_NAME => Some(std::any::type_name::<$config_type>()),
88                )*
89                    _ => None,
90                }
91            }
92        };
93    }
94
95    macro_rules! impl_sink_name_to_config_type_name {
96        () => {
97            $crate::for_all_sinks! { impl_sink_name_to_config_type_name_inner }
98        };
99    }
100
101    impl_sink_name_to_config_type_name!();
102}
103
104/// Map of source connector names to their `allow_alter_on_fly` field names
105pub static SOURCE_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
106    use source_properties::*;
107    let mut map = HashMap::new();
108    // CDC Properties - added for schema.change.failure.policy
109    map.try_insert(
110        std::any::type_name::<MysqlCdcProperties>().to_owned(),
111        [
112            "cdc.source.wait.streaming.start.timeout".to_owned(),
113            "debezium.max.queue.size".to_owned(),
114            "debezium.queue.memory.ratio".to_owned(),
115            "password".to_owned(),
116        ].into_iter().collect(),
117    ).unwrap();
118    map.try_insert(
119        std::any::type_name::<PostgresCdcProperties>().to_owned(),
120        [
121            "cdc.source.wait.streaming.start.timeout".to_owned(),
122            "debezium.max.queue.size".to_owned(),
123            "debezium.queue.memory.ratio".to_owned(),
124            "debezium.heartbeat.interval.ms".to_owned(),
125            "password".to_owned(),
126        ].into_iter().collect(),
127    ).unwrap();
128    map.try_insert(
129        std::any::type_name::<SqlServerCdcProperties>().to_owned(),
130        [
131            "cdc.source.wait.streaming.start.timeout".to_owned(),
132            "debezium.max.queue.size".to_owned(),
133            "debezium.queue.memory.ratio".to_owned(),
134            "password".to_owned(),
135        ].into_iter().collect(),
136    ).unwrap();
137
138    map.try_insert(
139        std::any::type_name::<MongodbCdcProperties>().to_owned(),
140        [
141            "cdc.source.wait.streaming.start.timeout".to_owned(),
142            "debezium.max.queue.size".to_owned(),
143            "debezium.queue.memory.ratio".to_owned(),
144        ].into_iter().collect(),
145    ).unwrap();
146    // KafkaProperties
147    map.try_insert(
148        std::any::type_name::<KafkaProperties>().to_owned(),
149        [
150            "group.id.prefix".to_owned(),
151            "properties.sync.call.timeout".to_owned(),
152            "properties.security.protocol".to_owned(),
153            "properties.ssl.endpoint.identification.algorithm".to_owned(),
154            "properties.ssl.ca.location".to_owned(),
155            "properties.ssl.ca.pem".to_owned(),
156            "properties.ssl.certificate.location".to_owned(),
157            "properties.ssl.certificate.pem".to_owned(),
158            "properties.ssl.key.location".to_owned(),
159            "properties.ssl.key.pem".to_owned(),
160            "properties.ssl.key.password".to_owned(),
161            "properties.sasl.mechanism".to_owned(),
162            "properties.sasl.username".to_owned(),
163            "properties.sasl.password".to_owned(),
164            "properties.message.max.bytes".to_owned(),
165            "properties.receive.message.max.bytes".to_owned(),
166            "properties.statistics.interval.ms".to_owned(),
167            "properties.client.id".to_owned(),
168            "properties.enable.ssl.certificate.verification".to_owned(),
169            "properties.queued.min.messages".to_owned(),
170            "properties.queued.max.messages.kbytes".to_owned(),
171            "properties.fetch.wait.max.ms".to_owned(),
172            "properties.fetch.queue.backoff.ms".to_owned(),
173            "properties.fetch.max.bytes".to_owned(),
174            "properties.enable.auto.commit".to_owned(),
175        ].into_iter().collect(),
176    ).unwrap();
177    // PubsubProperties
178    map.try_insert(
179        std::any::type_name::<PubsubProperties>().to_owned(),
180        [
181            "pubsub.ack_deadline_seconds".to_owned(),
182        ].into_iter().collect(),
183    ).unwrap();
184    // PulsarProperties
185    map.try_insert(
186        std::any::type_name::<PulsarProperties>().to_owned(),
187        [
188            "pulsar.operation.retry.max.retries".to_owned(),
189            "pulsar.operation.retry.delay".to_owned(),
190        ].into_iter().collect(),
191    ).unwrap();
192    map
193});
194
195/// Map of sink connector names to their `allow_alter_on_fly` field names
196pub static SINK_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
197    use sink_properties::*;
198    let mut map = HashMap::new();
199    // ClickHouseConfig
200    map.try_insert(
201        std::any::type_name::<ClickHouseConfig>().to_owned(),
202        [
203            "commit_checkpoint_interval".to_owned(),
204        ].into_iter().collect(),
205    ).unwrap();
206    // DeltaLakeConfig
207    map.try_insert(
208        std::any::type_name::<DeltaLakeConfig>().to_owned(),
209        [
210            "commit_checkpoint_interval".to_owned(),
211        ].into_iter().collect(),
212    ).unwrap();
213    // DorisConfig
214    map.try_insert(
215        std::any::type_name::<DorisConfig>().to_owned(),
216        [
217            "doris.stream_load.http.timeout.ms".to_owned(),
218        ].into_iter().collect(),
219    ).unwrap();
220    // IcebergConfig
221    map.try_insert(
222        std::any::type_name::<IcebergConfig>().to_owned(),
223        [
224            "commit_checkpoint_interval".to_owned(),
225            "enable_compaction".to_owned(),
226            "compaction_interval_sec".to_owned(),
227            "enable_snapshot_expiration".to_owned(),
228            "snapshot_expiration_max_age_millis".to_owned(),
229            "snapshot_expiration_retain_last".to_owned(),
230            "snapshot_expiration_clear_expired_files".to_owned(),
231            "snapshot_expiration_clear_expired_meta_data".to_owned(),
232            "compaction.max_snapshots_num".to_owned(),
233            "compaction.small_files_threshold_mb".to_owned(),
234            "compaction.delete_files_count_threshold".to_owned(),
235            "compaction.trigger_snapshot_count".to_owned(),
236            "compaction.target_file_size_mb".to_owned(),
237            "compaction.type".to_owned(),
238            "compaction.write_parquet_compression".to_owned(),
239            "compaction.write_parquet_max_row_group_rows".to_owned(),
240            "compaction.write_parquet_max_row_group_bytes".to_owned(),
241        ].into_iter().collect(),
242    ).unwrap();
243    // KafkaConfig
244    map.try_insert(
245        std::any::type_name::<KafkaConfig>().to_owned(),
246        [
247            "properties.sync.call.timeout".to_owned(),
248            "properties.security.protocol".to_owned(),
249            "properties.ssl.endpoint.identification.algorithm".to_owned(),
250            "properties.ssl.ca.location".to_owned(),
251            "properties.ssl.ca.pem".to_owned(),
252            "properties.ssl.certificate.location".to_owned(),
253            "properties.ssl.certificate.pem".to_owned(),
254            "properties.ssl.key.location".to_owned(),
255            "properties.ssl.key.pem".to_owned(),
256            "properties.ssl.key.password".to_owned(),
257            "properties.sasl.mechanism".to_owned(),
258            "properties.sasl.username".to_owned(),
259            "properties.sasl.password".to_owned(),
260            "properties.message.max.bytes".to_owned(),
261            "properties.receive.message.max.bytes".to_owned(),
262            "properties.statistics.interval.ms".to_owned(),
263            "properties.client.id".to_owned(),
264            "properties.enable.ssl.certificate.verification".to_owned(),
265            "properties.allow.auto.create.topics".to_owned(),
266            "properties.queue.buffering.max.messages".to_owned(),
267            "properties.queue.buffering.max.kbytes".to_owned(),
268            "properties.queue.buffering.max.ms".to_owned(),
269            "properties.enable.idempotence".to_owned(),
270            "properties.message.send.max.retries".to_owned(),
271            "properties.retry.backoff.ms".to_owned(),
272            "properties.batch.num.messages".to_owned(),
273            "properties.batch.size".to_owned(),
274            "properties.message.timeout.ms".to_owned(),
275            "properties.max.in.flight.requests.per.connection".to_owned(),
276            "properties.request.required.acks".to_owned(),
277        ].into_iter().collect(),
278    ).unwrap();
279    // SnowflakeV2Config
280    map.try_insert(
281        std::any::type_name::<SnowflakeV2Config>().to_owned(),
282        [
283            "commit_checkpoint_interval".to_owned(),
284        ].into_iter().collect(),
285    ).unwrap();
286    // StarrocksConfig
287    map.try_insert(
288        std::any::type_name::<StarrocksConfig>().to_owned(),
289        [
290            "starrocks.stream_load.http.timeout.ms".to_owned(),
291            "commit_checkpoint_interval".to_owned(),
292        ].into_iter().collect(),
293    ).unwrap();
294    // Jdbc
295    map.try_insert(
296        JdbcSink::SINK_NAME.to_owned(),
297        [
298            "jdbc.url".to_owned(),
299            "user".to_owned(),
300            "password".to_owned(),
301        ].into_iter().collect(),
302    ).unwrap();
303    map
304});
305
306/// Map of connection names to their `allow_alter_on_fly` field names
307pub static CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS: LazyLock<HashMap<String, HashSet<String>>> = LazyLock::new(|| {
308    use crate::connector_common::*;
309    let mut map = HashMap::new();
310    // KafkaConnection
311    map.try_insert(
312        std::any::type_name::<KafkaConnection>().to_owned(),
313        [
314            "properties.security.protocol".to_owned(),
315            "properties.ssl.endpoint.identification.algorithm".to_owned(),
316            "properties.ssl.ca.location".to_owned(),
317            "properties.ssl.ca.pem".to_owned(),
318            "properties.ssl.certificate.location".to_owned(),
319            "properties.ssl.certificate.pem".to_owned(),
320            "properties.ssl.key.location".to_owned(),
321            "properties.ssl.key.pem".to_owned(),
322            "properties.ssl.key.password".to_owned(),
323            "properties.sasl.mechanism".to_owned(),
324            "properties.sasl.username".to_owned(),
325            "properties.sasl.password".to_owned(),
326        ].into_iter().collect(),
327    ).unwrap();
328    // Jdbc
329    map.try_insert(
330        JdbcSink::SINK_NAME.to_owned(),
331        [
332            "jdbc.url".to_owned(),
333            "user".to_owned(),
334            "password".to_owned(),
335        ].into_iter().collect(),
336    ).unwrap();
337    map
338});
339
340/// Get all source connector names that have `allow_alter_on_fly` fields
341pub fn get_source_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
342    SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
343}
344
345/// Get all sink connector names that have `allow_alter_on_fly` fields
346pub fn get_sink_connectors_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
347    SINK_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
348}
349
350/// Get all connection names that have `allow_alter_on_fly` fields
351pub fn get_connection_names_with_allow_alter_on_fly_fields() -> Vec<&'static str> {
352    CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.keys().map(|s| s.as_str()).collect()
353}
354
355/// Checks if all given fields are allowed to be altered on the fly for the specified source connector.
356/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.
357pub fn check_source_allow_alter_on_fly_fields(
358    connector_name: &str,
359    fields: &[String],
360) -> crate::error::ConnectorResult<()> {
361    // Convert connector name to the type name key
362    let Some(type_name) = source_properties::source_name_to_prop_type_name(connector_name) else {
363        return Err(ConnectorError::from(anyhow::anyhow!(
364            "Unknown source connector: {connector_name}"
365        )));
366    };
367    let Some(allowed_fields) = SOURCE_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
368    return Err(ConnectorError::from(anyhow::anyhow!(
369        "No allow_alter_on_fly fields registered for connector: {connector_name}"
370    )));
371    };
372    for field in fields {
373        if !allowed_fields.contains(field) {
374            return Err(ConnectorError::from(anyhow::anyhow!(
375                "Field '{field}' is not allowed to be altered on the fly for connector: {connector_name}"
376            )));
377        }
378    }
379    Ok(())
380}
381
382pub fn check_connection_allow_alter_on_fly_fields(
383    connection_name: &str,
384    fields: &[String],
385) -> crate::error::ConnectorResult<()> {
386    use crate::source::connection_name_to_prop_type_name;
387
388    // Convert connection name to the type name key
389    let Some(type_name) = connection_name_to_prop_type_name(connection_name) else {
390        return Err(ConnectorError::from(anyhow::anyhow!(
391            "Unknown connection: {connection_name}"
392        )));
393    };
394    let Some(allowed_fields) = CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name) else {
395        return Err(ConnectorError::from(anyhow::anyhow!(
396            "No allow_alter_on_fly fields registered for connection: {connection_name}"
397        )));
398    };
399    for field in fields {
400        if !allowed_fields.contains(field) {
401            return Err(ConnectorError::from(anyhow::anyhow!(
402                "Field '{field}' is not allowed to be altered on the fly for connection: {connection_name}"
403            )));
404        }
405    }
406    Ok(())
407}
408
409/// Checks if all given fields are allowed to be altered on the fly for the specified sink connector.
410/// Returns Ok(()) if all fields are allowed, otherwise returns a `ConnectorError`.
411pub fn check_sink_allow_alter_on_fly_fields(
412    sink_name: &str,
413    fields: &[String],
414) -> crate::error::ConnectorResult<()> {
415    // TODO(#24846): JDBC sink currently uses `()` as sink config type in `for_all_sinks!`,
416    // so it cannot have an isolated key in `SINK_ALLOW_ALTER_ON_FLY_FIELDS`.
417    // Reuse the JDBC entry in `CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS` for now.
418    // TODO(#24846): remove this special case after JDBC sink has a dedicated config type
419    // and allow-alter fields are generated directly into `SINK_ALLOW_ALTER_ON_FLY_FIELDS`.
420    let allowed_fields = if sink_name == JdbcSink::SINK_NAME {
421        CONNECTION_ALLOW_ALTER_ON_FLY_FIELDS.get(JdbcSink::SINK_NAME)
422    } else {
423        // Convert sink name to the type name key
424        let Some(type_name) = sink_properties::sink_name_to_config_type_name(sink_name) else {
425            return Err(ConnectorError::from(anyhow::anyhow!(
426                "Unknown sink connector: {sink_name}"
427            )));
428        };
429        SINK_ALLOW_ALTER_ON_FLY_FIELDS.get(type_name)
430    };
431    let Some(allowed_fields) = allowed_fields else {
432        return Err(ConnectorError::from(anyhow::anyhow!(
433            "No allow_alter_on_fly fields registered for sink: {sink_name}"
434        )));
435    };
436    for field in fields {
437        if !allowed_fields.contains(field) {
438            return Err(ConnectorError::from(anyhow::anyhow!(
439                "Field '{field}' is not allowed to be altered on the fly for sink: {sink_name}"
440            )));
441        }
442    }
443    Ok(())
444}