1#[macro_export]
16macro_rules! for_all_classified_sources {
17 ($macro:path $(, $extra_args:tt)*) => {
18 $macro! {
19 {
21 { Mysql },
22 { Postgres },
23 { Citus },
24 { Mongodb },
25 { SqlServer }
26 },
27 {
30 { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31 { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32 { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33 { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34 { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35 { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36 { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37 { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38 { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41 { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42 { BatchPosixFs, $crate::source::filesystem::opendal_source::BatchPosixFsProperties, $crate::source::filesystem::opendal_source::BatchPosixFsSplit },
43 { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
44 { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
45 { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit},
46 { AdbcSnowflake, $crate::source::adbc_snowflake::AdbcSnowflakeProperties, $crate::source::adbc_snowflake::AdbcSnowflakeSplit}
47 }
48 $(
49 ,$extra_args
50 )*
51 }
52 };
53}
54
55#[macro_export]
56macro_rules! for_all_connections {
57 ($macro:path $(, $extra_args:tt)*) => {
58 $macro! {
59 {
60 { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "kafka"},
61 { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "iceberg"},
62 { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "schema_registry"},
63 { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "elasticsearch"}
64 }
65 $(,$extra_args)*
66 }
67 };
68}
69
/// Flattens the two classified source lists into a single list and hands it to
/// the callback macro `$callback`.
///
/// Arguments, in order:
/// - the CDC list, `{ { Name }, ... }`;
/// - the plain source list, `{ { Variant, PropertiesType, SplitType }, ... }`;
/// - the callback macro, followed by any extra arguments to forward to it.
///
/// Every CDC entry `{ X }` is rewritten (through `$crate::paste!`) into
/// `{ XCdc, $crate::source::cdc::XCdcProperties,
/// $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::X> }`.
/// The rewritten CDC entries come first in the merged list, followed by the
/// plain entries unchanged.
#[macro_export]
macro_rules! for_all_sources_inner {
    (
        { $({ $cdc_name:ident }),* },
        { $({ $variant:ident, $props_ty:ty, $split_ty:ty }),* },
        $callback:tt $(, $forwarded:tt)*
    ) => {
        $crate::paste! {
            $callback! {
                {
                    // CDC sources, expanded to the common three-field shape.
                    $(
                        {
                            [< $cdc_name Cdc >],
                            $crate::source::cdc::[< $cdc_name CdcProperties >],
                            $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_name>
                        }
                    ),*
                    ,
                    // Plain sources, passed through untouched.
                    $(
                        { $variant, $props_ty, $split_ty }
                    ),*
                }
                $(, $forwarded)*
            }
        }
    };
}
97
/// Invokes the callback macro `$callback` with one flat list of all sources.
///
/// This chains `for_all_classified_sources!` into `for_all_sources_inner!`,
/// so the callback receives a single brace-delimited list of
/// `{ Variant, PropertiesType, SplitType }` entries, followed by any extra
/// arguments given here (forwarded verbatim, comma-separated).
#[macro_export]
macro_rules! for_all_sources {
    ($callback:path $(, $forwarded:tt)*) => {
        $crate::for_all_classified_sources! {
            $crate::for_all_sources_inner,
            $callback
            $(, $forwarded)*
        }
    };
}
104
/// Expands to a `match` over `$enum_value` with one arm per listed source.
///
/// Arguments, in order:
/// - the source list, `{ { Variant, PropertiesType, SplitType }, ... }`;
/// - `$enum_type`: the enum whose variants carry the listed names;
/// - `$enum_value`: the expression to match on;
/// - `$inner_name`: the identifier bound to the payload of the matched variant;
/// - `$body`: the expression evaluated in the matched arm.
///
/// Within each arm the type aliases `PropType` and `SplitType` are defined as
/// that entry's properties type and split type, so `$body` may refer to them.
#[macro_export]
macro_rules! dispatch_source_enum_inner {
    (
        {$({ $variant:ident, $props_ty:ty, $split_ty:ty }),*},
        $enum_type:ident,
        $enum_value:expr,
        $inner_name:ident,
        $body:expr
    ) => {{
        match $enum_value {
            $(
                $enum_type::$variant($inner_name) => {
                    // The aliases are optional conveniences for `$body`; silence
                    // the lint when a particular body does not use them.
                    #[allow(dead_code)]
                    type SplitType = $split_ty;
                    #[allow(dead_code)]
                    type PropType = $props_ty;
                    {
                        $body
                    }
                },
            )*
        }
    }}
}
162
/// Dispatches on a per-source enum value using closure-like syntax:
/// `dispatch_source_enum!(EnumType, value, |inner| body)`.
///
/// The source list comes from `for_all_sources!` and the actual `match` is
/// produced by `dispatch_source_enum_inner!`: the payload of the matched
/// variant is bound to `inner`, and `body` may use the `PropType` and
/// `SplitType` aliases defined for that variant.
#[macro_export]
macro_rules! dispatch_source_enum {
    ($enum_type:ident, $enum_value:expr, |$binding:ident| $body:expr) => {{
        $crate::for_all_sources! {
            $crate::dispatch_source_enum_inner,
            $enum_type,
            // Braced so the expression travels as a single token tree.
            { $enum_value },
            $binding,
            $body
        }
    }};
}
187
/// Expands to a `match` on a source-name string with one arm per listed source.
///
/// Arguments, in order:
/// - the source list, `{ { Variant, PropertiesType, SplitType }, ... }`;
/// - `$source_name_str`: the string expression to match on;
/// - `$prop_type_name`: the identifier to use as a type alias for the matched
///   source's properties type;
/// - `$body`: the expression evaluated when a source matches;
/// - `$on_other_closure`: a callable invoked with the unmatched string when no
///   source matches.
///
/// Each arm's pattern is the `SOURCE_NAME` associated constant of that entry's
/// properties type.
#[macro_export]
macro_rules! match_source_name_str_inner {
    (
        {$({ $variant:ident, $props_ty:ty, $split_ty:ty }),*},
        $source_name_str:expr,
        $prop_type_name:ident,
        $body:expr,
        $on_other_closure:expr
    ) => {{
        match $source_name_str {
            $(
                <$props_ty>::SOURCE_NAME => {
                    // Expose the matched properties type to `$body` under the caller's chosen name.
                    type $prop_type_name = $props_ty;
                    {
                        $body
                    }
                },
            )*
            // Nothing matched: defer to the caller-supplied fallback.
            unmatched => ($on_other_closure)(unmatched),
        }
    }}
}
210
/// Matches a source-name string against the `SOURCE_NAME` of every source
/// enumerated by `for_all_sources!`.
///
/// On a match, `$prop_type_name` is defined as an alias for that source's
/// properties type and `$body` is evaluated; otherwise `$on_other_closure` is
/// called with the unmatched string. The `match` itself is generated by
/// `match_source_name_str_inner!`.
#[macro_export]
macro_rules! match_source_name_str {
    ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
        // Expressions are braced so each one is forwarded as a single token tree.
        $crate::for_all_sources! {
            $crate::match_source_name_str_inner,
            { $source_name_str },
            $prop_type_name,
            { $body },
            { $on_other_closure }
        }
    }};
}
224
/// Dispatches on a `SplitImpl` value:
/// `dispatch_split_impl!(split, |inner| body)`.
///
/// Brings `$crate::source::SplitImpl` into scope and delegates to
/// `dispatch_source_enum!`, which binds the matched variant's payload to
/// `inner` before evaluating `body`.
#[macro_export]
macro_rules! dispatch_split_impl {
    ($impl:expr, | $binding:ident | $body:expr) => {{
        use $crate::source::SplitImpl;
        $crate::dispatch_source_enum! {
            SplitImpl,
            { $impl },
            |$binding| $body
        }
    }};
}
233
/// Generates the connection helper functions from a list of
/// `{ Variant, ConnectionType, PbConnectionTypePath, "name" }` entries
/// (the shape produced by `for_all_connections!`).
///
/// The expansion defines four public functions at the invocation site:
/// `connection_name_to_prop_type_name`, `pb_connection_type_to_connection_type`,
/// `build_connection` and `enforce_secret_connection`.
///
/// The expansion uses `json!` and `serde_json` unqualified, so both must be
/// resolvable where this macro is invoked.
#[macro_export]
macro_rules! impl_connection {
    ({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path, $connection_type_name:literal }),*}) => {
        /// Maps a connection name to the Rust type name of its connection
        /// struct, or `None` when the name is not a known connection.
        pub fn connection_name_to_prop_type_name(connection_name: &str) -> Option<&'static str> {
            match connection_name {
                $(
                    $connection_type_name => Some(
                        std::any::type_name::<$connection_type>()
                    ),
                )*
                _ => None,
            }
        }

        /// Maps a protobuf connection type to its connection name, or `None`
        /// for `Unspecified`.
        pub fn pb_connection_type_to_connection_type(pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType) -> Option<&'static str> {
            // Matched exhaustively (no `_` arm), like the functions below, so
            // that a newly added enum variant is a compile error rather than a
            // silent `None`.
            match pb_connection_type {
                $(
                    <$pb_connection_path>::$variant_name => Some($connection_type_name),
                )*
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => None,
            }
        }

        /// Deserializes the given properties into the connection struct that
        /// corresponds to `pb_connection_type`.
        ///
        /// # Errors
        /// Returns a `ConnectorError` when deserialization fails.
        ///
        /// # Panics
        /// Panics when `pb_connection_type` is `Unspecified`.
        pub fn build_connection(
            pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
            value_secret_filled: std::collections::BTreeMap<String, String>
        ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
            match pb_connection_type {
                $(
                    <$pb_connection_path>::$variant_name => {
                        let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?;
                        Ok(c)
                    },
                )*
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
            }
        }

        /// Forwards `prop_iter` to the `enforce_secret` function of the
        /// connection struct that corresponds to `pb_connection_type`.
        ///
        /// # Panics
        /// Panics when `pb_connection_type` is `Unspecified`.
        pub fn enforce_secret_connection<'a>(
            pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType,
            prop_iter: impl Iterator<Item = &'a str>,
        ) -> $crate::error::ConnectorResult<()> {
            match pb_connection_type {
                $(
                    <$pb_connection_path>::$variant_name => {
                        <$connection_type>::enforce_secret(prop_iter)
                    },
                )*
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
            }
        }
    }
}
288
/// Generates the `SplitImpl` enum and its conversions from a list of
/// `{ Variant, PropertiesType, SplitType }` entries.
///
/// For every entry the expansion provides:
/// - a `SplitImpl::Variant(SplitType)` variant;
/// - `impl From<SplitType> for SplitImpl`;
/// - `impl TryFrom<SplitImpl> for SplitType`, which fails with a
///   `ConnectorError` when the value holds a different variant.
///
/// The properties type of each entry is not used here. The `EnumAsInner`
/// derive must be in scope where this macro is invoked.
#[macro_export]
macro_rules! impl_split {
    ({$({ $variant:ident, $props_ty:ty, $split_ty:ty}),*}) => {

        /// One variant per source, each wrapping that source's split type.
        #[derive(Debug, Clone, EnumAsInner, PartialEq)]
        pub enum SplitImpl {
            $(
                $variant($split_ty),
            )*
        }

        $(
            impl From<$split_ty> for SplitImpl {
                fn from(inner: $split_ty) -> Self {
                    Self::$variant(inner)
                }
            }

            impl TryFrom<SplitImpl> for $split_ty {
                type Error = $crate::error::ConnectorError;

                fn try_from(value: SplitImpl) -> std::result::Result<Self, Self::Error> {
                    match value {
                        SplitImpl::$variant(inner) => Ok(inner),
                        // Any other variant is a mismatch; report what was expected and what arrived.
                        mismatched => risingwave_common::bail!("expect {} but get {:?}", stringify!($split_ty), mismatched),
                    }
                }
            }

        )*
    }
}
321
/// Dispatches on a `ConnectorProperties` value:
/// `dispatch_source_prop!(props, |inner| body)`.
///
/// Brings `$crate::source::ConnectorProperties` into scope and delegates to
/// `dispatch_source_enum!`, which binds the matched variant's payload to
/// `inner` before evaluating `body` (wrapped in its own block).
#[macro_export]
macro_rules! dispatch_source_prop {
    ($connector_properties:expr, |$binding:ident| $body:expr) => {{
        use $crate::source::ConnectorProperties;
        $crate::dispatch_source_enum! {
            ConnectorProperties,
            { $connector_properties },
            |$binding| { $body }
        }
    }};
}
330
/// Generates the `ConnectorProperties` enum from a list of
/// `{ Variant, PropertiesType, SplitType }` entries.
///
/// For every entry the expansion provides:
/// - a `ConnectorProperties::Variant(Box<PropertiesType>)` variant;
/// - `impl From<PropertiesType> for ConnectorProperties`, which boxes the value.
///
/// It also defines `ConnectorProperties::kind`, returning the variant name as a
/// string. The split type of each entry is not used here.
#[macro_export]
macro_rules! impl_connector_properties {
    ({$({ $variant:ident, $props_ty:ty, $split_ty:ty}),*}) => {
        /// One variant per source, each holding that source's boxed properties.
        #[derive(Clone, Debug)]
        pub enum ConnectorProperties {
            $(
                $variant(Box<$props_ty>),
            )*
        }

        impl ConnectorProperties {
            /// Returns the name of the active variant.
            pub fn kind(&self) -> &'static str {
                match self {
                    $(
                        Self::$variant(_) => stringify!($variant),
                    )*
                }
            }
        }

        $(
            impl From<$props_ty> for ConnectorProperties {
                fn from(props: $props_ty) -> Self {
                    let boxed = Box::new(props);
                    Self::$variant(boxed)
                }
            }
        )*
    }
}
360
/// Generates the CDC source marker types and the `CdcSourceType` enum from the
/// classified source lists.
///
/// The first argument is the CDC list, `{ { Name }, ... }`; the second list is
/// accepted and ignored.
///
/// For every CDC name `X` the expansion defines:
/// - a unit struct `X`;
/// - `impl CdcSourceTypeTrait for X`, whose `CDC_CONNECTOR_NAME` is the
///   lower-cased name followed by `-cdc` and whose `source_type()` returns
///   `CdcSourceType::X`;
/// - the alias `XCdcProperties = CdcProperties<X>`.
///
/// It then defines `CdcSourceType` (one variant per name plus `Unspecified`)
/// and `From` conversions in both directions between it and `PbSourceType`.
///
/// `CdcSourceTypeTrait`, `CdcProperties` and `PbSourceType` must be in scope
/// where this macro is invoked.
#[macro_export]
macro_rules! impl_cdc_source_type {
    (
        {$({$cdc_name:tt}),*},
        {$($_unused:tt),*}
    ) => {
        $(
            // `paste!` is needed for the lower-cased connector name and the alias identifier.
            $crate::paste!{
                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
                pub struct $cdc_name;
                impl CdcSourceTypeTrait for $cdc_name {
                    const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_name:lower>]), "-cdc");
                    fn source_type() -> CdcSourceType {
                        CdcSourceType::$cdc_name
                    }
                }
                pub type [<$cdc_name CdcProperties>] = CdcProperties<$cdc_name>;
            }
        )*

        #[derive(Clone)]
        pub enum CdcSourceType {
            $(
                $cdc_name,
            )*
            Unspecified,
        }

        impl From<PbSourceType> for CdcSourceType {
            fn from(pb: PbSourceType) -> Self {
                match pb {
                    $(
                        PbSourceType::$cdc_name => Self::$cdc_name,
                    )*
                    PbSourceType::Unspecified => Self::Unspecified,
                }
            }
        }

        impl From<CdcSourceType> for PbSourceType {
            fn from(source_type: CdcSourceType) -> Self {
                match source_type {
                    CdcSourceType::Unspecified => Self::Unspecified,
                    $(
                        CdcSourceType::$cdc_name => Self::$cdc_name,
                    )*
                }
            }
        }

    }
}