risingwave_connector/source/pulsar/enumerator/
client.rs1use anyhow::anyhow;
16use async_trait::async_trait;
17use itertools::Itertools;
18use pulsar::{Pulsar, TokioExecutor};
19use risingwave_common::bail;
20use serde::{Deserialize, Serialize};
21
22use crate::error::ConnectorResult;
23use crate::source::pulsar::PulsarProperties;
24use crate::source::pulsar::split::PulsarSplit;
25use crate::source::pulsar::topic::{Topic, parse_topic};
26use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
27
28pub struct PulsarSplitEnumerator {
29 client: Pulsar<TokioExecutor>,
30 topic: Topic,
31 start_offset: PulsarEnumeratorOffset,
32}
33
34#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
35pub enum PulsarEnumeratorOffset {
36 Earliest,
37 Latest,
38 MessageId(String),
39 Timestamp(i64),
40}
41
42#[async_trait]
43impl SplitEnumerator for PulsarSplitEnumerator {
44 type Properties = PulsarProperties;
45 type Split = PulsarSplit;
46
47 async fn new(
48 properties: PulsarProperties,
49 _context: SourceEnumeratorContextRef,
50 ) -> ConnectorResult<PulsarSplitEnumerator> {
51 let pulsar = properties
52 .common
53 .build_client(&properties.oauth, &properties.aws_auth_props)
54 .await?;
55 let topic = properties.common.topic;
56 let parsed_topic = parse_topic(&topic)?;
57
58 let mut scan_start_offset = match properties
59 .scan_startup_mode
60 .map(|s| s.to_lowercase())
61 .as_deref()
62 {
63 Some("earliest") => PulsarEnumeratorOffset::Earliest,
64 Some("latest") => PulsarEnumeratorOffset::Latest,
65 None => PulsarEnumeratorOffset::Earliest,
66 _ => {
67 bail!(
68 "properties `startup_mode` only supports earliest and latest or leaving it empty"
69 );
70 }
71 };
72
73 if let Some(s) = properties.time_offset {
74 let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
75 scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
76 }
77
78 Ok(PulsarSplitEnumerator {
79 client: pulsar,
80 topic: parsed_topic,
81 start_offset: scan_start_offset,
82 })
83 }
84
85 async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
86 let offset = self.start_offset.clone();
87 assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
89
90 let topic_partitions = self
91 .client
92 .lookup_partitioned_topic_number(&self.topic.to_string())
93 .await
94 .map_err(|e| anyhow!(e))?;
95
96 let splits = if topic_partitions > 0 {
97 (0..topic_partitions as i32)
99 .map(|p| PulsarSplit {
100 topic: self.topic.sub_topic(p).unwrap(),
101 start_offset: offset.clone(),
102 })
103 .collect_vec()
104 } else {
105 vec![PulsarSplit {
107 topic: self.topic.clone(),
108 start_offset: offset.clone(),
109 }]
110 };
111
112 Ok(splits)
113 }
114}