risingwave_connector/source/
test_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::{Arc, OnceLock};
17
18use async_trait::async_trait;
19use parking_lot::Mutex;
20use risingwave_common::bail;
21use risingwave_common::types::JsonbVal;
22use serde_derive::{Deserialize, Serialize};
23use with_options::WithOptions;
24
25use crate::error::ConnectorResult;
26use crate::parser::ParserConfig;
27use crate::source::{
28    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
29    SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromBTreeMap,
30};
31
/// Boxed callback that produces the splits of the test source.
///
/// It receives the source properties and the enumerator context and returns
/// the list of [`TestSourceSplit`]s (or an error). It is stored in
/// [`BoxSource`] and invoked from `TestSourceSplitEnumerator::list_splits`.
pub type BoxListSplits = Box<
    dyn FnMut(
            TestSourceProperties,
            SourceEnumeratorContextRef,
        ) -> ConnectorResult<Vec<TestSourceSplit>>
        + Send
        + 'static,
>;
40
/// Boxed callback that builds the chunk stream of the test source.
///
/// It receives everything a `TestSourceSplitReader` was constructed with
/// (properties, assigned splits, parser config, source context and optional
/// columns) and returns the resulting [`BoxSourceChunkStream`]. It is stored
/// in [`BoxSource`] and invoked from `TestSourceSplitReader::into_stream`.
pub type BoxIntoSourceStream = Box<
    dyn FnMut(
            TestSourceProperties,
            Vec<TestSourceSplit>,
            ParserConfig,
            SourceContextRef,
            Option<Vec<Column>>,
        ) -> BoxSourceChunkStream
        + Send
        + 'static,
