risingwave_connector/source/nexmark/enumerator/
mod.rs1use async_trait::async_trait;
16
17use crate::error::ConnectorResult;
18use crate::source::nexmark::NexmarkProperties;
19use crate::source::nexmark::split::NexmarkSplit;
20use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
21
22pub struct NexmarkSplitEnumerator {
23 split_num: i32,
24}
25
26impl NexmarkSplitEnumerator {}
27
28#[async_trait]
29impl SplitEnumerator for NexmarkSplitEnumerator {
30 type Properties = NexmarkProperties;
31 type Split = NexmarkSplit;
32
33 async fn new(
34 properties: NexmarkProperties,
35 _context: SourceEnumeratorContextRef,
36 ) -> ConnectorResult<NexmarkSplitEnumerator> {
37 let split_num = properties.split_num;
38 Ok(Self { split_num })
39 }
40
41 async fn list_splits(&mut self) -> ConnectorResult<Vec<NexmarkSplit>> {
42 let mut splits = vec![];
43 for i in 0..self.split_num {
44 splits.push(NexmarkSplit {
45 split_num: self.split_num,
46 split_index: i,
47 start_offset: None,
48 });
49 }
50 Ok(splits)
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use super::*;
57 use crate::error::ConnectorResult as Result;
58 use crate::source::SplitMetaData;
59
60 #[tokio::test]
61 async fn test_nexmark_split_enumerator() -> Result<()> {
62 let mut enumerator = NexmarkSplitEnumerator { split_num: 4 };
63 let list_splits_resp = enumerator.list_splits().await?;
64
65 assert_eq!(list_splits_resp.len(), 4);
66 assert_eq!(&*list_splits_resp[0].id(), "4-0");
67 assert_eq!(&*list_splits_resp[1].id(), "4-1");
68 assert_eq!(&*list_splits_resp[2].id(), "4-2");
69 assert_eq!(&*list_splits_resp[3].id(), "4-3");
70 Ok(())
71 }
72}