risingwave_connector/source/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15/// Define a source module that is gated by a feature.
16///
17/// This is to allow some heavy or unpopular source implementations (and their dependencies) to be disabled
18/// at compile time, in order to decrease compilation time and binary size.
19///
20/// When the feature is disabled, this macro generates standalone dummy implementations for the source,
21/// which return errors indicating the feature is not enabled. The generated types are concrete structs
22/// (not type aliases) to avoid conflicts with macro-generated `TryFrom`/`From` impls in `impl_split`.
23#[allow(unused_macros)]
24macro_rules! feature_gated_source_mod {
25    ($mod_name:ident, $source_name:literal) => {
26        crate::source::utils::feature_gated_source_mod!($mod_name, $mod_name, $source_name);
27    };
28    ($mod_name:ident, $struct_prefix:ident, $source_name:literal) => {
29        paste::paste! {
30        #[cfg(feature = "source-" $source_name)]
31        pub mod $mod_name;
32
33        #[cfg(not(feature = "source-" $source_name))]
34        pub mod $mod_name {
35            use std::collections::HashMap;
36
37            use anyhow::anyhow;
38            use async_trait::async_trait;
39            use risingwave_common::types::JsonbVal;
40            use serde::{Deserialize, Serialize};
41
42            use crate::error::{ConnectorError, ConnectorResult};
43            use crate::parser::ParserConfig;
44            use crate::source::{
45                BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
46                SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
47            };
48            pub const [<$source_name:upper _CONNECTOR>]: &'static str = $source_name;
49
50            fn err_feature_not_enabled() -> ConnectorError {
51                ConnectorError::from(anyhow!(
52                    "Feature `source-{}` is not enabled at compile time. \
53                    Please enable it in `Cargo.toml` and rebuild.",
54                    $source_name
55                ))
56            }
57
58            #[doc = "A dummy source properties that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
59            #[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
60            pub struct [<$struct_prefix:camel Properties>] {
61                #[serde(flatten)]
62                pub unknown_fields: HashMap<String, String>,
63            }
64
65            impl crate::enforce_secret::EnforceSecret for [<$struct_prefix:camel Properties>] {
66                const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {};
67            }
68
69            impl UnknownFields for [<$struct_prefix:camel Properties>] {
70                fn unknown_fields(&self) -> HashMap<String, String> {
71                    self.unknown_fields.clone()
72                }
73            }
74
75            impl SourceProperties for [<$struct_prefix:camel Properties>] {
76                type Split = [<$struct_prefix:camel Split>];
77                type SplitEnumerator = [<$struct_prefix:camel SplitEnumerator>];
78                type SplitReader = [<$struct_prefix:camel SplitReader>];
79
80                const SOURCE_NAME: &'static str = [<$source_name:upper _CONNECTOR>];
81            }
82
83            #[doc = "A dummy split that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
84            #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
85            pub struct [<$struct_prefix:camel Split>] {
86                _private: (),
87            }
88
89            impl SplitMetaData for [<$struct_prefix:camel Split>] {
90                fn id(&self) -> SplitId {
91                    "feature_not_enabled".into()
92                }
93
94                fn encode_to_json(&self) -> JsonbVal {
95                    serde_json::to_value(self).unwrap().into()
96                }
97
98                fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
99                    Err(err_feature_not_enabled())
100                }
101
102                fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
103                    Err(err_feature_not_enabled())
104                }
105            }
106
107            #[doc = "A dummy split enumerator that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
108            pub struct [<$struct_prefix:camel SplitEnumerator>];
109
110            #[async_trait]
111            impl SplitEnumerator for [<$struct_prefix:camel SplitEnumerator>] {
112                type Properties = [<$struct_prefix:camel Properties>];
113                type Split = [<$struct_prefix:camel Split>];
114
115                async fn new(
116                    _properties: Self::Properties,
117                    _context: SourceEnumeratorContextRef,
118                ) -> ConnectorResult<Self> {
119                    Err(err_feature_not_enabled())
120                }
121
122                async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
123                    Err(err_feature_not_enabled())
124                }
125            }
126
127            #[doc = "A dummy split reader that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
128            pub struct [<$struct_prefix:camel SplitReader>];
129
130            #[async_trait]
131            impl SplitReader for [<$struct_prefix:camel SplitReader>] {
132                type Properties = [<$struct_prefix:camel Properties>];
133                type Split = [<$struct_prefix:camel Split>];
134
135                async fn new(
136                    _properties: Self::Properties,
137                    _splits: Vec<Self::Split>,
138                    _parser_config: ParserConfig,
139                    _source_ctx: SourceContextRef,
140                    _columns: Option<Vec<Column>>,
141                ) -> ConnectorResult<Self> {
142                    Err(err_feature_not_enabled())
143                }
144
145                fn into_stream(self) -> BoxSourceChunkStream {
146                    Box::pin(futures::stream::once(async {
147                        Err(err_feature_not_enabled())
148                    }))
149                }
150            }
151        }
152        }
153    };
154}
155pub(super) use feature_gated_source_mod;