risingwave_connector/source/pulsar/enumerator/
client.rs1use std::fmt::Debug;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use itertools::Itertools;
20use pulsar::{Pulsar, TokioExecutor};
21use risingwave_common::bail;
22use serde::{Deserialize, Serialize};
23
24use crate::error::ConnectorResult;
25use crate::source::pulsar::PulsarProperties;
26use crate::source::pulsar::split::PulsarSplit;
27use crate::source::pulsar::topic::{PERSISTENT_DOMAIN, Topic, check_topic_exists, parse_topic};
28use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
29
30pub struct PulsarSplitEnumerator {
31 client: Pulsar<TokioExecutor>,
32 topic: Topic,
33 start_offset: PulsarEnumeratorOffset,
34}
35
36#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
37pub enum PulsarEnumeratorOffset {
38 Earliest,
39 Latest,
40 MessageId(String),
41 Timestamp(i64),
42}
43
44#[async_trait]
45impl SplitEnumerator for PulsarSplitEnumerator {
46 type Properties = PulsarProperties;
47 type Split = PulsarSplit;
48
49 async fn new(
50 properties: PulsarProperties,
51 _context: SourceEnumeratorContextRef,
52 ) -> ConnectorResult<PulsarSplitEnumerator> {
53 let pulsar = properties
54 .common
55 .build_client(&properties.oauth, &properties.aws_auth_props)
56 .await?;
57 let topic = properties.common.topic;
58 let parsed_topic = parse_topic(&topic)?;
59
60 let mut scan_start_offset = match properties
61 .scan_startup_mode
62 .map(|s| s.to_lowercase())
63 .as_deref()
64 {
65 Some("earliest") => PulsarEnumeratorOffset::Earliest,
66 Some("latest") => PulsarEnumeratorOffset::Latest,
67 None => PulsarEnumeratorOffset::Earliest,
68 _ => {
69 bail!(
70 "properties `startup_mode` only supports earliest and latest or leaving it empty"
71 );
72 }
73 };
74
75 if let Some(s) = properties.time_offset {
76 let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
77 scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
78 }
79
80 Ok(PulsarSplitEnumerator {
81 client: pulsar,
82 topic: parsed_topic,
83 start_offset: scan_start_offset,
84 })
85 }
86
87 async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
88 let offset = self.start_offset.clone();
89 assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
91
92 let topic_partitions = self
93 .client
94 .lookup_partitioned_topic_number(&self.topic.to_string())
95 .await
96 .map_err(|e| anyhow!(e))?;
97
98 let splits = if topic_partitions > 0 {
99 (0..topic_partitions as i32)
102 .map(|p| PulsarSplit {
103 topic: self.topic.sub_topic(p).unwrap(),
104 start_offset: offset.clone(),
105 })
106 .collect_vec()
107 } else {
108 if self.topic.domain == PERSISTENT_DOMAIN {
113 check_topic_exists(&self.client, &self.topic).await?;
115 }
116 vec![PulsarSplit {
118 topic: self.topic.clone(),
119 start_offset: offset.clone(),
120 }]
121 };
122
123 Ok(splits)
124 }
125}