risingwave_connector/source/
utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Define a source module that is gated by a feature.
///
/// This is to allow some heavy or unpopular source implementations (and their dependencies) to be disabled
/// at compile time, in order to decrease compilation time and binary size.
///
/// When the feature is disabled, this macro generates standalone dummy implementations for the source,
/// which return errors indicating the feature is not enabled. The generated types are concrete structs
/// (not type aliases) to avoid conflicts with macro-generated `TryFrom`/`From` impls in `impl_split`.
///
/// # Forms
///
/// - `feature_gated_source_mod!(mod_name, "name")`: shorthand that reuses `mod_name` as the struct
///   prefix and forwards to the three-argument form.
/// - `feature_gated_source_mod!(mod_name, struct_prefix, "name")`: the dummy types are named
///   `<Prefix>Properties`, `<Prefix>Split`, `<Prefix>SplitEnumerator` and `<Prefix>SplitReader`,
///   where `<Prefix>` is `struct_prefix` converted to camel case.
///
/// In both forms the gating Cargo feature is `source-` followed by `name`. When that feature is
/// enabled the macro only emits `pub mod mod_name;`. Otherwise it emits an inline module of the same
/// name that also exposes a `<NAME>_CONNECTOR` string constant holding `name`.
macro_rules! feature_gated_source_mod {
    // Two-argument shorthand: the module name doubles as the struct prefix.
    ($mod_name:ident, $source_name:literal) => {
        crate::source::utils::feature_gated_source_mod!($mod_name, $mod_name, $source_name);
    };
    // Full form. Everything is wrapped in `paste!` so that identifiers can be assembled from the
    // arguments and the adjacent string literals in the attributes below get concatenated.
    ($mod_name:ident, $struct_prefix:ident, $source_name:literal) => {
        paste::paste! {
        // Feature enabled: declare the module only.
        #[cfg(feature = "source-" $source_name)]
        pub mod $mod_name;

        // Feature disabled: generate a stand-in module whose operations report that the feature
        // is missing.
        #[cfg(not(feature = "source-" $source_name))]
        pub mod $mod_name {
            use std::collections::HashMap;

            use anyhow::anyhow;
            use async_trait::async_trait;
            use risingwave_common::types::JsonbVal;
            use serde::{Deserialize, Serialize};

            use crate::error::{ConnectorError, ConnectorResult};
            use crate::parser::ParserConfig;
            use crate::source::{
                BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
                SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
            };
            // `'static` is implied for references in free constants, so it is not spelled out.
            pub const [<$source_name:upper _CONNECTOR>]: &str = $source_name;

            // Single place that builds the "feature not enabled" error returned by the
            // fallible dummy operations below.
            fn err_feature_not_enabled() -> ConnectorError {
                ConnectorError::from(anyhow!(
                    "Feature `source-{}` is not enabled at compile time. \
                    Please enable it in `Cargo.toml` and rebuild.",
                    $source_name
                ))
            }

            #[doc = "A dummy source properties that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
            #[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
            pub struct [<$struct_prefix:camel Properties>] {
                // The flattened map is the only field, so every deserialized key is collected here.
                // NOTE(review): all of these keys are then reported by `UnknownFields::unknown_fields`;
                // confirm that callers which reject unknown fields still surface the
                // feature-not-enabled error rather than an unknown-field error.
                #[serde(flatten)]
                pub unknown_fields: HashMap<String, String>,
            }

            // The dummy properties declare no secret-enforced keys.
            impl crate::enforce_secret::EnforceSecret for [<$struct_prefix:camel Properties>] {
                const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {};
            }

            impl UnknownFields for [<$struct_prefix:camel Properties>] {
                fn unknown_fields(&self) -> HashMap<String, String> {
                    self.unknown_fields.clone()
                }
            }

            impl SourceProperties for [<$struct_prefix:camel Properties>] {
                type Split = [<$struct_prefix:camel Split>];
                type SplitEnumerator = [<$struct_prefix:camel SplitEnumerator>];
                type SplitReader = [<$struct_prefix:camel SplitReader>];

                // Kept explicit: associated constants should not rely on `'static` elision.
                const SOURCE_NAME: &'static str = [<$source_name:upper _CONNECTOR>];
            }

            #[doc = "A dummy split that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
            #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
            pub struct [<$struct_prefix:camel Split>] {
                // Private field: the struct cannot be built with a literal outside this module.
                _private: (),
            }

            impl SplitMetaData for [<$struct_prefix:camel Split>] {
                // Every dummy split shares the same fixed id.
                fn id(&self) -> SplitId {
                    "feature_not_enabled".into()
                }

                fn encode_to_json(&self) -> JsonbVal {
                    serde_json::to_value(self)
                        .expect("a struct holding only a unit field always serializes to JSON")
                        .into()
                }

                fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
                    Err(err_feature_not_enabled())
                }

                fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
                    Err(err_feature_not_enabled())
                }
            }

            #[doc = "A dummy split enumerator that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
            pub struct [<$struct_prefix:camel SplitEnumerator>];

            #[async_trait]
            impl SplitEnumerator for [<$struct_prefix:camel SplitEnumerator>] {
                type Properties = [<$struct_prefix:camel Properties>];
                type Split = [<$struct_prefix:camel Split>];

                // Construction always fails with the feature-not-enabled error.
                async fn new(
                    _properties: Self::Properties,
                    _context: SourceEnumeratorContextRef,
                ) -> ConnectorResult<Self> {
                    Err(err_feature_not_enabled())
                }

                async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
                    Err(err_feature_not_enabled())
                }
            }

            #[doc = "A dummy split reader that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
            pub struct [<$struct_prefix:camel SplitReader>];

            #[async_trait]
            impl SplitReader for [<$struct_prefix:camel SplitReader>] {
                type Properties = [<$struct_prefix:camel Properties>];
                type Split = [<$struct_prefix:camel Split>];

                // Construction always fails with the feature-not-enabled error.
                async fn new(
                    _properties: Self::Properties,
                    _splits: Vec<Self::Split>,
                    _parser_config: ParserConfig,
                    _source_ctx: SourceContextRef,
                    _columns: Option<Vec<Column>>,
                ) -> ConnectorResult<Self> {
                    Err(err_feature_not_enabled())
                }

                // Yields exactly one item, the feature-not-enabled error, and then ends.
                fn into_stream(self) -> BoxSourceChunkStream {
                    Box::pin(futures::stream::once(async {
                        Err(err_feature_not_enabled())
                    }))
                }
            }
        }
        }
    };
}
154pub(super) use feature_gated_source_mod;