risingwave_connector/
macros.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[macro_export]
16macro_rules! for_all_classified_sources {
17    ($macro:path $(, $extra_args:tt)*) => {
18        $macro! {
19            // cdc sources
20            {
21                { Mysql },
22                { Postgres },
23                { Citus },
24                { Mongodb },
25                { SqlServer }
26            },
27            // other sources
28            // todo: file source do not nest with mq source.
29            {
30                { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31                { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32                { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33                { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34                { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35                { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36                { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37                { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38                { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39                { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40                { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41                { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42                { BatchPosixFs, $crate::source::filesystem::opendal_source::BatchPosixFsProperties, $crate::source::filesystem::opendal_source::BatchPosixFsSplit },
43                { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
44                { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
45                { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
46            }
47            $(
48                ,$extra_args
49            )*
50        }
51    };
52}
53
54#[macro_export]
55macro_rules! for_all_connections {
56    ($macro:path $(, $extra_args:tt)*) => {
57        $macro! {
58            {
59                { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "kafka"},
60                { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "iceberg"},
61                { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "schema_registry"},
62                { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "elasticsearch"}
63            }
64            $(,$extra_args)*
65        }
66    };
67}
68
69#[macro_export]
70macro_rules! for_all_sources_inner {
71    (
72        { $({ $cdc_source_type:ident }),* },
73        { $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
74        $macro:tt $(, $extra_args:tt)*
75    ) => {
76        $crate::paste! {
77            $macro! {
78                {
79                    $(
80                        {
81                            [< $cdc_source_type Cdc >],
82                            $crate::source::cdc::[< $cdc_source_type CdcProperties >],
83                            $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
84                        }
85                    ),*
86                    ,
87                    $(
88                        { $source_variant, $prop_name, $split }
89                    ),*
90                }
91                $(, $extra_args)*
92            }
93        }
94    };
95}
96
97#[macro_export]
98macro_rules! for_all_sources {
99    ($macro:path $(, $arg:tt )*) => {
100        $crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* }
101    };
102}
103
104/// The invocation:
105/// ```ignore
106/// dispatch_source_enum_inner!(
107///     {
108///         {A1,B1,C1},
109///         {A2,B2,C2}
110///     },
111///     EnumType, enum_value, inner_ident, body
112/// );
113/// ```
114/// expands to:
115/// ```ignore
116/// match enum_value {
117///     EnumType::A1(inner_ident) => {
118///         #[allow(dead_code)]
119///         type PropType = B1;
120///         #[allow(dead_code)]
121///         type SplitType = C1;
122///         {
123///             body
124///         }
125///     }
126///     EnumType::A2(inner_ident) => {
127///         #[allow(dead_code)]
128///         type PropType = B2;
129///         #[allow(dead_code)]
130///         type SplitType = C2;
131///         {
132///             body
133///         }
134///     }
135/// }
136/// ```
137#[macro_export]
138macro_rules! dispatch_source_enum_inner {
139    (
140        {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
141        $enum_type:ident,
142        $enum_value:expr,
143        $inner_name:ident,
144        $body:expr
145    ) => {{
146        match $enum_value {
147            $(
148                $enum_type::$source_variant($inner_name) => {
149                    #[allow(dead_code)]
150                    type PropType = $prop_name;
151                    #[allow(dead_code)]
152                    type SplitType = $split;
153                    {
154                        $body
155                    }
156                },
157            )*
158        }
159    }}
160}
161
162/// Usage: `dispatch_source_enum!(EnumType, enum_value, |inner_ident| body)`.
163///
164/// Inside `body`:
165/// - use `inner_ident` to represent the matched variant.
166/// - use `PropType` to represent the concrete property type.
167/// - use `SplitType` to represent the concrete split type.
168///
169/// Expands to:
170/// ```ignore
171/// match enum_value {
172///     EnumType::Variant1(inner_ident) => {
173///         body
174///     }
175///     ...
176/// }
177/// ```
178///
179/// Note: `inner_ident` must be passed as an argument due to macro hygiene.
180#[macro_export]
181macro_rules! dispatch_source_enum {
182    ($enum_type:ident, $enum_value:expr, |$inner_name:ident| $body:expr) => {{
183        $crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_type, { $enum_value }, $inner_name, $body}
184    }};
185}
186
187#[macro_export]
188macro_rules! match_source_name_str_inner {
189    (
190        {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
191        $source_name_str:expr,
192        $prop_type_name:ident,
193        $body:expr,
194        $on_other_closure:expr
195    ) => {{
196        match $source_name_str {
197            $(
198                <$prop_name>::SOURCE_NAME => {
199                    type $prop_type_name = $prop_name;
200                    {
201                        $body
202                    }
203                },
204            )*
205            other => ($on_other_closure)(other),
206        }
207    }}
208}
209
210/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic.
211#[macro_export]
212macro_rules! match_source_name_str {
213    ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
214        $crate::for_all_sources! {
215            $crate::match_source_name_str_inner,
216            { $source_name_str },
217            $prop_type_name,
218            { $body },
219            { $on_other_closure }
220        }
221    }};
222}
223
224/// [`dispatch_source_enum`] with `SplitImpl` as the enum type.
225#[macro_export]
226macro_rules! dispatch_split_impl {
227    ($impl:expr, | $inner_name:ident | $body:expr) => {{
228        use $crate::source::SplitImpl;
229        $crate::dispatch_source_enum! {SplitImpl, { $impl }, |$inner_name| $body}
230    }};
231}
232
233#[macro_export]
234macro_rules! impl_connection {
235    ({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path, $connection_type_name:literal }),*}) => {
236        // impl the connection name for all connections
237        pub fn connection_name_to_prop_type_name(connection_name: &str) -> Option<&'static str> {
238            match connection_name {
239                $(
240                    $connection_type_name => Some(
241                        std::any::type_name::<$connection_type>()
242                    ),
243                )*
244                _ => None,
245            }
246        }
247
248        pub fn build_connection(
249            pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
250            value_secret_filled: std::collections::BTreeMap<String, String>
251        ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
252            match pb_connection_type {
253                $(
254                    <$pb_connection_path>::$variant_name => {
255                        let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?;
256                        Ok(c)
257                    },
258                )*
259                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
260            }
261        }
262
263        pub fn enforce_secret_connection<'a>(
264            pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType,
265            prop_iter: impl Iterator<Item = &'a str>,
266        ) -> $crate::error::ConnectorResult<()> {
267            match pb_connection_type {
268                $(
269                    <$pb_connection_path>::$variant_name => {
270                        <$connection_type>::enforce_secret(prop_iter)
271                    },
272                )*
273                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
274            }
275        }
276    }
277}
278
279#[macro_export]
280macro_rules! impl_split {
281    ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
282
283        #[derive(Debug, Clone, EnumAsInner, PartialEq)]
284        pub enum SplitImpl {
285            $(
286                $variant_name($split),
287            )*
288        }
289
290        $(
291            impl TryFrom<SplitImpl> for $split {
292                type Error = $crate::error::ConnectorError;
293
294                fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
295                    match split {
296                        SplitImpl::$variant_name(inner) => Ok(inner),
297                        other => risingwave_common::bail!("expect {} but get {:?}", stringify!($split), other),
298                    }
299                }
300            }
301
302            impl From<$split> for SplitImpl {
303                fn from(split: $split) -> SplitImpl {
304                    SplitImpl::$variant_name(split)
305                }
306            }
307
308        )*
309    }
310}
311
312/// [`dispatch_source_enum`] with `ConnectorProperties` as the enum type.
313#[macro_export]
314macro_rules! dispatch_source_prop {
315    ($connector_properties:expr, |$inner_ident:ident| $body:expr) => {{
316        use $crate::source::ConnectorProperties;
317        $crate::dispatch_source_enum! {ConnectorProperties, { $connector_properties }, |$inner_ident| {$body}}
318    }};
319}
320
321#[macro_export]
322macro_rules! impl_connector_properties {
323    ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
324        #[derive(Clone, Debug)]
325        pub enum ConnectorProperties {
326            $(
327                $variant_name(Box<$prop_name>),
328            )*
329        }
330
331        impl ConnectorProperties {
332            pub fn kind(&self) -> &'static str {
333                match self {
334                    $(
335                        ConnectorProperties::$variant_name(_) => stringify!($variant_name),
336                    )*
337                }
338            }
339        }
340
341        $(
342            impl From<$prop_name> for ConnectorProperties {
343                fn from(prop: $prop_name) -> ConnectorProperties {
344                    ConnectorProperties::$variant_name(Box::new(prop))
345                }
346            }
347        )*
348    }
349}
350
351#[macro_export]
352macro_rules! impl_cdc_source_type {
353    (
354        {$({$cdc_source_type:tt}),*},
355        {$($_ignore:tt),*}
356    ) => {
357        $(
358            $crate::paste!{
359                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
360                pub struct $cdc_source_type;
361                impl CdcSourceTypeTrait for $cdc_source_type {
362                    const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc");
363                    fn source_type() -> CdcSourceType {
364                        CdcSourceType::$cdc_source_type
365                    }
366                }
367                pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>;
368            }
369        )*
370
371        #[derive(Clone)]
372        pub enum CdcSourceType {
373            $(
374                $cdc_source_type,
375            )*
376            Unspecified,
377        }
378
379        impl From<PbSourceType> for CdcSourceType {
380            fn from(value: PbSourceType) -> Self {
381                match value {
382                    PbSourceType::Unspecified => CdcSourceType::Unspecified,
383                    $(
384                        PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
385                    )*
386                }
387            }
388        }
389
390        impl From<CdcSourceType> for PbSourceType {
391            fn from(this: CdcSourceType) -> PbSourceType {
392                match this {
393                    $(
394                        CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
395                    )*
396                   CdcSourceType::Unspecified => PbSourceType::Unspecified,
397                }
398            }
399        }
400
401    }
402}