1#[macro_export]
16macro_rules! for_all_classified_sources {
17 ($macro:path $(, $extra_args:tt)*) => {
18 $macro! {
19 {
21 { Mysql },
22 { Postgres },
23 { Citus },
24 { Mongodb },
25 { SqlServer }
26 },
27 {
30 { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
31 { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
32 { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
33 { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
34 { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
35 { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
36 { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
37 { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
38 { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
39 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
40 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
41 { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
42 { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
43 { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
44 { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
45 }
46 $(
47 ,$extra_args
48 )*
49 }
50 };
51}
52
53#[macro_export]
54macro_rules! for_all_connections {
55 ($macro:path $(, $extra_args:tt)*) => {
56 $macro! {
57 {
58 { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
59 { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
60 { SchemaRegistry, $crate::connector_common::ConfluentSchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
61 { Elasticsearch, $crate::connector_common::ElasticsearchConnection, risingwave_pb::catalog::connection_params::PbConnectionType }
62 }
63 $(,$extra_args)*
64 }
65 };
66}
67
68#[macro_export]
69macro_rules! for_all_sources_inner {
70 (
71 { $({ $cdc_source_type:ident }),* },
72 { $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
73 $macro:tt $(, $extra_args:tt)*
74 ) => {
75 $crate::paste! {
76 $macro! {
77 {
78 $(
79 {
80 [< $cdc_source_type Cdc >],
81 $crate::source::cdc::[< $cdc_source_type CdcProperties >],
82 $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
83 }
84 ),*
85 ,
86 $(
87 { $source_variant, $prop_name, $split }
88 ),*
89 }
90 $(, $extra_args)*
91 }
92 }
93 };
94}
95
96#[macro_export]
97macro_rules! for_all_sources {
98 ($macro:path $(, $arg:tt )*) => {
99 $crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* }
100 };
101}
102
103#[macro_export]
137macro_rules! dispatch_source_enum_inner {
138 (
139 {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
140 $enum_type:ident,
141 $enum_value:expr,
142 $inner_name:ident,
143 $body:expr
144 ) => {{
145 match $enum_value {
146 $(
147 $enum_type::$source_variant($inner_name) => {
148 #[allow(dead_code)]
149 type PropType = $prop_name;
150 #[allow(dead_code)]
151 type SplitType = $split;
152 {
153 $body
154 }
155 },
156 )*
157 }
158 }}
159}
160
161#[macro_export]
180macro_rules! dispatch_source_enum {
181 ($enum_type:ident, $enum_value:expr, |$inner_name:ident| $body:expr) => {{
182 $crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_type, { $enum_value }, $inner_name, $body}
183 }};
184}
185
186#[macro_export]
187macro_rules! match_source_name_str_inner {
188 (
189 {$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
190 $source_name_str:expr,
191 $prop_type_name:ident,
192 $body:expr,
193 $on_other_closure:expr
194 ) => {{
195 match $source_name_str {
196 $(
197 <$prop_name>::SOURCE_NAME => {
198 type $prop_type_name = $prop_name;
199 {
200 $body
201 }
202 },
203 )*
204 other => ($on_other_closure)(other),
205 }
206 }}
207}
208
209#[macro_export]
211macro_rules! match_source_name_str {
212 ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
213 $crate::for_all_sources! {
214 $crate::match_source_name_str_inner,
215 { $source_name_str },
216 $prop_type_name,
217 { $body },
218 { $on_other_closure }
219 }
220 }};
221}
222
223#[macro_export]
225macro_rules! dispatch_split_impl {
226 ($impl:expr, | $inner_name:ident | $body:expr) => {{
227 use $crate::source::SplitImpl;
228 $crate::dispatch_source_enum! {SplitImpl, { $impl }, |$inner_name| $body}
229 }};
230}
231
232#[macro_export]
233macro_rules! impl_connection {
234 ({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path }),*}) => {
235 pub fn build_connection(
236 pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
237 value_secret_filled: std::collections::BTreeMap<String, String>
238 ) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
239 match pb_connection_type {
240 $(
241 <$pb_connection_path>::$variant_name => {
242 let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?;
243 Ok(c)
244 },
245 )*
246 risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
247 }
248 }
249
250 pub fn enforce_secret_connection<'a>(
251 pb_connection_type: &risingwave_pb::catalog::connection_params::PbConnectionType,
252 prop_iter: impl Iterator<Item = &'a str>,
253 ) -> $crate::error::ConnectorResult<()> {
254 match pb_connection_type {
255 $(
256 <$pb_connection_path>::$variant_name => {
257 <$connection_type>::enforce_secret(prop_iter)
258 },
259 )*
260 risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
261 }
262 }
263 }
264}
265
266#[macro_export]
267macro_rules! impl_split {
268 ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
269
270 #[derive(Debug, Clone, EnumAsInner, PartialEq)]
271 pub enum SplitImpl {
272 $(
273 $variant_name($split),
274 )*
275 }
276
277 $(
278 impl TryFrom<SplitImpl> for $split {
279 type Error = $crate::error::ConnectorError;
280
281 fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
282 match split {
283 SplitImpl::$variant_name(inner) => Ok(inner),
284 other => risingwave_common::bail!("expect {} but get {:?}", stringify!($split), other),
285 }
286 }
287 }
288
289 impl From<$split> for SplitImpl {
290 fn from(split: $split) -> SplitImpl {
291 SplitImpl::$variant_name(split)
292 }
293 }
294
295 )*
296 }
297}
298
299#[macro_export]
301macro_rules! dispatch_source_prop {
302 ($connector_properties:expr, |$inner_ident:ident| $body:expr) => {{
303 use $crate::source::ConnectorProperties;
304 $crate::dispatch_source_enum! {ConnectorProperties, { $connector_properties }, |$inner_ident| {$body}}
305 }};
306}
307
308#[macro_export]
309macro_rules! impl_connector_properties {
310 ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
311 #[derive(Clone, Debug)]
312 pub enum ConnectorProperties {
313 $(
314 $variant_name(Box<$prop_name>),
315 )*
316 }
317
318 $(
319 impl From<$prop_name> for ConnectorProperties {
320 fn from(prop: $prop_name) -> ConnectorProperties {
321 ConnectorProperties::$variant_name(Box::new(prop))
322 }
323 }
324 )*
325 }
326}
327
328#[macro_export]
329macro_rules! impl_cdc_source_type {
330 (
331 {$({$cdc_source_type:tt}),*},
332 {$($_ignore:tt),*}
333 ) => {
334 $(
335 $crate::paste!{
336 #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
337 pub struct $cdc_source_type;
338 impl CdcSourceTypeTrait for $cdc_source_type {
339 const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc");
340 fn source_type() -> CdcSourceType {
341 CdcSourceType::$cdc_source_type
342 }
343 }
344 pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>;
345 }
346 )*
347
348 #[derive(Clone)]
349 pub enum CdcSourceType {
350 $(
351 $cdc_source_type,
352 )*
353 Unspecified,
354 }
355
356 impl From<PbSourceType> for CdcSourceType {
357 fn from(value: PbSourceType) -> Self {
358 match value {
359 PbSourceType::Unspecified => CdcSourceType::Unspecified,
360 $(
361 PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
362 )*
363 }
364 }
365 }
366
367 impl From<CdcSourceType> for PbSourceType {
368 fn from(this: CdcSourceType) -> PbSourceType {
369 match this {
370 $(
371 CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
372 )*
373 CdcSourceType::Unspecified => PbSourceType::Unspecified,
374 }
375 }
376 }
377
378 }
379}