risingwave_connector/
macros.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[macro_export]
16macro_rules! for_all_classified_sources {
17    ($macro:path $(, $extra_args:tt)*) => {
18        $macro! {
19            // cdc sources
20            {
21                { Mysql },
22                { Postgres },
23                { Citus },
24                { Mongodb },
25                { SqlServer }
26            },
27            // other sources
28            // todo: file source do not nest with mq source.
29            {
30                { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31                { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32                { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33                { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34                { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35                { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36                { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37                { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38                { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39                { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40                { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41                { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42                { BatchPosixFs, $crate::source::filesystem::opendal_source::BatchPosixFsProperties, $crate::source::filesystem::opendal_source::BatchPosixFsSplit },
43                { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
44                { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
45                { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit},
46                { AdbcSnowflake, $crate::source::adbc_snowflake::AdbcSnowflakeProperties, $crate::source::adbc_snowflake::AdbcSnowflakeSplit}
47            }
48            $(
49                ,$extra_args
50            )*
51        }
52    };
53}
54
55#[macro_export]
56macro_rules! for_all_connections {
57    ($macro:path $(, $extra_args:tt)*) => {
58        $macro! {
59            {
60                { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "kafka"},
61                { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "iceberg"},
62                { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "schema_registry"},
63                { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "elasticsearch"}
64            }
65            $(,$extra_args)*
66        }
67    };
68}
69
/// Flattens the two lists produced by [`for_all_classified_sources`] into a single
/// list of `{ Variant, PropertiesType, SplitType }` entries and passes it to `$macro`.
///
/// Each CDC entry `{ Name }` is expanded (identifiers are concatenated via `paste`) to
/// `{ NameCdc, source::cdc::NameCdcProperties, source::cdc::DebeziumCdcSplit<source::cdc::Name> }`.
/// The CDC entries come first, followed by the non-CDC entries unchanged. Any extra
/// token trees after `$macro` are forwarded to it as-is.
#[macro_export]
macro_rules! for_all_sources_inner {
    (
        { $({ $cdc_name:ident }),* },
        { $({ $variant:ident, $props:ty, $split_ty:ty }),* },
        $macro:tt $(, $forwarded:tt)*
    ) => {
        $crate::paste! {
            $macro! {
                {
                    // CDC sources, rewritten into the common three-field shape.
                    $(
                        {
                            [< $cdc_name Cdc >],
                            $crate::source::cdc::[< $cdc_name CdcProperties >],
                            $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_name>
                        }
                    ),*
                    ,
                    // Every other source, passed through untouched.
                    $(
                        { $variant, $props, $split_ty }
                    ),*
                }
                $(, $forwarded)*
            }
        }
    };
}
97
/// Invokes `$macro` with the flat list of all sources (CDC and non-CDC), each entry
/// shaped as `{ Variant, PropertiesType, SplitType }`.
///
/// This chains [`for_all_classified_sources`] into [`for_all_sources_inner`]; any
/// extra arguments after `$macro` are forwarded to `$macro` after the list.
#[macro_export]
macro_rules! for_all_sources {
    ($macro:path $(, $forwarded:tt)*) => {
        $crate::for_all_classified_sources! {
            $crate::for_all_sources_inner,
            $macro
            $(, $forwarded)*
        }
    };
}
104
/// Builds a `match` over an enum whose variants each wrap one value, running the same
/// body in every arm.
///
/// Given the invocation:
/// ```ignore
/// dispatch_source_enum_inner!(
///     {
///         {A1,B1,C1},
///         {A2,B2,C2}
///     },
///     EnumType, enum_value, inner_ident, body
/// );
/// ```
/// the expansion is:
/// ```ignore
/// match enum_value {
///     EnumType::A1(inner_ident) => {
///         #[allow(dead_code)]
///         type PropType = B1;
///         #[allow(dead_code)]
///         type SplitType = C1;
///         {
///             body
///         }
///     }
///     EnumType::A2(inner_ident) => {
///         #[allow(dead_code)]
///         type PropType = B2;
///         #[allow(dead_code)]
///         type SplitType = C2;
///         {
///             body
///         }
///     }
/// }
/// ```
#[macro_export]
macro_rules! dispatch_source_enum_inner {
    (
        { $({ $variant:ident, $props:ty, $split_ty:ty }),* },
        $enum_type:ident,
        $scrutinee:expr,
        $bound:ident,
        $body:expr
    ) => {{
        match $scrutinee {
            $(
                $enum_type::$variant($bound) => {
                    // The aliases exist for the body's convenience; a body that uses
                    // neither must not trigger `dead_code`.
                    #[allow(dead_code)]
                    type PropType = $props;
                    #[allow(dead_code)]
                    type SplitType = $split_ty;
                    {
                        $body
                    }
                }
            )*
        }
    }};
}
162
/// Runs `body` against whichever source variant `enum_value` currently holds.
///
/// Usage: `dispatch_source_enum!(EnumType, enum_value, |inner_ident| body)`.
///
/// Names available inside `body`:
/// - `inner_ident`: the value wrapped by the matched variant.
/// - `PropType`: the concrete property type of the matched source.
/// - `SplitType`: the concrete split type of the matched source.
///
/// The expansion has this shape, with one arm per source listed by
/// [`for_all_sources`]:
/// ```ignore
/// match enum_value {
///     EnumType::Variant1(inner_ident) => {
///         body
///     }
///     ...
/// }
/// ```
///
/// Note: `inner_ident` has to be supplied by the caller because of macro hygiene.
#[macro_export]
macro_rules! dispatch_source_enum {
    ($enum_type:ident, $scrutinee:expr, |$bound:ident| $body:expr) => {{
        $crate::for_all_sources! {
            $crate::dispatch_source_enum_inner,
            $enum_type,
            { $scrutinee },
            $bound,
            $body
        }
    }};
}
187
/// Builds a `match` on a source-name string, with one arm per listed source.
///
/// Arguments, in order:
/// - the brace-delimited source list, each entry `{ Variant, PropertiesType, SplitType }`;
/// - `$source_name_str`: the expression being matched;
/// - `$prop_type_name`: identifier that is aliased to the matching `PropertiesType`
///   inside `$body`;
/// - `$body`: expression evaluated in the arm whose `<PropertiesType>::SOURCE_NAME`
///   equals the scrutinee;
/// - `$on_other_closure`: callable invoked with the scrutinee when no arm matches.
#[macro_export]
macro_rules! match_source_name_str_inner {
    (
        {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
        $source_name_str:expr,
        $prop_type_name:ident,
        $body:expr,
        $on_other_closure:expr
    ) => {{
        match $source_name_str {
            $(
                <$prop_name>::SOURCE_NAME => {
                    // The caller must always name the alias, but the body is free to
                    // ignore it; silence `dead_code` the same way
                    // `dispatch_source_enum_inner` does for its aliases.
                    #[allow(dead_code)]
                    type $prop_type_name = $prop_name;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }}
}
210
/// Dispatches logic by matching a string against `SourceProperties::SOURCE_NAME`
/// of every source listed by [`for_all_sources`].
///
/// - `$source_name_str`: the string expression to match.
/// - `$prop_type_name`: identifier aliased to the matching property type within `$body`.
/// - `$body`: expression evaluated for the matching source.
/// - `$on_other_closure`: callable invoked with the string when no source matches.
#[macro_export]
macro_rules! match_source_name_str {
    ($name_expr:expr, $alias:ident, $body:expr, $fallback:expr) => {{
        $crate::for_all_sources! {
            $crate::match_source_name_str_inner,
            { $name_expr },
            $alias,
            { $body },
            { $fallback }
        }
    }};
}
224
/// Shorthand for [`dispatch_source_enum`] with `SplitImpl` as the enum type.
///
/// Usage: `dispatch_split_impl!(split_impl_value, |inner| body)`.
#[macro_export]
macro_rules! dispatch_split_impl {
    ($split_impl:expr, | $bound:ident | $body:expr) => {{
        // `dispatch_source_enum!` takes the enum type as a bare identifier, so the
        // type is imported into this block first.
        use $crate::source::SplitImpl;
        $crate::dispatch_source_enum! { SplitImpl, { $split_impl }, |$bound| $body }
    }};
}
233
/// Generates the connection helper functions from the list supplied by
/// [`for_all_connections`]; each entry is
/// `{ Variant, ConnectionType, PbConnectionTypePath, "name" }`.
///
/// Generated items:
/// - `connection_name_to_prop_type_name`: connection name -> `type_name` of its connection type.
/// - `pb_connection_type_to_connection_type`: protobuf connection type -> connection name.
/// - `build_connection`: deserializes a string map into the boxed connection type.
/// - `enforce_secret_connection`: forwards to the connection type's `enforce_secret`.
///
/// The expansion refers to `json!`, `serde_json` and `risingwave_pb` without `$crate`,
/// so those names must resolve in the module that invokes this macro.
#[macro_export]
macro_rules! impl_connection {
    ({ $({ $variant:ident, $conn_ty:ty, $pb_path:path, $conn_name:literal }),* }) => {
        // Connection name -> type name of the matching connection struct.
        pub fn connection_name_to_prop_type_name(connection_name: &str) -> Option<&'static str> {
            match connection_name {
                $(
                    $conn_name => Some(std::any::type_name::<$conn_ty>()),
                )*
                _ => None,
            }
        }

        // Protobuf connection type -> connection name; anything not listed yields `None`.
        pub fn pb_connection_type_to_connection_type(pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType) -> Option<&'static str> {
            match pb_connection_type {
                $(
                    risingwave_pb::catalog::connection_params::PbConnectionType::$variant => Some($conn_name),
                )*
                _ => None,
            }
        }

        // Deserializes the (secret-filled) key/value map into the connection type
        // selected by `pb_connection_type`.
        pub fn build_connection(
            pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
            value_secret_filled: std::collections::BTreeMap<String, String>
        ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
            match pb_connection_type {
                $(
                    <$pb_path>::$variant => {
                        let parsed: Box<$conn_ty> = serde_json::from_value(json!(value_secret_filled))
                            .map_err($crate::error::ConnectorError::from)?;
                        Ok(parsed)
                    },
                )*
                // NOTE(review): panics if an unspecified type ever reaches this point;
                // presumably callers validate beforehand — confirm.
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
            }
        }

        // Runs the selected connection type's `enforce_secret` over the given property names.
        pub fn enforce_secret_connection<'a>(
            pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType,
            prop_iter: impl Iterator<Item = &'a str>,
        ) -> $crate::error::ConnectorResult<()> {
            match pb_connection_type {
                $(
                    <$pb_path>::$variant => {
                        <$conn_ty>::enforce_secret(prop_iter)
                    },
                )*
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
            }
        }
    }
}
288
/// Generates the `SplitImpl` enum and its conversions from a source list whose
/// entries are `{ Variant, PropertiesType, SplitType }`.
///
/// Generated items:
/// - `enum SplitImpl` with one variant per source, each wrapping that source's split type;
/// - `TryFrom<SplitImpl>` for every split type, failing when the variant does not match;
/// - `From<SplitType>` for `SplitImpl`.
///
/// `EnumAsInner` is named without a path, so the derive must be in scope where this
/// macro is invoked.
#[macro_export]
macro_rules! impl_split {
    ({ $({ $variant:ident, $props:ty, $split_ty:ty }),* }) => {

        #[derive(Debug, Clone, EnumAsInner, PartialEq)]
        pub enum SplitImpl {
            $(
                $variant($split_ty),
            )*
        }

        $(
            // Unwrap a `SplitImpl` into the concrete split type of this source.
            impl TryFrom<SplitImpl> for $split_ty {
                type Error = $crate::error::ConnectorError;

                fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
                    match split {
                        SplitImpl::$variant(inner) => Ok(inner),
                        other => risingwave_common::bail!("expect {} but get {:?}", stringify!($split_ty), other),
                    }
                }
            }

            // Wrap the concrete split type into its `SplitImpl` variant.
            impl From<$split_ty> for SplitImpl {
                fn from(split: $split_ty) -> Self {
                    Self::$variant(split)
                }
            }

        )*
    }
}
321
/// Shorthand for [`dispatch_source_enum`] with `ConnectorProperties` as the enum type.
///
/// Usage: `dispatch_source_prop!(connector_properties, |inner| body)`.
#[macro_export]
macro_rules! dispatch_source_prop {
    ($props_value:expr, |$bound:ident| $body:expr) => {{
        // `dispatch_source_enum!` takes the enum type as a bare identifier, so the
        // type is imported into this block first.
        use $crate::source::ConnectorProperties;
        $crate::dispatch_source_enum! {
            ConnectorProperties,
            { $props_value },
            |$bound| { $body }
        }
    }};
}
330
/// Generates the `ConnectorProperties` enum from a source list whose entries are
/// `{ Variant, PropertiesType, SplitType }` (the split type is not used here).
///
/// Generated items:
/// - `enum ConnectorProperties` with one variant per source, each holding a boxed
///   properties value;
/// - `ConnectorProperties::kind`, returning the variant name as a string;
/// - `From<PropertiesType>` for `ConnectorProperties`, boxing the value.
#[macro_export]
macro_rules! impl_connector_properties {
    ({ $({ $variant:ident, $props:ty, $split_ty:ty }),* }) => {
        #[derive(Clone, Debug)]
        pub enum ConnectorProperties {
            $(
                $variant(Box<$props>),
            )*
        }

        impl ConnectorProperties {
            // Name of the active variant, exactly as spelled in the source list.
            pub fn kind(&self) -> &'static str {
                match self {
                    $(
                        Self::$variant(_) => stringify!($variant),
                    )*
                }
            }
        }

        $(
            impl From<$props> for ConnectorProperties {
                fn from(prop: $props) -> Self {
                    Self::$variant(Box::new(prop))
                }
            }
        )*
    }
}
360
/// Generates the CDC source marker types from the classified source lists; only the
/// first list (entries `{ Name }`) is used, the second is accepted and ignored.
///
/// For every `Name` this emits:
/// - a unit struct `Name` implementing `CdcSourceTypeTrait`, whose
///   `CDC_CONNECTOR_NAME` is the lower-cased name followed by `-cdc`;
/// - a type alias `NameCdcProperties = CdcProperties<Name>`.
///
/// It also emits `enum CdcSourceType` (one variant per name plus `Unspecified`) and
/// conversions between it and `PbSourceType` in both directions.
///
/// `CdcSourceTypeTrait`, `CdcProperties` and `PbSourceType` are named without a path,
/// so they must be in scope where this macro is invoked.
#[macro_export]
macro_rules! impl_cdc_source_type {
    (
        { $({ $cdc_name:tt }),* },
        { $($_unused:tt),* }
    ) => {
        $(
            $crate::paste! {
                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
                pub struct $cdc_name;

                impl CdcSourceTypeTrait for $cdc_name {
                    const CDC_CONNECTOR_NAME: &'static str =
                        concat!(stringify!([<$cdc_name:lower>]), "-cdc");

                    fn source_type() -> CdcSourceType {
                        CdcSourceType::$cdc_name
                    }
                }

                pub type [<$cdc_name CdcProperties>] = CdcProperties<$cdc_name>;
            }
        )*

        #[derive(Clone)]
        pub enum CdcSourceType {
            $(
                $cdc_name,
            )*
            Unspecified,
        }

        // Protobuf source type -> CDC source type, variant by variant.
        impl From<PbSourceType> for CdcSourceType {
            fn from(value: PbSourceType) -> Self {
                match value {
                    PbSourceType::Unspecified => Self::Unspecified,
                    $(
                        PbSourceType::$cdc_name => Self::$cdc_name,
                    )*
                }
            }
        }

        // CDC source type -> protobuf source type, variant by variant.
        impl From<CdcSourceType> for PbSourceType {
            fn from(this: CdcSourceType) -> Self {
                match this {
                    $(
                        CdcSourceType::$cdc_name => Self::$cdc_name,
                    )*
                    CdcSourceType::Unspecified => Self::Unspecified,
                }
            }
        }

    }
}