risingwave_connector/source/pulsar/
topic.rs1use anyhow::anyhow;
16use pulsar::proto::command_get_topics_of_namespace::Mode as LookupMode;
17use pulsar::{Pulsar, TokioExecutor};
18use risingwave_common::bail;
19use serde::{Deserialize, Serialize};
20use urlencoding::encode;
21
22use crate::error::ConnectorResult as Result;
23
/// The `persistent` topic domain; also the domain filled in by `parse_topic` for short topic names.
pub(crate) const PERSISTENT_DOMAIN: &str = "persistent";
/// The `non-persistent` topic domain, the only other domain `parse_topic` accepts.
const NON_PERSISTENT_DOMAIN: &str = "non-persistent";
/// Tenant filled in by `parse_topic` when only a bare topic name is supplied.
const PUBLIC_TENANT: &str = "public";
/// Namespace filled in by `parse_topic` when only a bare topic name is supplied.
const DEFAULT_NAMESPACE: &str = "default";
/// Marker placed between a topic's base name and its partition index (`<name>-partition-<n>`).
const PARTITIONED_TOPIC_SUFFIX: &str = "-partition-";
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash)]
31pub struct Topic {
33 pub domain: String,
34 pub tenant: String,
35 pub namespace: String,
36 pub topic: String,
37 pub partition_index: Option<i32>,
38}
39
40impl std::fmt::Display for Topic {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(
43 f,
44 "{}://{}/{}/{}",
45 self.domain, self.tenant, self.namespace, self.topic
46 )
47 }
48}
49
50impl Topic {
51 pub fn is_partitioned_topic(&self) -> bool {
52 self.partition_index.is_none()
53 }
54
55 pub fn rest_path(&self) -> String {
56 format!(
57 "{}/{}/{}/{}",
58 self.domain,
59 self.tenant,
60 self.namespace,
61 encode(&self.topic)
62 )
63 }
64
65 pub fn sub_topic(&self, partition: i32) -> Result<Topic> {
66 if partition < 0 {
67 bail!("invalid partition index number");
68 }
69
70 if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
71 return Ok(self.clone());
72 }
73
74 Ok(Topic {
75 domain: self.domain.clone(),
76 tenant: self.tenant.clone(),
77 namespace: self.namespace.clone(),
78 topic: format!("{}{}{}", self.topic, PARTITIONED_TOPIC_SUFFIX, partition),
79 partition_index: Some(partition),
80 })
81 }
82
83 pub fn topic_str_without_partition(&self) -> Result<String> {
84 if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
85 let parts: Vec<&str> = self.topic.split(PARTITIONED_TOPIC_SUFFIX).collect();
86 Ok(parts[0].to_owned())
87 } else {
88 Ok(self.topic.clone())
89 }
90 }
91}
92
93pub fn get_partition_index(topic: &str) -> Result<Option<i32>> {
95 if topic.contains(PARTITIONED_TOPIC_SUFFIX) {
96 let partition = topic
97 .split('-')
98 .next_back()
99 .unwrap()
100 .parse::<i32>()
101 .map_err(|e| anyhow!(e))?;
102
103 Ok(Some(partition))
104 } else {
105 Ok(None)
106 }
107}
108
109pub fn parse_topic(topic: &str) -> Result<Topic> {
117 let mut complete_topic = topic.to_owned();
118
119 if !topic.contains("://") {
120 let parts: Vec<&str> = topic.split('/').collect();
121 complete_topic = match parts.len() {
122 1 => format!(
123 "{}://{}/{}/{}",
124 PERSISTENT_DOMAIN, PUBLIC_TENANT, DEFAULT_NAMESPACE, parts[0],
125 ),
126 3 => format!("{}://{}", PERSISTENT_DOMAIN, topic),
127 _ => {
128 bail!(
129 "Invalid short topic name '{}', \
130 it should be in the format of <tenant>/<namespace>/<topic> or <topic>",
131 topic
132 );
133 }
134 };
135 }
136
137 let parts: Vec<&str> = complete_topic.splitn(2, "://").collect();
138
139 let domain = match parts[0] {
140 PERSISTENT_DOMAIN | NON_PERSISTENT_DOMAIN => parts[0],
141 _ => {
142 bail!(
143 "The domain only can be specified as 'persistent' or 'non-persistent'. Input domain is '{}'",
144 parts[0]
145 );
146 }
147 };
148
149 let rest = parts[1];
150 let parts: Vec<&str> = rest.splitn(3, '/').collect();
151
152 if parts.len() != 3 {
153 bail!(
154 "invalid topic name '{}', it should be in the format of <tenant>/<namespace>/<topic>",
155 rest
156 );
157 }
158
159 let parsed_topic = Topic {
160 domain: domain.to_owned(),
161 tenant: parts[0].to_owned(),
162 namespace: parts[1].to_owned(),
163 topic: parts[2].to_owned(),
164 partition_index: get_partition_index(complete_topic.as_str())?,
165 };
166
167 if parsed_topic.topic.is_empty() {
168 bail!("topic name cannot be empty".to_owned());
169 }
170
171 Ok(parsed_topic)
172}
173
174pub(crate) async fn check_topic_exists(
175 client: &Pulsar<TokioExecutor>,
176 topic: &Topic,
177) -> Result<()> {
178 let topics_on_broker = client
181 .get_topics_of_namespace(
182 format!("{}/{}", topic.tenant, topic.namespace),
183 LookupMode::All,
184 )
185 .await?;
186 if !topics_on_broker.contains(&topic.to_string()) {
187 bail!(
188 "topic {} not found on broker, available topics: {:?}",
189 topic,
190 topics_on_broker
191 );
192 }
193
194 Ok(())
195}
196
#[cfg(test)]
mod test {
    use crate::source::pulsar::topic::{get_partition_index, parse_topic};

    /// Short and fully-qualified names must render to the fully-qualified form, and the
    /// partition index must come from the last partition suffix.
    #[test]
    fn test_parse_topic() {
        let rendered_cases = [
            ("success", "persistent://public/default/success"),
            (
                "tenant/namespace/success",
                "persistent://tenant/namespace/success",
            ),
            (
                "persistent://tenant/namespace/success",
                "persistent://tenant/namespace/success",
            ),
            (
                "non-persistent://tenant/namespace/success",
                "non-persistent://tenant/namespace/success",
            ),
        ];
        for &(input, expected) in &rendered_cases {
            assert_eq!(
                parse_topic(input).unwrap().to_string(),
                expected.to_owned()
            );
        }

        let partition_cases = [
            ("non-persistent://tenant/namespace/success", None),
            (
                "non-persistent://tenant/namespace/success-partition-1",
                Some(1),
            ),
            (
                "non-persistent://tenant/namespace/success-partition-1-partition-2",
                Some(2),
            ),
        ];
        for &(input, expected) in &partition_cases {
            assert_eq!(parse_topic(input).unwrap().partition_index, expected);
        }
    }

    /// The index is `None` without a suffix and otherwise taken from the last suffix.
    #[test]
    fn test_get_partition_index() {
        let cases = [
            ("success", None),
            ("success-partition-1", Some(1)),
            ("success-partition-1-partition-2", Some(2)),
        ];
        for &(input, expected) in &cases {
            assert_eq!(get_partition_index(input).unwrap(), expected);
        }
    }
}