1#[macro_export]
16macro_rules! for_all_classified_sources {
17 ($macro:path $(, $extra_args:tt)*) => {
18 $macro! {
19 {
21 { Mysql },
22 { Postgres },
23 { Citus },
24 { Mongodb },
25 { SqlServer }
26 },
27 {
30 { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31 { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32 { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33 { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34 { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35 { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36 { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37 { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38 { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41 { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42 { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
43 { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
44 { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
45 }
46 $(
47 ,$extra_args
48 )*
49 }
50 };
51}
52
53#[macro_export]
54macro_rules! for_all_connections {
55 ($macro:path $(, $extra_args:tt)*) => {
56 $macro! {
57 {
58 { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
59 { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
60 { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
61 { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType }
62 }
63 $(,$extra_args)*
64 }
65 };
66}
67
68#[macro_export]
69macro_rules! for_all_sources_inner {
70 (
71 { $({ $cdc_source_type:ident }),* },
72 { $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
73 $macro:tt $(, $extra_args:tt)*
74 ) => {
75 $crate::paste! {
76 $macro! {
77 {
78 $(
79 {
80 [< $cdc_source_type Cdc >],
81 $crate::source::cdc::[< $cdc_source_type CdcProperties >],
82 $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
83 }
84 ),*
85 ,
86 $(
87 { $source_variant, $prop_name, $split }
88 ),*
89 }
90 $(, $extra_args)*
91 }
92 }
93 };
94}
95
96#[macro_export]
97macro_rules! for_all_sources {
98 ($macro:path $(, $arg:tt )*) => {
99 $crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* }
100 };
101}
102
103#[macro_export]
137macro_rules! dispatch_source_enum_inner {
138 (
139 {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
140 $enum_type:ident,
141 $enum_value:expr,
142 $inner_name:ident,
143 $body:expr
144 ) => {{
145 match $enum_value {
146 $(
147 $enum_type::$source_variant($inner_name) => {
148 #[allow(dead_code)]
149 type PropType = $prop_name;
150 #[allow(dead_code)]
151 type SplitType = $split;
152 {
153 $body
154 }
155 },
156 )*
157 }
158 }}
159}
160
161#[macro_export]
180macro_rules! dispatch_source_enum {
181 ($enum_type:ident, $enum_value:expr, |$inner_name:ident| $body:expr) => {{
182 $crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_type, { $enum_value }, $inner_name, $body}
183 }};
184}
185
186#[macro_export]
187macro_rules! match_source_name_str_inner {
188 (
189 {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
190 $source_name_str:expr,
191 $prop_type_name:ident,
192 $body:expr,
193 $on_other_closure:expr
194 ) => {{
195 match $source_name_str {
196 $(
197 <$prop_name>::SOURCE_NAME => {
198 type $prop_type_name = $prop_name;
199 {
200 $body
201 }
202 },
203 )*
204 other => ($on_other_closure)(other),
205 }
206 }}
207}
208
209#[macro_export]
211macro_rules! match_source_name_str {
212 ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
213 $crate::for_all_sources! {
214 $crate::match_source_name_str_inner,
215 { $source_name_str },
216 $prop_type_name,
217 { $body },
218 { $on_other_closure }
219 }
220 }};
221}
222
223#[macro_export]
225macro_rules! dispatch_split_impl {
226 ($impl:expr, | $inner_name:ident | $body:expr) => {{
227 use $crate::source::SplitImpl;
228 $crate::dispatch_source_enum! {SplitImpl, { $impl }, |$inner_name| $body}
229 }};
230}
231
232#[macro_export]
233macro_rules! impl_connection {
234 ({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path }),*}) => {
235 pub fn build_connection(
236 pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
237 value_secret_filled: std::collections::BTreeMap<String, String>
238 ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
239 match pb_connection_type {
240 $(
241 <$pb_connection_path>::$variant_name => {
242 let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?;
243 Ok(c)
244 },
245 )*
246 risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
247 }
248 }
249 }
250}
251
252#[macro_export]
253macro_rules! impl_split {
254 ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
255
256 #[derive(Debug, Clone, EnumAsInner, PartialEq)]
257 pub enum SplitImpl {
258 $(
259 $variant_name($split),
260 )*
261 }
262
263 $(
264 impl TryFrom<SplitImpl> for $split {
265 type Error = $crate::error::ConnectorError;
266
267 fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
268 match split {
269 SplitImpl::$variant_name(inner) => Ok(inner),
270 other => risingwave_common::bail!("expect {} but get {:?}", stringify!($split), other),
271 }
272 }
273 }
274
275 impl From<$split> for SplitImpl {
276 fn from(split: $split) -> SplitImpl {
277 SplitImpl::$variant_name(split)
278 }
279 }
280
281 )*
282 }
283}
284
285#[macro_export]
287macro_rules! dispatch_source_prop {
288 ($connector_properties:expr, |$inner_ident:ident| $body:expr) => {{
289 use $crate::source::ConnectorProperties;
290 $crate::dispatch_source_enum! {ConnectorProperties, { $connector_properties }, |$inner_ident| {$body}}
291 }};
292}
293
294#[macro_export]
295macro_rules! impl_connector_properties {
296 ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
297 #[derive(Clone, Debug)]
298 pub enum ConnectorProperties {
299 $(
300 $variant_name(Box<$prop_name>),
301 )*
302 }
303
304 $(
305 impl From<$prop_name> for ConnectorProperties {
306 fn from(prop: $prop_name) -> ConnectorProperties {
307 ConnectorProperties::$variant_name(Box::new(prop))
308 }
309 }
310 )*
311 }
312}
313
314#[macro_export]
315macro_rules! impl_cdc_source_type {
316 (
317 {$({$cdc_source_type:tt}),*},
318 {$($_ignore:tt),*}
319 ) => {
320 $(
321 $crate::paste!{
322 #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
323 pub struct $cdc_source_type;
324 impl CdcSourceTypeTrait for $cdc_source_type {
325 const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc");
326 fn source_type() -> CdcSourceType {
327 CdcSourceType::$cdc_source_type
328 }
329 }
330 pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>;
331 }
332 )*
333
334 #[derive(Clone)]
335 pub enum CdcSourceType {
336 $(
337 $cdc_source_type,
338 )*
339 Unspecified,
340 }
341
342 impl From<PbSourceType> for CdcSourceType {
343 fn from(value: PbSourceType) -> Self {
344 match value {
345 PbSourceType::Unspecified => CdcSourceType::Unspecified,
346 $(
347 PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
348 )*
349 }
350 }
351 }
352
353 impl From<CdcSourceType> for PbSourceType {
354 fn from(this: CdcSourceType) -> PbSourceType {
355 match this {
356 $(
357 CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
358 )*
359 CdcSourceType::Unspecified => PbSourceType::Unspecified,
360 }
361 }
362 }
363
364 }
365}