risingwave_connector/
macros.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[macro_export]
16macro_rules! for_all_classified_sources {
17    ($macro:path $(, $extra_args:tt)*) => {
18        $macro! {
19            // cdc sources
20            {
21                { Mysql },
22                { Postgres },
23                { Citus },
24                { Mongodb },
25                { SqlServer }
26            },
27            // other sources
28            // todo: file source do not nest with mq source.
29            {
30                { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31                { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32                { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33                { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34                { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35                { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36                { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37                { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38                { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39                { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40                { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41                { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42                { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
43                { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
44                { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
45            }
46            $(
47                ,$extra_args
48            )*
49        }
50    };
51}
52
53#[macro_export]
54macro_rules! for_all_connections {
55    ($macro:path $(, $extra_args:tt)*) => {
56        $macro! {
57            {
58                { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
59                { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
60                { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
61                { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType }
62            }
63            $(,$extra_args)*
64        }
65    };
66}
67
68#[macro_export]
69macro_rules! for_all_sources_inner {
70    (
71        { $({ $cdc_source_type:ident }),* },
72        { $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
73        $macro:tt $(, $extra_args:tt)*
74    ) => {
75        $crate::paste! {
76            $macro! {
77                {
78                    $(
79                        {
80                            [< $cdc_source_type Cdc >],
81                            $crate::source::cdc::[< $cdc_source_type CdcProperties >],
82                            $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
83                        }
84                    ),*
85                    ,
86                    $(
87                        { $source_variant, $prop_name, $split }
88                    ),*
89                }
90                $(, $extra_args)*
91            }
92        }
93    };
94}
95
96#[macro_export]
97macro_rules! for_all_sources {
98    ($macro:path $(, $arg:tt )*) => {
99        $crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* }
100    };
101}
102
103/// The invocation:
104/// ```ignore
105/// dispatch_source_enum_inner!(
106///     {
107///         {A1,B1,C1},
108///         {A2,B2,C2}
109///     },
110///     EnumType, enum_value, inner_ident, body
111/// );
112/// ```
113/// expands to:
114/// ```ignore
115/// match enum_value {
116///     EnumType::A1(inner_ident) => {
117///         #[allow(dead_code)]
118///         type PropType = B1;
119///         #[allow(dead_code)]
120///         type SplitType = C1;
121///         {
122///             body
123///         }
124///     }
125///     EnumType::A2(inner_ident) => {
126///         #[allow(dead_code)]
127///         type PropType = B2;
128///         #[allow(dead_code)]
129///         type SplitType = C2;
130///         {
131///             body
132///         }
133///     }
134/// }
135/// ```
136#[macro_export]
137macro_rules! dispatch_source_enum_inner {
138    (
139        {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
140        $enum_type:ident,
141        $enum_value:expr,
142        $inner_name:ident,
143        $body:expr
144    ) => {{
145        match $enum_value {
146            $(
147                $enum_type::$source_variant($inner_name) => {
148                    #[allow(dead_code)]
149                    type PropType = $prop_name;
150                    #[allow(dead_code)]
151                    type SplitType = $split;
152                    {
153                        $body
154                    }
155                },
156            )*
157        }
158    }}
159}
160
161/// Usage: `dispatch_source_enum!(EnumType, enum_value, |inner_ident| body)`.
162///
163/// Inside `body`:
164/// - use `inner_ident` to represent the matched variant.
165/// - use `PropType` to represent the concrete property type.
166/// - use `SplitType` to represent the concrete split type.
167///
168/// Expands to:
169/// ```ignore
170/// match enum_value {
171///     EnumType::Variant1(inner_ident) => {
172///         body
173///     }
174///     ...
175/// }
176/// ```
177///
178/// Note: `inner_ident` must be passed as an argument due to macro hygiene.
179#[macro_export]
180macro_rules! dispatch_source_enum {
181    ($enum_type:ident, $enum_value:expr, |$inner_name:ident| $body:expr) => {{
182        $crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_type, { $enum_value }, $inner_name, $body}
183    }};
184}
185
186#[macro_export]
187macro_rules! match_source_name_str_inner {
188    (
189        {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
190        $source_name_str:expr,
191        $prop_type_name:ident,
192        $body:expr,
193        $on_other_closure:expr
194    ) => {{
195        match $source_name_str {
196            $(
197                <$prop_name>::SOURCE_NAME => {
198                    type $prop_type_name = $prop_name;
199                    {
200                        $body
201                    }
202                },
203            )*
204            other => ($on_other_closure)(other),
205        }
206    }}
207}
208
209/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic.
210#[macro_export]
211macro_rules! match_source_name_str {
212    ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
213        $crate::for_all_sources! {
214            $crate::match_source_name_str_inner,
215            { $source_name_str },
216            $prop_type_name,
217            { $body },
218            { $on_other_closure }
219        }
220    }};
221}
222
223/// [`dispatch_source_enum`] with `SplitImpl` as the enum type.
224#[macro_export]
225macro_rules! dispatch_split_impl {
226    ($impl:expr, | $inner_name:ident | $body:expr) => {{
227        use $crate::source::SplitImpl;
228        $crate::dispatch_source_enum! {SplitImpl, { $impl }, |$inner_name| $body}
229    }};
230}
231
232#[macro_export]
233macro_rules! impl_connection {
234    ({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path }),*}) => {
235        pub fn build_connection(
236            pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
237            value_secret_filled: std::collections::BTreeMap<String, String>
238        ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
239            match pb_connection_type {
240                $(
241                    <$pb_connection_path>::$variant_name => {
242                        let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?;
243                        Ok(c)
244                    },
245                )*
246                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
247            }
248        }
249    }
250}
251
252#[macro_export]
253macro_rules! impl_split {
254    ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
255
256        #[derive(Debug, Clone, EnumAsInner, PartialEq)]
257        pub enum SplitImpl {
258            $(
259                $variant_name($split),
260            )*
261        }
262
263        $(
264            impl TryFrom<SplitImpl> for $split {
265                type Error = $crate::error::ConnectorError;
266
267                fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
268                    match split {
269                        SplitImpl::$variant_name(inner) => Ok(inner),
270                        other => risingwave_common::bail!("expect {} but get {:?}", stringify!($split), other),
271                    }
272                }
273            }
274
275            impl From<$split> for SplitImpl {
276                fn from(split: $split) -> SplitImpl {
277                    SplitImpl::$variant_name(split)
278                }
279            }
280
281        )*
282    }
283}
284
285/// [`dispatch_source_enum`] with `ConnectorProperties` as the enum type.
286#[macro_export]
287macro_rules! dispatch_source_prop {
288    ($connector_properties:expr, |$inner_ident:ident| $body:expr) => {{
289        use $crate::source::ConnectorProperties;
290        $crate::dispatch_source_enum! {ConnectorProperties, { $connector_properties }, |$inner_ident| {$body}}
291    }};
292}
293
294#[macro_export]
295macro_rules! impl_connector_properties {
296    ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
297        #[derive(Clone, Debug)]
298        pub enum ConnectorProperties {
299            $(
300                $variant_name(Box<$prop_name>),
301            )*
302        }
303
304        $(
305            impl From<$prop_name> for ConnectorProperties {
306                fn from(prop: $prop_name) -> ConnectorProperties {
307                    ConnectorProperties::$variant_name(Box::new(prop))
308                }
309            }
310        )*
311    }
312}
313
314#[macro_export]
315macro_rules! impl_cdc_source_type {
316    (
317        {$({$cdc_source_type:tt}),*},
318        {$($_ignore:tt),*}
319    ) => {
320        $(
321            $crate::paste!{
322                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
323                pub struct $cdc_source_type;
324                impl CdcSourceTypeTrait for $cdc_source_type {
325                    const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc");
326                    fn source_type() -> CdcSourceType {
327                        CdcSourceType::$cdc_source_type
328                    }
329                }
330                pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>;
331            }
332        )*
333
334        #[derive(Clone)]
335        pub enum CdcSourceType {
336            $(
337                $cdc_source_type,
338            )*
339            Unspecified,
340        }
341
342        impl From<PbSourceType> for CdcSourceType {
343            fn from(value: PbSourceType) -> Self {
344                match value {
345                    PbSourceType::Unspecified => CdcSourceType::Unspecified,
346                    $(
347                        PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
348                    )*
349                }
350            }
351        }
352
353        impl From<CdcSourceType> for PbSourceType {
354            fn from(this: CdcSourceType) -> PbSourceType {
355                match this {
356                    $(
357                        CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
358                    )*
359                   CdcSourceType::Unspecified => PbSourceType::Unspecified,
360                }
361            }
362        }
363
364    }
365}