risingwave_connector/source/pulsar/enumerator/
client.rs1use std::fmt::Debug;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use itertools::Itertools;
20use pulsar::{Pulsar, TokioExecutor};
21use risingwave_common::bail;
22use serde::{Deserialize, Serialize};
23
24use crate::error::ConnectorResult;
25use crate::source::pulsar::PulsarProperties;
26use crate::source::pulsar::split::PulsarSplit;
27use crate::source::pulsar::topic::{PERSISTENT_DOMAIN, Topic, check_topic_exists, parse_topic};
28use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
29
30pub struct PulsarSplitEnumerator {
31 client: Pulsar<TokioExecutor>,
32 topic: Topic,
33 start_offset: PulsarEnumeratorOffset,
34}
35
36#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
37pub enum PulsarEnumeratorOffset {
38 Earliest,
39 Latest,
40 MessageId(String),
41 Timestamp(i64),
42}
43
44#[async_trait]
45impl SplitEnumerator for PulsarSplitEnumerator {
46 type Properties = PulsarProperties;
47 type Split = PulsarSplit;
48
49 async fn new(
50 properties: PulsarProperties,
51 _context: SourceEnumeratorContextRef,
52 ) -> ConnectorResult<PulsarSplitEnumerator> {
53 let pulsar = properties
54 .common
55 .build_client(
56 &properties.oauth,
57 &properties.aws_auth_props,
58 properties.operation_retry.to_pulsar_options(),
59 )
60 .await?;
61 let topic = properties.common.topic;
62 let parsed_topic = parse_topic(&topic)?;
63
64 let mut scan_start_offset = match properties
65 .scan_startup_mode
66 .map(|s| s.to_lowercase())
67 .as_deref()
68 {
69 Some("earliest") => PulsarEnumeratorOffset::Earliest,
70 Some("latest") => PulsarEnumeratorOffset::Latest,
71 None => PulsarEnumeratorOffset::Earliest,
72 _ => {
73 bail!(
74 "properties `startup_mode` only supports earliest and latest or leaving it empty"
75 );
76 }
77 };
78
79 if let Some(s) = properties.time_offset {
80 let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
81 scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
82 }
83
84 Ok(PulsarSplitEnumerator {
85 client: pulsar,
86 topic: parsed_topic,
87 start_offset: scan_start_offset,
88 })
89 }
90
91 async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
92 let offset = self.start_offset.clone();
93 assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
95
96 let topic_partitions = Box::pin(
98 self.client
99 .lookup_partitioned_topic_number(&self.topic.to_string()),
100 )
101 .await
102 .map_err(|e| anyhow!(e))?;
103
104 let splits = if topic_partitions > 0 {
105 (0..topic_partitions as i32)
108 .map(|p| PulsarSplit {
109 topic: self.topic.sub_topic(p).unwrap(),
110 start_offset: offset.clone(),
111 })
112 .collect_vec()
113 } else {
114 if self.topic.domain == PERSISTENT_DOMAIN {
119 Box::pin(check_topic_exists(&self.client, &self.topic)).await?;
122 }
123 vec![PulsarSplit {
125 topic: self.topic.clone(),
126 start_offset: offset.clone(),
127 }]
128 };
129
130 Ok(splits)
131 }
132}