risingwave_connector/source/
test_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::{Arc, OnceLock};
17
18use async_trait::async_trait;
19use parking_lot::Mutex;
20use risingwave_common::bail;
21use risingwave_common::types::JsonbVal;
22use serde_derive::{Deserialize, Serialize};
23use with_options::WithOptions;
24
25use crate::enforce_secret::EnforceSecret;
26use crate::error::ConnectorResult;
27use crate::parser::ParserConfig;
28use crate::source::{
29    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
30    SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromBTreeMap,
31};
32
33pub type BoxListSplits = Box<
34    dyn FnMut(
35            TestSourceProperties,
36            SourceEnumeratorContextRef,
37        ) -> ConnectorResult<Vec<TestSourceSplit>>
38        + Send
39        + 'static,
40>;
41
42pub type BoxIntoSourceStream = Box<
43    dyn FnMut(
44            TestSourceProperties,
45            Vec<TestSourceSplit>,
46            ParserConfig,
47            SourceContextRef,
48            Option<Vec<Column>>,
49        ) -> BoxSourceChunkStream
50        + Send
51        + 'static,
52>;
53
54pub struct BoxSource {
55    list_split: BoxListSplits,
56    into_source_stream: BoxIntoSourceStream,
57}
58
59impl BoxSource {
60    pub fn new(
61        list_splits: impl FnMut(
62            TestSourceProperties,
63            SourceEnumeratorContextRef,
64        ) -> ConnectorResult<Vec<TestSourceSplit>>
65        + Send
66        + 'static,
67        into_source_stream: impl FnMut(
68            TestSourceProperties,
69            Vec<TestSourceSplit>,
70            ParserConfig,
71            SourceContextRef,
72            Option<Vec<Column>>,
73        ) -> BoxSourceChunkStream
74        + Send
75        + 'static,
76    ) -> BoxSource {
77        BoxSource {
78            list_split: Box::new(list_splits),
79            into_source_stream: Box::new(into_source_stream),
80        }
81    }
82}
83
84struct TestSourceRegistry {
85    box_source: Arc<Mutex<Option<BoxSource>>>,
86}
87
88impl TestSourceRegistry {
89    fn new() -> Self {
90        TestSourceRegistry {
91            box_source: Arc::new(Mutex::new(None)),
92        }
93    }
94}
95
96fn get_registry() -> &'static TestSourceRegistry {
97    static GLOBAL_REGISTRY: OnceLock<TestSourceRegistry> = OnceLock::new();
98    GLOBAL_REGISTRY.get_or_init(TestSourceRegistry::new)
99}
100
101pub struct TestSourceRegistryGuard;
102
103impl Drop for TestSourceRegistryGuard {
104    fn drop(&mut self) {
105        assert!(get_registry().box_source.lock().take().is_some());
106    }
107}
108
109pub fn register_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {
110    assert!(
111        get_registry()
112            .box_source
113            .lock()
114            .replace(box_source)
115            .is_none()
116    );
117    TestSourceRegistryGuard
118}
119
120pub const TEST_CONNECTOR: &str = "test";
121
122#[derive(Clone, Debug, Default, WithOptions)]
123pub struct TestSourceProperties {
124    properties: BTreeMap<String, String>,
125}
126
127impl EnforceSecret for TestSourceProperties {}
128
129impl TryFromBTreeMap for TestSourceProperties {
130    fn try_from_btreemap(
131        props: BTreeMap<String, String>,
132        _deny_unknown_fields: bool,
133    ) -> ConnectorResult<Self> {
134        if cfg!(any(madsim, test)) {
135            Ok(TestSourceProperties { properties: props })
136        } else {
137            bail!("test source only available at test")
138        }
139    }
140}
141
142#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
143pub struct TestSourceSplit {
144    pub id: SplitId,
145    pub properties: HashMap<String, String>,
146    pub offset: String,
147}
148
149impl SplitMetaData for TestSourceSplit {
150    fn id(&self) -> SplitId {
151        self.id.clone()
152    }
153
154    fn encode_to_json(&self) -> JsonbVal {
155        serde_json::to_value(self.clone()).unwrap().into()
156    }
157
158    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
159        serde_json::from_value(value.take()).map_err(Into::into)
160    }
161
162    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
163        self.offset = last_seen_offset;
164        Ok(())
165    }
166}
167
168pub struct TestSourceSplitEnumerator {
169    properties: TestSourceProperties,
170    context: SourceEnumeratorContextRef,
171}
172
173#[async_trait]
174impl SplitEnumerator for TestSourceSplitEnumerator {
175    type Properties = TestSourceProperties;
176    type Split = TestSourceSplit;
177
178    async fn new(
179        properties: Self::Properties,
180        context: SourceEnumeratorContextRef,
181    ) -> ConnectorResult<Self> {
182        Ok(Self {
183            properties,
184            context,
185        })
186    }
187
188    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
189        (get_registry()
190            .box_source
191            .lock()
192            .as_mut()
193            .expect("should have init")
194            .list_split)(self.properties.clone(), self.context.clone())
195    }
196}
197
198pub struct TestSourceSplitReader {
199    properties: TestSourceProperties,
200    state: Vec<TestSourceSplit>,
201    parser_config: ParserConfig,
202    source_ctx: SourceContextRef,
203    columns: Option<Vec<Column>>,
204}
205
206#[async_trait]
207impl SplitReader for TestSourceSplitReader {
208    type Properties = TestSourceProperties;
209    type Split = TestSourceSplit;
210
211    async fn new(
212        properties: Self::Properties,
213        state: Vec<Self::Split>,
214        parser_config: ParserConfig,
215        source_ctx: SourceContextRef,
216        columns: Option<Vec<Column>>,
217    ) -> ConnectorResult<Self> {
218        Ok(Self {
219            properties,
220            state,
221            parser_config,
222            source_ctx,
223            columns,
224        })
225    }
226
227    fn into_stream(self) -> BoxSourceChunkStream {
228        (get_registry()
229            .box_source
230            .lock()
231            .as_mut()
232            .expect("should have init")
233            .into_source_stream)(
234            self.properties,
235            self.state,
236            self.parser_config,
237            self.source_ctx,
238            self.columns,
239        )
240    }
241}
242
243impl SourceProperties for TestSourceProperties {
244    type Split = TestSourceSplit;
245    type SplitEnumerator = TestSourceSplitEnumerator;
246    type SplitReader = TestSourceSplitReader;
247
248    const SOURCE_NAME: &'static str = TEST_CONNECTOR;
249}
250
251impl crate::source::UnknownFields for TestSourceProperties {
252    fn unknown_fields(&self) -> HashMap<String, String> {
253        HashMap::new()
254    }
255}