risingwave_connector/source/
utils.rs1macro_rules! feature_gated_source_mod {
24 ($mod_name:ident, $source_name:literal) => {
25 crate::source::utils::feature_gated_source_mod!($mod_name, $mod_name, $source_name);
26 };
27 ($mod_name:ident, $struct_prefix:ident, $source_name:literal) => {
28 paste::paste! {
29 #[cfg(feature = "source-" $source_name)]
30 pub mod $mod_name;
31
32 #[cfg(not(feature = "source-" $source_name))]
33 pub mod $mod_name {
34 use std::collections::HashMap;
35
36 use anyhow::anyhow;
37 use async_trait::async_trait;
38 use risingwave_common::types::JsonbVal;
39 use serde::{Deserialize, Serialize};
40
41 use crate::error::{ConnectorError, ConnectorResult};
42 use crate::parser::ParserConfig;
43 use crate::source::{
44 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
45 SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
46 };
47 pub const [<$source_name:upper _CONNECTOR>]: &'static str = $source_name;
48
49 fn err_feature_not_enabled() -> ConnectorError {
50 ConnectorError::from(anyhow!(
51 "Feature `source-{}` is not enabled at compile time. \
52 Please enable it in `Cargo.toml` and rebuild.",
53 $source_name
54 ))
55 }
56
57 #[doc = "A dummy source properties that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
58 #[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
59 pub struct [<$struct_prefix:camel Properties>] {
60 #[serde(flatten)]
61 pub unknown_fields: HashMap<String, String>,
62 }
63
64 impl crate::enforce_secret::EnforceSecret for [<$struct_prefix:camel Properties>] {
65 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {};
66 }
67
68 impl UnknownFields for [<$struct_prefix:camel Properties>] {
69 fn unknown_fields(&self) -> HashMap<String, String> {
70 self.unknown_fields.clone()
71 }
72 }
73
74 impl SourceProperties for [<$struct_prefix:camel Properties>] {
75 type Split = [<$struct_prefix:camel Split>];
76 type SplitEnumerator = [<$struct_prefix:camel SplitEnumerator>];
77 type SplitReader = [<$struct_prefix:camel SplitReader>];
78
79 const SOURCE_NAME: &'static str = [<$source_name:upper _CONNECTOR>];
80 }
81
82 #[doc = "A dummy split that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
83 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
84 pub struct [<$struct_prefix:camel Split>] {
85 _private: (),
86 }
87
88 impl SplitMetaData for [<$struct_prefix:camel Split>] {
89 fn id(&self) -> SplitId {
90 "feature_not_enabled".into()
91 }
92
93 fn encode_to_json(&self) -> JsonbVal {
94 serde_json::to_value(self).unwrap().into()
95 }
96
97 fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
98 Err(err_feature_not_enabled())
99 }
100
101 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
102 Err(err_feature_not_enabled())
103 }
104 }
105
106 #[doc = "A dummy split enumerator that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
107 pub struct [<$struct_prefix:camel SplitEnumerator>];
108
109 #[async_trait]
110 impl SplitEnumerator for [<$struct_prefix:camel SplitEnumerator>] {
111 type Properties = [<$struct_prefix:camel Properties>];
112 type Split = [<$struct_prefix:camel Split>];
113
114 async fn new(
115 _properties: Self::Properties,
116 _context: SourceEnumeratorContextRef,
117 ) -> ConnectorResult<Self> {
118 Err(err_feature_not_enabled())
119 }
120
121 async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
122 Err(err_feature_not_enabled())
123 }
124 }
125
126 #[doc = "A dummy split reader that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
127 pub struct [<$struct_prefix:camel SplitReader>];
128
129 #[async_trait]
130 impl SplitReader for [<$struct_prefix:camel SplitReader>] {
131 type Properties = [<$struct_prefix:camel Properties>];
132 type Split = [<$struct_prefix:camel Split>];
133
134 async fn new(
135 _properties: Self::Properties,
136 _splits: Vec<Self::Split>,
137 _parser_config: ParserConfig,
138 _source_ctx: SourceContextRef,
139 _columns: Option<Vec<Column>>,
140 ) -> ConnectorResult<Self> {
141 Err(err_feature_not_enabled())
142 }
143
144 fn into_stream(self) -> BoxSourceChunkStream {
145 Box::pin(futures::stream::once(async {
146 Err(err_feature_not_enabled())
147 }))
148 }
149 }
150 }
151 }
152 };
153}
154pub(super) use feature_gated_source_mod;