risingwave_connector/source/nexmark/enumerator/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16
17use crate::error::ConnectorResult;
18use crate::source::nexmark::NexmarkProperties;
19use crate::source::nexmark::split::NexmarkSplit;
20use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
21
22pub struct NexmarkSplitEnumerator {
23    split_num: i32,
24}
25
26impl NexmarkSplitEnumerator {}
27
28#[async_trait]
29impl SplitEnumerator for NexmarkSplitEnumerator {
30    type Properties = NexmarkProperties;
31    type Split = NexmarkSplit;
32
33    async fn new(
34        properties: NexmarkProperties,
35        _context: SourceEnumeratorContextRef,
36    ) -> ConnectorResult<NexmarkSplitEnumerator> {
37        let split_num = properties.split_num;
38        Ok(Self { split_num })
39    }
40
41    async fn list_splits(&mut self) -> ConnectorResult<Vec<NexmarkSplit>> {
42        let mut splits = vec![];
43        for i in 0..self.split_num {
44            splits.push(NexmarkSplit {
45                split_num: self.split_num,
46                split_index: i,
47                start_offset: None,
48            });
49        }
50        Ok(splits)
51    }
52}
53
54#[cfg(test)]
55mod tests {
56    use super::*;
57    use crate::error::ConnectorResult as Result;
58    use crate::source::SplitMetaData;
59
60    #[tokio::test]
61    async fn test_nexmark_split_enumerator() -> Result<()> {
62        let mut enumerator = NexmarkSplitEnumerator { split_num: 4 };
63        let list_splits_resp = enumerator.list_splits().await?;
64
65        assert_eq!(list_splits_resp.len(), 4);
66        assert_eq!(&*list_splits_resp[0].id(), "4-0");
67        assert_eq!(&*list_splits_resp[1].id(), "4-1");
68        assert_eq!(&*list_splits_resp[2].id(), "4-2");
69        assert_eq!(&*list_splits_resp[3].id(), "4-3");
70        Ok(())
71    }
72}