risingwave_connector/source/pulsar/enumerator/
client.rsuse anyhow::anyhow;
use async_trait::async_trait;
use itertools::Itertools;
use pulsar::{Pulsar, TokioExecutor};
use risingwave_common::bail;
use serde::{Deserialize, Serialize};
use crate::error::ConnectorResult;
use crate::source::pulsar::split::PulsarSplit;
use crate::source::pulsar::topic::{parse_topic, Topic};
use crate::source::pulsar::PulsarProperties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
/// Discovers the splits of a single Pulsar topic.
///
/// A partitioned topic yields one split per partition; any other topic yields a single
/// split for the topic itself. Every split starts from `start_offset`.
pub struct PulsarSplitEnumerator {
    /// Topic being enumerated, as parsed from the source properties.
    topic: Topic,
    /// Offset attached to every split produced by `list_splits`.
    start_offset: PulsarEnumeratorOffset,
    /// Client used to look up the topic's partition count.
    client: Pulsar<TokioExecutor>,
}
/// Position from which a Pulsar split starts reading.
///
/// Keep the variant order stable: the derived `Hash` and index-based serde formats may
/// depend on it.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PulsarEnumeratorOffset {
    /// Chosen by a startup mode of `earliest`, which is also the default when none is set.
    Earliest,
    /// Chosen by a startup mode of `latest`.
    Latest,
    /// A specific message id in string form; the enumerator's `new` never produces this.
    MessageId(String),
    /// Built from the `time_offset` property parsed as an `i64`
    /// (presumably milliseconds since the epoch — TODO confirm).
    Timestamp(i64),
}
#[async_trait]
impl SplitEnumerator for PulsarSplitEnumerator {
type Properties = PulsarProperties;
type Split = PulsarSplit;
async fn new(
properties: PulsarProperties,
_context: SourceEnumeratorContextRef,
) -> ConnectorResult<PulsarSplitEnumerator> {
let pulsar = properties
.common
.build_client(&properties.oauth, &properties.aws_auth_props)
.await?;
let topic = properties.common.topic;
let parsed_topic = parse_topic(&topic)?;
let mut scan_start_offset = match properties
.scan_startup_mode
.map(|s| s.to_lowercase())
.as_deref()
{
Some("earliest") => PulsarEnumeratorOffset::Earliest,
Some("latest") => PulsarEnumeratorOffset::Latest,
None => PulsarEnumeratorOffset::Earliest,
_ => {
bail!(
"properties `startup_mode` only supports earliest and latest or leaving it empty"
);
}
};
if let Some(s) = properties.time_offset {
let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
}
Ok(PulsarSplitEnumerator {
client: pulsar,
topic: parsed_topic,
start_offset: scan_start_offset,
})
}
async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
let offset = self.start_offset.clone();
assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
let topic_partitions = self
.client
.lookup_partitioned_topic_number(&self.topic.to_string())
.await
.map_err(|e| anyhow!(e))?;
let splits = if topic_partitions > 0 {
(0..topic_partitions as i32)
.map(|p| PulsarSplit {
topic: self.topic.sub_topic(p).unwrap(),
start_offset: offset.clone(),
})
.collect_vec()
} else {
vec![PulsarSplit {
topic: self.topic.clone(),
start_offset: offset.clone(),
}]
};
Ok(splits)
}
}