1#[macro_export]
16macro_rules! for_all_classified_sources {
17 ($macro:path $(, $extra_args:tt)*) => {
18 $macro! {
19 {
21 { Mysql },
22 { Postgres },
23 { Citus },
24 { Mongodb },
25 { SqlServer }
26 },
27 {
30 { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31 { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32 { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33 { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34 { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35 { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36 { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37 { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38 { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41 { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42 { BatchPosixFs, $crate::source::filesystem::opendal_source::BatchPosixFsProperties, $crate::source::filesystem::opendal_source::BatchPosixFsSplit },
43 { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
44 { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
45 { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
46 }
47 $(
48 ,$extra_args
49 )*
50 }
51 };
52}
53
54#[macro_export]
55macro_rules! for_all_connections {
56 ($macro:path $(, $extra_args:tt)*) => {
57 $macro! {
58 {
59 { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "kafka"},
60 { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "iceberg"},
61 { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "schema_registry"},
62 { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType, "elasticsearch"}
63 }
64 $(,$extra_args)*
65 }
66 };
67}
68
/// Flattens the classified source tables into one list and passes it on.
///
/// Input shape (as produced by `for_all_classified_sources!`):
/// `{ { Cdc }, ... }, { { Variant, PropertiesType, SplitType }, ... }, $macro, <extra>...`
///
/// Every CDC name `Foo` is expanded (through `$crate::paste!`) into the entry
/// `{ FooCdc, $crate::source::cdc::FooCdcProperties,
///    $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::Foo> }`.
/// The CDC entries come first, followed by the non-CDC entries unchanged. The
/// callback is then invoked as `$macro! { { <entries> } , <extra>... }`.
///
/// The resulting list is always a well-formed comma separated list: no comma
/// is emitted before the first entry or after the last one, even when one of
/// the two input lists (or both) is empty.
#[macro_export]
macro_rules! for_all_sources_inner {
    // No non-CDC sources: emit the CDC entries alone so the list does not end
    // with a dangling comma.
    (
        { $({ $cdc_source_type:ident }),* },
        { },
        $macro:tt $(, $extra_args:tt)*
    ) => {
        $crate::paste! {
            $macro! {
                {
                    $(
                        {
                            [< $cdc_source_type Cdc >],
                            $crate::source::cdc::[< $cdc_source_type CdcProperties >],
                            $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
                        }
                    ),*
                }
                $(, $extra_args)*
            }
        }
    };
    // General case: the separator comma belongs to each CDC entry, so an empty
    // CDC list contributes nothing (in particular no leading comma).
    (
        { $({ $cdc_source_type:ident }),* },
        { $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
        $macro:tt $(, $extra_args:tt)*
    ) => {
        $crate::paste! {
            $macro! {
                {
                    $(
                        {
                            [< $cdc_source_type Cdc >],
                            $crate::source::cdc::[< $cdc_source_type CdcProperties >],
                            $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
                        },
                    )*
                    $(
                        { $source_variant, $prop_name, $split }
                    ),*
                }
                $(, $extra_args)*
            }
        }
    };
}
96
/// Invokes `$macro` with the flattened list of all sources.
///
/// This chains `for_all_classified_sources!` into `for_all_sources_inner!`:
/// the classified tables are handed to `for_all_sources_inner`, which receives
/// `$macro` and every further token tree as its trailing arguments.
#[macro_export]
macro_rules! for_all_sources {
    ($macro:path $(, $forwarded:tt)*) => {
        $crate::for_all_classified_sources! {
            $crate::for_all_sources_inner,
            $macro
            $(, $forwarded)*
        }
    };
}
103
/// Expands to a `match` over an enum that has one tuple variant per source.
///
/// Arguments:
/// * a braced list of `{ Variant, PropertiesType, SplitType }` entries;
/// * the enum's name (a plain identifier, so it must be in scope);
/// * the expression to match on;
/// * the identifier that the variant payload is bound to;
/// * the expression evaluated in every arm.
///
/// Inside each arm the type aliases `PropType` and `SplitType` refer to the
/// entry's properties type and split type, so the arm expression may use them.
#[macro_export]
macro_rules! dispatch_source_enum_inner {
    (
        { $({ $variant:ident, $props:ty, $split_ty:ty }),* },
        $enum_ty:ident,
        $scrutinee:expr,
        $binding:ident,
        $action:expr
    ) => {{
        match $scrutinee {
            $(
                $enum_ty::$variant($binding) => {
                    // The aliases exist for the benefit of `$action`; not
                    // every caller uses both, hence the lint allowances.
                    #[allow(dead_code)]
                    type SplitType = $split_ty;
                    #[allow(dead_code)]
                    type PropType = $props;
                    { $action }
                }
            )*
        }
    }};
}
161
/// Matches `$enum_value` against every source variant of `$enum_type` and
/// evaluates the closure-like `|name| body` for the variant that matched.
///
/// The variant list comes from `for_all_sources!`; the actual `match` is
/// produced by `dispatch_source_enum_inner!`, which also provides the
/// `PropType` and `SplitType` aliases inside the body.
#[macro_export]
macro_rules! dispatch_source_enum {
    ($kind:ident, $scrutinee:expr, |$bound:ident| $action:expr) => {{
        $crate::for_all_sources! {
            $crate::dispatch_source_enum_inner,
            $kind,
            { $scrutinee },
            $bound,
            $action
        }
    }};
}
186
/// Expands to a `match` on a source name string.
///
/// Arguments:
/// * a braced list of `{ Variant, PropertiesType, SplitType }` entries;
/// * the string expression to match on;
/// * the identifier under which the matching properties type is made
///   available (as a type alias) to the body;
/// * the body expression, evaluated when the string equals some entry's
///   `<PropertiesType>::SOURCE_NAME`;
/// * a callable that is invoked with the string when no entry matches.
#[macro_export]
macro_rules! match_source_name_str_inner {
    (
        { $({ $variant:ident, $props:ty, $split_ty:ty }),* },
        $name:expr,
        $alias:ident,
        $action:expr,
        $fallback:expr
    ) => {{
        match $name {
            $(
                <$props>::SOURCE_NAME => {
                    // Expose the concrete properties type to `$action`.
                    type $alias = $props;
                    { $action }
                }
            )*
            unmatched => ($fallback)(unmatched),
        }
    }};
}
209
/// Matches a source name string against the `SOURCE_NAME` of every source's
/// properties type.
///
/// On a match, `$body` is evaluated with `$prop_type_name` aliased to the
/// matching properties type; otherwise `$on_other_closure` is called with the
/// string. The source list comes from `for_all_sources!` and the `match` is
/// built by `match_source_name_str_inner!`.
#[macro_export]
macro_rules! match_source_name_str {
    ($name:expr, $alias:ident, $action:expr, $fallback:expr) => {{
        // Each expression is wrapped in braces so it travels through the
        // intermediate macros as a single token tree.
        $crate::for_all_sources! {
            $crate::match_source_name_str_inner,
            { $name },
            $alias,
            { $action },
            { $fallback }
        }
    }};
}
223
/// Dispatches on a `SplitImpl` value: evaluates `body` with the split held by
/// the matching variant bound to the given identifier.
///
/// `$crate::source::SplitImpl` is imported locally and the work is delegated
/// to `dispatch_source_enum!`.
#[macro_export]
macro_rules! dispatch_split_impl {
    ($split_impl:expr, | $bound:ident | $action:expr) => {{
        use $crate::source::SplitImpl;
        $crate::dispatch_source_enum! {
            SplitImpl,
            { $split_impl },
            |$bound| $action
        }
    }};
}
232
/// Generates the connection helper functions from a list of
/// `{ Variant, ConnectionType, PbConnectionTypePath, "name" }` entries.
///
/// Three public functions are emitted at the expansion site:
///
/// * `connection_name_to_prop_type_name` maps a connection name to
///   `std::any::type_name` of its connection type, or `None` if the name is
///   not listed;
/// * `build_connection` deserializes the given string map into the connection
///   type selected by the protobuf connection type and returns it boxed as
///   `dyn Connection`; deserialization failures are converted into
///   `ConnectorError`;
/// * `enforce_secret_connection` forwards the property iterator to the
///   selected connection type's `enforce_secret`.
///
/// The latter two hit `unreachable!()` for `PbConnectionType::Unspecified`.
///
/// The expansion uses `json!` unqualified, so that macro (together with the
/// `serde_json` and `risingwave_pb` crates) must be resolvable where this
/// macro is expanded.
#[macro_export]
macro_rules! impl_connection {
    ({ $({ $variant:ident, $conn_ty:ty, $pb_enum:path, $conn_name:literal }),* }) => {
        pub fn connection_name_to_prop_type_name(connection_name: &str) -> Option<&'static str> {
            match connection_name {
                $(
                    $conn_name => Some(std::any::type_name::<$conn_ty>()),
                )*
                _ => None,
            }
        }

        pub fn build_connection(
            pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
            value_secret_filled: std::collections::BTreeMap<String, String>
        ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
            match pb_connection_type {
                $(
                    <$pb_enum>::$variant => {
                        // Go through a JSON value so the connection type's
                        // `Deserialize` impl does the field mapping.
                        let raw = json!(value_secret_filled);
                        let connection: Box<$conn_ty> = serde_json::from_value(raw)
                            .map_err($crate::error::ConnectorError::from)?;
                        Ok(connection)
                    }
                )*
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
            }
        }

        pub fn enforce_secret_connection<'a>(
            pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType,
            prop_iter: impl Iterator<Item = &'a str>,
        ) -> $crate::error::ConnectorResult<()> {
            match pb_connection_type {
                $(
                    <$pb_enum>::$variant => <$conn_ty>::enforce_secret(prop_iter),
                )*
                risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
            }
        }
    };
}
278
/// Generates the `SplitImpl` enum and its conversions from a list of
/// `{ Variant, PropertiesType, SplitType }` entries.
///
/// For every entry the enum gets a tuple variant `Variant(SplitType)`, plus
///
/// * `From<SplitType> for SplitImpl`, wrapping the split in its variant;
/// * `TryFrom<SplitImpl> for SplitType`, which returns the wrapped split or
///   bails with an "expect ... but get ..." error for any other variant.
///
/// The `EnumAsInner` derive and the `risingwave_common` crate must be
/// resolvable where this macro is expanded.
#[macro_export]
macro_rules! impl_split {
    ({ $({ $variant:ident, $props:ty, $split_ty:ty }),* }) => {
        #[derive(Debug, Clone, EnumAsInner, PartialEq)]
        pub enum SplitImpl {
            $(
                $variant($split_ty),
            )*
        }

        $(
            impl From<$split_ty> for SplitImpl {
                fn from(split: $split_ty) -> Self {
                    Self::$variant(split)
                }
            }

            impl TryFrom<SplitImpl> for $split_ty {
                type Error = $crate::error::ConnectorError;

                fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
                    match split {
                        SplitImpl::$variant(matched) => Ok(matched),
                        mismatched => risingwave_common::bail!(
                            "expect {} but get {:?}",
                            stringify!($split_ty),
                            mismatched
                        ),
                    }
                }
            }
        )*
    };
}
311
/// Dispatches on a `ConnectorProperties` value: evaluates `body` with the
/// payload of the matching variant bound to the given identifier.
///
/// `$crate::source::ConnectorProperties` is imported locally and the work is
/// delegated to `dispatch_source_enum!`.
#[macro_export]
macro_rules! dispatch_source_prop {
    ($props:expr, |$bound:ident| $action:expr) => {{
        use $crate::source::ConnectorProperties;
        $crate::dispatch_source_enum! {
            ConnectorProperties,
            { $props },
            |$bound| { $action }
        }
    }};
}
320
/// Generates the `ConnectorProperties` enum from a list of
/// `{ Variant, PropertiesType, SplitType }` entries.
///
/// Every entry becomes a tuple variant holding the boxed properties type.
/// Also generated:
///
/// * `ConnectorProperties::kind`, returning the variant's name as a string;
/// * `From<PropertiesType> for ConnectorProperties` for each entry, boxing
///   the value into its variant.
///
/// The split type of each entry is accepted but not used here.
#[macro_export]
macro_rules! impl_connector_properties {
    ({ $({ $variant:ident, $props:ty, $split_ty:ty }),* }) => {
        #[derive(Clone, Debug)]
        pub enum ConnectorProperties {
            $(
                $variant(Box<$props>),
            )*
        }

        impl ConnectorProperties {
            pub fn kind(&self) -> &'static str {
                match self {
                    $(
                        Self::$variant(_) => stringify!($variant),
                    )*
                }
            }
        }

        $(
            impl From<$props> for ConnectorProperties {
                fn from(prop: $props) -> Self {
                    Self::$variant(Box::new(prop))
                }
            }
        )*
    };
}
350
/// Generates the CDC source marker types and the `CdcSourceType` enum.
///
/// Expects the two braced lists produced by `for_all_classified_sources!`;
/// only the first one (the `{ Name }` CDC entries) is used, the second is
/// matched and ignored.
///
/// For every CDC name `Foo` this emits
///
/// * a unit struct `Foo`;
/// * `impl CdcSourceTypeTrait for Foo`, whose `CDC_CONNECTOR_NAME` is the
///   lower-cased name followed by `-cdc` and whose `source_type()` returns
///   `CdcSourceType::Foo`;
/// * the alias `FooCdcProperties = CdcProperties<Foo>`.
///
/// It also emits `enum CdcSourceType` with one variant per name plus
/// `Unspecified`, and variant-by-variant `From` conversions between
/// `CdcSourceType` and `PbSourceType` in both directions.
///
/// `CdcSourceTypeTrait`, `CdcProperties` and `PbSourceType` must be in scope
/// where this macro is expanded.
#[macro_export]
macro_rules! impl_cdc_source_type {
    (
        { $({ $cdc:tt }),* },
        { $($_rest:tt),* }
    ) => {
        $(
            $crate::paste! {
                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
                pub struct $cdc;

                impl CdcSourceTypeTrait for $cdc {
                    const CDC_CONNECTOR_NAME: &'static str =
                        concat!(stringify!([<$cdc:lower>]), "-cdc");

                    fn source_type() -> CdcSourceType {
                        CdcSourceType::$cdc
                    }
                }

                pub type [<$cdc CdcProperties>] = CdcProperties<$cdc>;
            }
        )*

        #[derive(Clone)]
        pub enum CdcSourceType {
            $(
                $cdc,
            )*
            Unspecified,
        }

        impl From<PbSourceType> for CdcSourceType {
            fn from(value: PbSourceType) -> Self {
                match value {
                    $(
                        PbSourceType::$cdc => Self::$cdc,
                    )*
                    PbSourceType::Unspecified => Self::Unspecified,
                }
            }
        }

        impl From<CdcSourceType> for PbSourceType {
            fn from(this: CdcSourceType) -> Self {
                match this {
                    $(
                        CdcSourceType::$cdc => Self::$cdc,
                    )*
                    CdcSourceType::Unspecified => Self::Unspecified,
                }
            }
        }
    };
}