// risingwave_connector/source/utils.rs
#[allow(unused_macros)]
/// Declares a connector source module that is only compiled in when its Cargo feature is on.
///
/// The expansion is selected by `cfg(feature = "source-<source_name>")`:
///
/// * feature **enabled**: `pub mod $mod_name;`, i.e. the real out-of-line module is used;
/// * feature **disabled**: an inline `pub mod $mod_name { .. }` is generated instead. It contains
///   a `<SOURCE_NAME>_CONNECTOR` constant (the source name upper-cased by `paste`) and four
///   placeholder types: `<Prefix>Properties`, `<Prefix>Split`, `<Prefix>SplitEnumerator` and
///   `<Prefix>SplitReader`, where `<Prefix>` is `$struct_prefix` converted to camel case.
///
/// The placeholder types implement the same source traits as a real connector, but creating an
/// enumerator or reader, listing splits, restoring a split from JSON and updating a split offset
/// all return an error stating that the feature was not enabled at compile time. The stream
/// returned by `into_stream` yields that error as its single item.
///
/// # Arguments
///
/// * `$mod_name` - identifier of the module to declare.
/// * `$struct_prefix` - optional identifier used as the prefix of the generated type names.
///   When omitted, `$mod_name` is used.
/// * `$source_name` - string literal naming the source. It is appended to `"source-"` to form
///   the feature name and is the value of the generated `<SOURCE_NAME>_CONNECTOR` constant.
///
/// # Examples
///
/// Illustrative only (the names are made up):
///
/// ```ignore
/// // Declares module `foo`, gated by feature `source-foo`.
/// // Without the feature, `foo` contains `FooProperties`, `FooSplit`, ...
/// feature_gated_source_mod!(foo, "foo");
/// ```
macro_rules! feature_gated_source_mod {
    // Short form: the module name doubles as the prefix of the generated type names.
    ($mod_name:ident, $source_name:literal) => {
        crate::source::utils::feature_gated_source_mod!($mod_name, $mod_name, $source_name);
    };
    // Full form: explicit prefix for the generated type names.
    ($mod_name:ident, $struct_prefix:ident, $source_name:literal) => {
        // `paste!` is needed both to build the identifiers (`[<...>]`) and to splice
        // `$source_name` into the string literals used by the `cfg` and `doc` attributes.
        paste::paste! {
            // Feature enabled: use the real implementation from the out-of-line module.
            #[cfg(feature = "source-" $source_name)]
            pub mod $mod_name;

            // Feature disabled: generate a stand-in module with the same type names, so code
            // that refers to them still compiles; using the source fails at runtime instead.
            #[cfg(not(feature = "source-" $source_name))]
            pub mod $mod_name {
                use std::collections::HashMap;

                use anyhow::anyhow;
                use async_trait::async_trait;
                use risingwave_common::types::JsonbVal;
                use serde::{Deserialize, Serialize};

                use crate::error::{ConnectorError, ConnectorResult};
                use crate::parser::ParserConfig;
                use crate::source::{
                    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
                    SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
                };

                // Connector name constant. A free `const` reference is implicitly `'static`,
                // so the lifetime is not spelled out.
                pub const [<$source_name:upper _CONNECTOR>]: &str = $source_name;

                // Single place that builds the "feature not enabled" error returned by every
                // failing placeholder method below.
                fn err_feature_not_enabled() -> ConnectorError {
                    ConnectorError::from(anyhow!(
                        "Feature `source-{}` is not enabled at compile time. \
                         Please enable it in `Cargo.toml` and rebuild.",
                        $source_name
                    ))
                }

                #[doc = "A dummy source properties that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
                #[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
                pub struct [<$struct_prefix:camel Properties>] {
                    // The only field is a flattened string map, so the placeholder declares no
                    // options of its own.
                    #[serde(flatten)]
                    pub unknown_fields: HashMap<String, String>,
                }

                impl crate::enforce_secret::EnforceSecret for [<$struct_prefix:camel Properties>] {
                    // The placeholder declares no secret-only properties.
                    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {};
                }

                impl UnknownFields for [<$struct_prefix:camel Properties>] {
                    fn unknown_fields(&self) -> HashMap<String, String> {
                        self.unknown_fields.clone()
                    }
                }

                impl SourceProperties for [<$struct_prefix:camel Properties>] {
                    type Split = [<$struct_prefix:camel Split>];
                    type SplitEnumerator = [<$struct_prefix:camel SplitEnumerator>];
                    type SplitReader = [<$struct_prefix:camel SplitReader>];

                    // Associated const: the explicit `'static` is left as written.
                    const SOURCE_NAME: &'static str = [<$source_name:upper _CONNECTOR>];
                }

                #[doc = "A dummy split that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
                #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
                pub struct [<$struct_prefix:camel Split>] {
                    // Private field: the split cannot be built with a struct literal from
                    // outside this module.
                    _private: (),
                }

                impl SplitMetaData for [<$struct_prefix:camel Split>] {
                    fn id(&self) -> SplitId {
                        "feature_not_enabled".into()
                    }

                    fn encode_to_json(&self) -> JsonbVal {
                        // The struct only holds a unit field, so serialization is not expected
                        // to fail.
                        serde_json::to_value(self).unwrap().into()
                    }

                    fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
                        Err(err_feature_not_enabled())
                    }

                    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
                        Err(err_feature_not_enabled())
                    }
                }

                #[doc = "A dummy split enumerator that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
                pub struct [<$struct_prefix:camel SplitEnumerator>];

                #[async_trait]
                impl SplitEnumerator for [<$struct_prefix:camel SplitEnumerator>] {
                    type Properties = [<$struct_prefix:camel Properties>];
                    type Split = [<$struct_prefix:camel Split>];

                    async fn new(
                        _properties: Self::Properties,
                        _context: SourceEnumeratorContextRef,
                    ) -> ConnectorResult<Self> {
                        Err(err_feature_not_enabled())
                    }

                    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
                        Err(err_feature_not_enabled())
                    }
                }

                #[doc = "A dummy split reader that always returns an error, as the feature `source-" $source_name "` is currently not enabled."]
                pub struct [<$struct_prefix:camel SplitReader>];

                #[async_trait]
                impl SplitReader for [<$struct_prefix:camel SplitReader>] {
                    type Properties = [<$struct_prefix:camel Properties>];
                    type Split = [<$struct_prefix:camel Split>];

                    async fn new(
                        _properties: Self::Properties,
                        _splits: Vec<Self::Split>,
                        _parser_config: ParserConfig,
                        _source_ctx: SourceContextRef,
                        _columns: Option<Vec<Column>>,
                    ) -> ConnectorResult<Self> {
                        Err(err_feature_not_enabled())
                    }

                    fn into_stream(self) -> BoxSourceChunkStream {
                        // A one-item stream whose only item is the "feature not enabled" error.
                        Box::pin(futures::stream::once(async {
                            Err(err_feature_not_enabled())
                        }))
                    }
                }
            }
        }
    };
}
155pub(super) use feature_gated_source_mod;