risingwave_connector/source/
test_source.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::{Arc, OnceLock};
17
18use async_trait::async_trait;
19use parking_lot::Mutex;
20use risingwave_common::bail;
21use risingwave_common::types::JsonbVal;
22use serde_derive::{Deserialize, Serialize};
23use with_options::WithOptions;
24
25use crate::error::ConnectorResult;
26use crate::parser::ParserConfig;
27use crate::source::{
28 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
29 SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromBTreeMap,
30};
31
32pub type BoxListSplits = Box<
33 dyn FnMut(
34 TestSourceProperties,
35 SourceEnumeratorContextRef,
36 ) -> ConnectorResult<Vec<TestSourceSplit>>
37 + Send
38 + 'static,
39>;
40
41pub type BoxIntoSourceStream = Box<
42 dyn FnMut(
43 TestSourceProperties,
44 Vec<TestSourceSplit>,
45 ParserConfig,
46 SourceContextRef,
47 Option<Vec<Column>>,
48 ) -> BoxSourceChunkStream
49 + Send
50 + 'static,
51>;
52
53pub struct BoxSource {
54 list_split: BoxListSplits,
55 into_source_stream: BoxIntoSourceStream,
56}
57
58impl BoxSource {
59 pub fn new(
60 list_splits: impl FnMut(
61 TestSourceProperties,
62 SourceEnumeratorContextRef,
63 ) -> ConnectorResult<Vec<TestSourceSplit>>
64 + Send
65 + 'static,
66 into_source_stream: impl FnMut(
67 TestSourceProperties,
68 Vec<TestSourceSplit>,
69 ParserConfig,
70 SourceContextRef,
71 Option<Vec<Column>>,
72 ) -> BoxSourceChunkStream
73 + Send
74 + 'static,
75 ) -> BoxSource {
76 BoxSource {
77 list_split: Box::new(list_splits),
78 into_source_stream: Box::new(into_source_stream),
79 }
80 }
81}
82
83struct TestSourceRegistry {
84 box_source: Arc<Mutex<Option<BoxSource>>>,
85}
86
87impl TestSourceRegistry {
88 fn new() -> Self {
89 TestSourceRegistry {
90 box_source: Arc::new(Mutex::new(None)),
91 }
92 }
93}
94
95fn get_registry() -> &'static TestSourceRegistry {
96 static GLOBAL_REGISTRY: OnceLock<TestSourceRegistry> = OnceLock::new();
97 GLOBAL_REGISTRY.get_or_init(TestSourceRegistry::new)
98}
99
100pub struct TestSourceRegistryGuard;
101
102impl Drop for TestSourceRegistryGuard {
103 fn drop(&mut self) {
104 assert!(get_registry().box_source.lock().take().is_some());
105 }
106}
107
108pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {
109 assert!(
110 get_registry()
111 .box_source
112 .lock()
113 .replace(box_source)
114 .is_none()
115 );
116 TestSourceRegistryGuard
117}
118
119pub const TEST_CONNECTOR: &str = "test";
120
121#[derive(Clone, Debug, Default, WithOptions)]
122pub struct TestSourceProperties {
123 properties: BTreeMap<String, String>,
124}
125
126impl TryFromBTreeMap for TestSourceProperties {
127 fn try_from_btreemap(
128 props: BTreeMap<String, String>,
129 _deny_unknown_fields: bool,
130 ) -> ConnectorResult<Self> {
131 if cfg!(any(madsim, test)) {
132 Ok(TestSourceProperties { properties: props })
133 } else {
134 bail!("test source only available at test")
135 }
136 }
137}
138
139#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
140pub struct TestSourceSplit {
141 pub id: SplitId,
142 pub properties: HashMap<String, String>,
143 pub offset: String,
144}
145
146impl SplitMetaData for TestSourceSplit {
147 fn id(&self) -> SplitId {
148 self.id.clone()
149 }
150
151 fn encode_to_json(&self) -> JsonbVal {
152 serde_json::to_value(self.clone()).unwrap().into()
153 }
154
155 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
156 serde_json::from_value(value.take()).map_err(Into::into)
157 }
158
159 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
160 self.offset = last_seen_offset;
161 Ok(())
162 }
163}
164
165pub struct TestSourceSplitEnumerator {
166 properties: TestSourceProperties,
167 context: SourceEnumeratorContextRef,
168}
169
170#[async_trait]
171impl SplitEnumerator for TestSourceSplitEnumerator {
172 type Properties = TestSourceProperties;
173 type Split = TestSourceSplit;
174
175 async fn new(
176 properties: Self::Properties,
177 context: SourceEnumeratorContextRef,
178 ) -> ConnectorResult<Self> {
179 Ok(Self {
180 properties,
181 context,
182 })
183 }
184
185 async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
186 (get_registry()
187 .box_source
188 .lock()
189 .as_mut()
190 .expect("should have init")
191 .list_split)(self.properties.clone(), self.context.clone())
192 }
193}
194
195pub struct TestSourceSplitReader {
196 properties: TestSourceProperties,
197 state: Vec<TestSourceSplit>,
198 parser_config: ParserConfig,
199 source_ctx: SourceContextRef,
200 columns: Option<Vec<Column>>,
201}
202
203#[async_trait]
204impl SplitReader for TestSourceSplitReader {
205 type Properties = TestSourceProperties;
206 type Split = TestSourceSplit;
207
208 async fn new(
209 properties: Self::Properties,
210 state: Vec<Self::Split>,
211 parser_config: ParserConfig,
212 source_ctx: SourceContextRef,
213 columns: Option<Vec<Column>>,
214 ) -> ConnectorResult<Self> {
215 Ok(Self {
216 properties,
217 state,
218 parser_config,
219 source_ctx,
220 columns,
221 })
222 }
223
224 fn into_stream(self) -> BoxSourceChunkStream {
225 (get_registry()
226 .box_source
227 .lock()
228 .as_mut()
229 .expect("should have init")
230 .into_source_stream)(
231 self.properties,
232 self.state,
233 self.parser_config,
234 self.source_ctx,
235 self.columns,
236 )
237 }
238}
239
240impl SourceProperties for TestSourceProperties {
241 type Split = TestSourceSplit;
242 type SplitEnumerator = TestSourceSplitEnumerator;
243 type SplitReader = TestSourceSplitReader;
244
245 const SOURCE_NAME: &'static str = TEST_CONNECTOR;
246}
247
248impl crate::source::UnknownFields for TestSourceProperties {
249 fn unknown_fields(&self) -> HashMap<String, String> {
250 HashMap::new()
251 }
252}