risingwave_connector/source/nexmark/enumerator/
mod.rsuse async_trait::async_trait;
use crate::error::ConnectorResult;
use crate::source::nexmark::split::NexmarkSplit;
use crate::source::nexmark::NexmarkProperties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
pub struct NexmarkSplitEnumerator {
split_num: i32,
}
impl NexmarkSplitEnumerator {}
#[async_trait]
impl SplitEnumerator for NexmarkSplitEnumerator {
type Properties = NexmarkProperties;
type Split = NexmarkSplit;
async fn new(
properties: NexmarkProperties,
_context: SourceEnumeratorContextRef,
) -> ConnectorResult<NexmarkSplitEnumerator> {
let split_num = properties.split_num;
Ok(Self { split_num })
}
async fn list_splits(&mut self) -> ConnectorResult<Vec<NexmarkSplit>> {
let mut splits = vec![];
for i in 0..self.split_num {
splits.push(NexmarkSplit {
split_num: self.split_num,
split_index: i,
start_offset: None,
});
}
Ok(splits)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::ConnectorResult as Result;
use crate::source::SplitMetaData;
#[tokio::test]
async fn test_nexmark_split_enumerator() -> Result<()> {
let mut enumerator = NexmarkSplitEnumerator { split_num: 4 };
let list_splits_resp = enumerator.list_splits().await?;
assert_eq!(list_splits_resp.len(), 4);
assert_eq!(&*list_splits_resp[0].id(), "4-0");
assert_eq!(&*list_splits_resp[1].id(), "4-1");
assert_eq!(&*list_splits_resp[2].id(), "4-2");
assert_eq!(&*list_splits_resp[3].id(), "4-3");
Ok(())
}
}