risingwave_connector/source/pulsar/
topic.rs1use anyhow::anyhow;
16use risingwave_common::bail;
17use serde::{Deserialize, Serialize};
18use urlencoding::encode;
19
20use crate::error::ConnectorResult as Result;
21
22const PERSISTENT_DOMAIN: &str = "persistent";
23const NON_PERSISTENT_DOMAIN: &str = "non-persistent";
24const PUBLIC_TENANT: &str = "public";
25const DEFAULT_NAMESPACE: &str = "default";
26const PARTITIONED_TOPIC_SUFFIX: &str = "-partition-";
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash)]
29pub struct Topic {
31 pub domain: String,
32 pub tenant: String,
33 pub namespace: String,
34 pub topic: String,
35 pub partition_index: Option<i32>,
36}
37
38impl std::fmt::Display for Topic {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(
41 f,
42 "{}://{}/{}/{}",
43 self.domain, self.tenant, self.namespace, self.topic
44 )
45 }
46}
47
48impl Topic {
49 pub fn is_partitioned_topic(&self) -> bool {
50 self.partition_index.is_none()
51 }
52
53 pub fn rest_path(&self) -> String {
54 format!(
55 "{}/{}/{}/{}",
56 self.domain,
57 self.tenant,
58 self.namespace,
59 encode(&self.topic)
60 )
61 }
62
63 pub fn sub_topic(&self, partition: i32) -> Result<Topic> {
64 if partition < 0 {
65 bail!("invalid partition index number");
66 }
67
68 if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
69 return Ok(self.clone());
70 }
71
72 Ok(Topic {
73 domain: self.domain.clone(),
74 tenant: self.tenant.clone(),
75 namespace: self.namespace.clone(),
76 topic: format!("{}{}{}", self.topic, PARTITIONED_TOPIC_SUFFIX, partition),
77 partition_index: Some(partition),
78 })
79 }
80
81 pub fn topic_str_without_partition(&self) -> Result<String> {
82 if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
83 let parts: Vec<&str> = self.topic.split(PARTITIONED_TOPIC_SUFFIX).collect();
84 Ok(parts[0].to_owned())
85 } else {
86 Ok(self.topic.clone())
87 }
88 }
89}
90
91pub fn get_partition_index(topic: &str) -> Result<Option<i32>> {
93 if topic.contains(PARTITIONED_TOPIC_SUFFIX) {
94 let partition = topic
95 .split('-')
96 .next_back()
97 .unwrap()
98 .parse::<i32>()
99 .map_err(|e| anyhow!(e))?;
100
101 Ok(Some(partition))
102 } else {
103 Ok(None)
104 }
105}
106
107pub fn parse_topic(topic: &str) -> Result<Topic> {
115 let mut complete_topic = topic.to_owned();
116
117 if !topic.contains("://") {
118 let parts: Vec<&str> = topic.split('/').collect();
119 complete_topic = match parts.len() {
120 1 => format!(
121 "{}://{}/{}/{}",
122 PERSISTENT_DOMAIN, PUBLIC_TENANT, DEFAULT_NAMESPACE, parts[0],
123 ),
124 3 => format!("{}://{}", PERSISTENT_DOMAIN, topic),
125 _ => {
126 bail!(
127 "Invalid short topic name '{}', \
128 it should be in the format of <tenant>/<namespace>/<topic> or <topic>",
129 topic
130 );
131 }
132 };
133 }
134
135 let parts: Vec<&str> = complete_topic.splitn(2, "://").collect();
136
137 let domain = match parts[0] {
138 PERSISTENT_DOMAIN | NON_PERSISTENT_DOMAIN => parts[0],
139 _ => {
140 bail!(
141 "The domain only can be specified as 'persistent' or 'non-persistent'. Input domain is '{}'",
142 parts[0]
143 );
144 }
145 };
146
147 let rest = parts[1];
148 let parts: Vec<&str> = rest.splitn(3, '/').collect();
149
150 if parts.len() != 3 {
151 bail!(
152 "invalid topic name '{}', it should be in the format of <tenant>/<namespace>/<topic>",
153 rest
154 );
155 }
156
157 let parsed_topic = Topic {
158 domain: domain.to_owned(),
159 tenant: parts[0].to_owned(),
160 namespace: parts[1].to_owned(),
161 topic: parts[2].to_owned(),
162 partition_index: get_partition_index(complete_topic.as_str())?,
163 };
164
165 if parsed_topic.topic.is_empty() {
166 bail!("topic name cannot be empty".to_owned());
167 }
168
169 Ok(parsed_topic)
170}
171
172#[cfg(test)]
173mod test {
174 use crate::source::pulsar::topic::{get_partition_index, parse_topic};
175
176 #[test]
177 fn test_parse_topic() {
178 assert_eq!(
179 parse_topic("success").unwrap().to_string(),
180 "persistent://public/default/success".to_owned()
181 );
182 assert_eq!(
183 parse_topic("tenant/namespace/success").unwrap().to_string(),
184 "persistent://tenant/namespace/success".to_owned()
185 );
186 assert_eq!(
187 parse_topic("persistent://tenant/namespace/success")
188 .unwrap()
189 .to_string(),
190 "persistent://tenant/namespace/success".to_owned()
191 );
192 assert_eq!(
193 parse_topic("non-persistent://tenant/namespace/success")
194 .unwrap()
195 .to_string(),
196 "non-persistent://tenant/namespace/success".to_owned()
197 );
198
199 assert_eq!(
200 parse_topic("non-persistent://tenant/namespace/success")
201 .unwrap()
202 .partition_index,
203 None
204 );
205
206 assert_eq!(
207 parse_topic("non-persistent://tenant/namespace/success-partition-1")
208 .unwrap()
209 .partition_index,
210 Some(1)
211 );
212 assert_eq!(
213 parse_topic("non-persistent://tenant/namespace/success-partition-1-partition-2")
214 .unwrap()
215 .partition_index,
216 Some(2)
217 );
218 }
219
220 #[test]
221 fn test_get_partition_index() {
222 assert_eq!(get_partition_index("success").unwrap(), None);
223 assert_eq!(get_partition_index("success-partition-1").unwrap(), Some(1));
224 assert_eq!(
225 get_partition_index("success-partition-1-partition-2").unwrap(),
226 Some(2)
227 );
228 }
229}