risingwave_connector/source/
test_source.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::{Arc, OnceLock};
17
18use async_trait::async_trait;
19use parking_lot::Mutex;
20use risingwave_common::bail;
21use risingwave_common::types::JsonbVal;
22use serde_derive::{Deserialize, Serialize};
23use with_options::WithOptions;
24
25use crate::enforce_secret::EnforceSecret;
26use crate::error::ConnectorResult;
27use crate::parser::ParserConfig;
28use crate::source::{
29 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
30 SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromBTreeMap,
31};
32
/// Boxed callback used by the test connector to enumerate splits.
///
/// The callback receives the source properties and the enumerator context and returns either
/// the list of splits or a connector error. It is `FnMut`, so it may keep mutable state between
/// invocations, and `Send + 'static` so that it can be stored in the process-wide registry.
pub type BoxListSplits = Box<
    dyn FnMut(
            TestSourceProperties,
            SourceEnumeratorContextRef,
        ) -> ConnectorResult<Vec<TestSourceSplit>>
        + Send
        + 'static,
>;
41
/// Boxed callback used by the test connector to build the chunk stream of a reader.
///
/// The arguments mirror the fields of `TestSourceSplitReader`: the source properties, the
/// assigned splits, the parser configuration, the source context and the optional column list.
/// It is `FnMut`, so it may keep mutable state between invocations, and `Send + 'static` so that
/// it can be stored in the process-wide registry.
pub type BoxIntoSourceStream = Box<
    dyn FnMut(
            TestSourceProperties,
            Vec<TestSourceSplit>,
            ParserConfig,
            SourceContextRef,
            Option<Vec<Column>>,
        ) -> BoxSourceChunkStream
        + Send
        + 'static,
>;
53
54pub struct BoxSource {
55 list_split: BoxListSplits,
56 into_source_stream: BoxIntoSourceStream,
57}
58
59impl BoxSource {
60 pub fn new(
61 list_splits: impl FnMut(
62 TestSourceProperties,
63 SourceEnumeratorContextRef,
64 ) -> ConnectorResult<Vec<TestSourceSplit>>
65 + Send
66 + 'static,
67 into_source_stream: impl FnMut(
68 TestSourceProperties,
69 Vec<TestSourceSplit>,
70 ParserConfig,
71 SourceContextRef,
72 Option<Vec<Column>>,
73 ) -> BoxSourceChunkStream
74 + Send
75 + 'static,
76 ) -> BoxSource {
77 BoxSource {
78 list_split: Box::new(list_splits),
79 into_source_stream: Box::new(into_source_stream),
80 }
81 }
82}
83
84struct TestSourceRegistry {
85 box_source: Arc<Mutex<Option<BoxSource>>>,
86}
87
88impl TestSourceRegistry {
89 fn new() -> Self {
90 TestSourceRegistry {
91 box_source: Arc::new(Mutex::new(None)),
92 }
93 }
94}
95
96fn get_registry() -> &'static TestSourceRegistry {
97 static GLOBAL_REGISTRY: OnceLock<TestSourceRegistry> = OnceLock::new();
98 GLOBAL_REGISTRY.get_or_init(TestSourceRegistry::new)
99}
100
101pub struct TestSourceRegistryGuard;
102
103impl Drop for TestSourceRegistryGuard {
104 fn drop(&mut self) {
105 assert!(get_registry().box_source.lock().take().is_some());
106 }
107}
108
109pub fn register_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {
110 assert!(
111 get_registry()
112 .box_source
113 .lock()
114 .replace(box_source)
115 .is_none()
116 );
117 TestSourceRegistryGuard
118}
119
/// Connector name of the test source; it is also the `SOURCE_NAME` of `TestSourceProperties`.
pub const TEST_CONNECTOR: &str = "test";
121
122#[derive(Clone, Debug, Default, WithOptions)]
123pub struct TestSourceProperties {
124 properties: BTreeMap<String, String>,
125}
126
127impl EnforceSecret for TestSourceProperties {}
128
129impl TryFromBTreeMap for TestSourceProperties {
130 fn try_from_btreemap(
131 props: BTreeMap<String, String>,
132 _deny_unknown_fields: bool,
133 ) -> ConnectorResult<Self> {
134 if cfg!(any(madsim, test)) {
135 Ok(TestSourceProperties { properties: props })
136 } else {
137 bail!("test source only available at test")
138 }
139 }
140}
141
142#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
143pub struct TestSourceSplit {
144 pub id: SplitId,
145 pub properties: HashMap<String, String>,
146 pub offset: String,
147}
148
149impl SplitMetaData for TestSourceSplit {
150 fn id(&self) -> SplitId {
151 self.id.clone()
152 }
153
154 fn encode_to_json(&self) -> JsonbVal {
155 serde_json::to_value(self.clone()).unwrap().into()
156 }
157
158 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
159 serde_json::from_value(value.take()).map_err(Into::into)
160 }
161
162 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
163 self.offset = last_seen_offset;
164 Ok(())
165 }
166}
167
168pub struct TestSourceSplitEnumerator {
169 properties: TestSourceProperties,
170 context: SourceEnumeratorContextRef,
171}
172
173#[async_trait]
174impl SplitEnumerator for TestSourceSplitEnumerator {
175 type Properties = TestSourceProperties;
176 type Split = TestSourceSplit;
177
178 async fn new(
179 properties: Self::Properties,
180 context: SourceEnumeratorContextRef,
181 ) -> ConnectorResult<Self> {
182 Ok(Self {
183 properties,
184 context,
185 })
186 }
187
188 async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
189 (get_registry()
190 .box_source
191 .lock()
192 .as_mut()
193 .expect("should have init")
194 .list_split)(self.properties.clone(), self.context.clone())
195 }
196}
197
198pub struct TestSourceSplitReader {
199 properties: TestSourceProperties,
200 state: Vec<TestSourceSplit>,
201 parser_config: ParserConfig,
202 source_ctx: SourceContextRef,
203 columns: Option<Vec<Column>>,
204}
205
206#[async_trait]
207impl SplitReader for TestSourceSplitReader {
208 type Properties = TestSourceProperties;
209 type Split = TestSourceSplit;
210
211 async fn new(
212 properties: Self::Properties,
213 state: Vec<Self::Split>,
214 parser_config: ParserConfig,
215 source_ctx: SourceContextRef,
216 columns: Option<Vec<Column>>,
217 ) -> ConnectorResult<Self> {
218 Ok(Self {
219 properties,
220 state,
221 parser_config,
222 source_ctx,
223 columns,
224 })
225 }
226
227 fn into_stream(self) -> BoxSourceChunkStream {
228 (get_registry()
229 .box_source
230 .lock()
231 .as_mut()
232 .expect("should have init")
233 .into_source_stream)(
234 self.properties,
235 self.state,
236 self.parser_config,
237 self.source_ctx,
238 self.columns,
239 )
240 }
241}
242
/// Ties the test connector's split, enumerator and reader types to `TestSourceProperties` and
/// names the connector after `TEST_CONNECTOR`.
impl SourceProperties for TestSourceProperties {
    type Split = TestSourceSplit;
    type SplitEnumerator = TestSourceSplitEnumerator;
    type SplitReader = TestSourceSplitReader;

    // Reuses the public constant so the connector name is spelled out in one place only.
    const SOURCE_NAME: &'static str = TEST_CONNECTOR;
}
250
251impl crate::source::UnknownFields for TestSourceProperties {
252 fn unknown_fields(&self) -> HashMap<String, String> {
253 HashMap::new()
254 }
255}