>;
52
/// The pair of user-supplied callbacks that implement the test source.
pub struct BoxSource {
    /// Callback used to enumerate splits.
    list_split: BoxListSplits,
    /// Callback used to turn a reader into a chunk stream.
    into_source_stream: BoxIntoSourceStream,
}
57
58impl BoxSource {
59    pub fn new(
60        list_splits: impl FnMut(
61            TestSourceProperties,
62            SourceEnumeratorContextRef,
63        ) -> ConnectorResult<Vec<TestSourceSplit>>
64        + Send
65        + 'static,
66        into_source_stream: impl FnMut(
67            TestSourceProperties,
68            Vec<TestSourceSplit>,
69            ParserConfig,
70            SourceContextRef,
71            Option<Vec<Column>>,
72        ) -> BoxSourceChunkStream
73        + Send
74        + 'static,
75    ) -> BoxSource {
76        BoxSource {
77            list_split: Box::new(list_splits),
78            into_source_stream: Box::new(into_source_stream),
79        }
80    }
81}
82
/// Holder for the currently registered [`BoxSource`], if any.
struct TestSourceRegistry {
    /// `None` while no test source is registered.
    box_source: Arc<Mutex<Option<BoxSource>>>,
}
86
87impl TestSourceRegistry {
88    fn new() -> Self {
89        TestSourceRegistry {
90            box_source: Arc::new(Mutex::new(None)),
91        }
92    }
93}
94
95fn get_registry() -> &'static TestSourceRegistry {
96    static GLOBAL_REGISTRY: OnceLock<TestSourceRegistry> = OnceLock::new();
97    GLOBAL_REGISTRY.get_or_init(TestSourceRegistry::new)
98}
99
/// Guard returned by `registry_test_source`.
///
/// Dropping the guard removes the registered source from the global registry,
/// so it must be kept alive for as long as the test source is needed.
#[must_use = "dropping the guard immediately unregisters the test source"]
pub struct TestSourceRegistryGuard;
101
102impl Drop for TestSourceRegistryGuard {
103    fn drop(&mut self) {
104        assert!(get_registry().box_source.lock().take().is_some());
105    }
106}
107
108pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {
109    assert!(
110        get_registry()
111            .box_source
112            .lock()
113            .replace(box_source)
114            .is_none()
115    );
116    TestSourceRegistryGuard
117}
118
/// Connector name of the test source; used as `SOURCE_NAME` of
/// [`TestSourceProperties`].
pub const TEST_CONNECTOR: &str = "test";
120
/// Properties of the test source: the raw key/value options it was created with.
#[derive(Clone, Debug, Default, WithOptions)]
pub struct TestSourceProperties {
    /// Options stored as passed to `try_from_btreemap`, without interpretation.
    properties: BTreeMap<String, String>,
}
125
126impl TryFromBTreeMap for TestSourceProperties {
127    fn try_from_btreemap(
128        props: BTreeMap<String, String>,
129        _deny_unknown_fields: bool,
130    ) -> ConnectorResult<Self> {
131        if cfg!(any(madsim, test)) {
132            Ok(TestSourceProperties { properties: props })
133        } else {
134            bail!("test source only available at test")
135        }
136    }
137}
138
/// A split of the test source.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TestSourceSplit {
    /// Identifier of the split, returned by `SplitMetaData::id`.
    pub id: SplitId,
    /// Free-form key/value data attached to the split.
    pub properties: HashMap<String, String>,
    /// Current offset; overwritten by `SplitMetaData::update_offset`.
    pub offset: String,
}
145
146impl SplitMetaData for TestSourceSplit {
147    fn id(&self) -> SplitId {
148        self.id.clone()
149    }
150
151    fn encode_to_json(&self) -> JsonbVal {
152        serde_json::to_value(self.clone()).unwrap().into()
153    }
154
155    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
156        serde_json::from_value(value.take()).map_err(Into::into)
157    }
158
159    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
160        self.offset = last_seen_offset;
161        Ok(())
162    }
163}
164
/// Split enumerator of the test source; forwards to the registered
/// `list_split` callback.
pub struct TestSourceSplitEnumerator {
    /// Properties handed to the callback on every `list_splits` call.
    properties: TestSourceProperties,
    /// Enumerator context handed to the callback on every `list_splits` call.
    context: SourceEnumeratorContextRef,
}
169
170#[async_trait]
171impl SplitEnumerator for TestSourceSplitEnumerator {
172    type Properties = TestSourceProperties;
173    type Split = TestSourceSplit;
174
175    async fn new(
176        properties: Self::Properties,
177        context: SourceEnumeratorContextRef,
178    ) -> ConnectorResult<Self> {
179        Ok(Self {
180            properties,
181            context,
182        })
183    }
184
185    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
186        (get_registry()
187            .box_source
188            .lock()
189            .as_mut()
190            .expect("should have init")
191            .list_split)(self.properties.clone(), self.context.clone())
192    }
193}
194
/// Split reader of the test source; it only stores its constructor arguments
/// and hands them to the registered `into_source_stream` callback.
pub struct TestSourceSplitReader {
    /// Source properties passed to `new`.
    properties: TestSourceProperties,
    /// Splits passed to `new`.
    state: Vec<TestSourceSplit>,
    /// Parser configuration passed to `new`.
    parser_config: ParserConfig,
    /// Source context passed to `new`.
    source_ctx: SourceContextRef,
    /// Optional columns passed to `new`.
    columns: Option<Vec<Column>>,
}
202
203#[async_trait]
204impl SplitReader for TestSourceSplitReader {
205    type Properties = TestSourceProperties;
206    type Split = TestSourceSplit;
207
208    async fn new(
209        properties: Self::Properties,
210        state: Vec<Self::Split>,
211        parser_config: ParserConfig,
212        source_ctx: SourceContextRef,
213        columns: Option<Vec<Column>>,
214    ) -> ConnectorResult<Self> {
215        Ok(Self {
216            properties,
217            state,
218            parser_config,
219            source_ctx,
220            columns,
221        })
222    }
223
224    fn into_stream(self) -> BoxSourceChunkStream {
225        (get_registry()
226            .box_source
227            .lock()
228            .as_mut()
229            .expect("should have init")
230            .into_source_stream)(
231            self.properties,
232            self.state,
233            self.parser_config,
234            self.source_ctx,
235            self.columns,
236        )
237    }
238}
239
/// Wires the test source types together under the [`TEST_CONNECTOR`] name.
impl SourceProperties for TestSourceProperties {
    type Split = TestSourceSplit;
    type SplitEnumerator = TestSourceSplitEnumerator;
    type SplitReader = TestSourceSplitReader;

    // Same value as the public `TEST_CONNECTOR` constant.
    const SOURCE_NAME: &'static str = TEST_CONNECTOR;
}
247
248impl crate::source::UnknownFields for TestSourceProperties {
249    fn unknown_fields(&self) -> HashMap<String, String> {
250        HashMap::new()
251    }
252}