risingwave_connector/source/pulsar/topic.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use pulsar::proto::command_get_topics_of_namespace::Mode as LookupMode;
17use pulsar::{Pulsar, TokioExecutor};
18use risingwave_common::bail;
19use serde::{Deserialize, Serialize};
20use urlencoding::encode;
21
22use crate::error::ConnectorResult as Result;
23
/// Domain of a fully qualified name (`persistent://…`); also the domain assumed for short names.
pub(crate) const PERSISTENT_DOMAIN: &'static str = "persistent";
/// The only other domain accepted by `parse_topic` (`non-persistent://…`).
const NON_PERSISTENT_DOMAIN: &'static str = "non-persistent";
/// Tenant assumed when a short name is just `<topic>`.
const PUBLIC_TENANT: &'static str = "public";
/// Namespace assumed when a short name is just `<topic>`.
const DEFAULT_NAMESPACE: &'static str = "default";
/// Infix between a base topic name and its partition number, e.g. `orders-partition-3`.
const PARTITIONED_TOPIC_SUFFIX: &'static str = "-partition-";
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash)]
31/// `ParsedTopic` is a parsed topic name, Generated by `parse_topic`.
32pub struct Topic {
33    pub domain: String,
34    pub tenant: String,
35    pub namespace: String,
36    pub topic: String,
37    pub partition_index: Option<i32>,
38}
39
40impl std::fmt::Display for Topic {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(
43            f,
44            "{}://{}/{}/{}",
45            self.domain, self.tenant, self.namespace, self.topic
46        )
47    }
48}
49
50impl Topic {
51    pub fn is_partitioned_topic(&self) -> bool {
52        self.partition_index.is_none()
53    }
54
55    pub fn rest_path(&self) -> String {
56        format!(
57            "{}/{}/{}/{}",
58            self.domain,
59            self.tenant,
60            self.namespace,
61            encode(&self.topic)
62        )
63    }
64
65    pub fn sub_topic(&self, partition: i32) -> Result<Topic> {
66        if partition < 0 {
67            bail!("invalid partition index number");
68        }
69
70        if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
71            return Ok(self.clone());
72        }
73
74        Ok(Topic {
75            domain: self.domain.clone(),
76            tenant: self.tenant.clone(),
77            namespace: self.namespace.clone(),
78            topic: format!("{}{}{}", self.topic, PARTITIONED_TOPIC_SUFFIX, partition),
79            partition_index: Some(partition),
80        })
81    }
82
83    pub fn topic_str_without_partition(&self) -> Result<String> {
84        if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
85            let parts: Vec<&str> = self.topic.split(PARTITIONED_TOPIC_SUFFIX).collect();
86            Ok(parts[0].to_owned())
87        } else {
88            Ok(self.topic.clone())
89        }
90    }
91}
92
93/// `get_partition_index` returns the partition index of the topic.
94pub fn get_partition_index(topic: &str) -> Result<Option<i32>> {
95    if topic.contains(PARTITIONED_TOPIC_SUFFIX) {
96        let partition = topic
97            .split('-')
98            .next_back()
99            .unwrap()
100            .parse::<i32>()
101            .map_err(|e| anyhow!(e))?;
102
103        Ok(Some(partition))
104    } else {
105        Ok(None)
106    }
107}
108
109/// `parse_topic` parses a topic name into its components.
110/// The short topic name can be:
111/// - `<topic>`
112/// - `<tenant>/<namespace>/<topic>`
113///
114/// The fully qualified topic name can be:
115/// `<domain>://<tenant>/<namespace>/<topic>`
116pub fn parse_topic(topic: &str) -> Result<Topic> {
117    let mut complete_topic = topic.to_owned();
118
119    if !topic.contains("://") {
120        let parts: Vec<&str> = topic.split('/').collect();
121        complete_topic = match parts.len() {
122            1 => format!(
123                "{}://{}/{}/{}",
124                PERSISTENT_DOMAIN, PUBLIC_TENANT, DEFAULT_NAMESPACE, parts[0],
125            ),
126            3 => format!("{}://{}", PERSISTENT_DOMAIN, topic),
127            _ => {
128                bail!(
129                    "Invalid short topic name '{}', \
130                it should be in the format of <tenant>/<namespace>/<topic> or <topic>",
131                    topic
132                );
133            }
134        };
135    }
136
137    let parts: Vec<&str> = complete_topic.splitn(2, "://").collect();
138
139    let domain = match parts[0] {
140        PERSISTENT_DOMAIN | NON_PERSISTENT_DOMAIN => parts[0],
141        _ => {
142            bail!(
143                "The domain only can be specified as 'persistent' or 'non-persistent'. Input domain is '{}'",
144                parts[0]
145            );
146        }
147    };
148
149    let rest = parts[1];
150    let parts: Vec<&str> = rest.splitn(3, '/').collect();
151
152    if parts.len() != 3 {
153        bail!(
154            "invalid topic name '{}', it should be in the format of <tenant>/<namespace>/<topic>",
155            rest
156        );
157    }
158
159    let parsed_topic = Topic {
160        domain: domain.to_owned(),
161        tenant: parts[0].to_owned(),
162        namespace: parts[1].to_owned(),
163        topic: parts[2].to_owned(),
164        partition_index: get_partition_index(complete_topic.as_str())?,
165    };
166
167    if parsed_topic.topic.is_empty() {
168        bail!("topic name cannot be empty".to_owned());
169    }
170
171    Ok(parsed_topic)
172}
173
174pub(crate) async fn check_topic_exists(
175    client: &Pulsar<TokioExecutor>,
176    topic: &Topic,
177) -> Result<()> {
178    // issue about api `get_topics_of_namespace`:
179    // for partitioned topic, the api will return all sub-topic of the topic instead of the topic itself
180    let topics_on_broker = client
181        .get_topics_of_namespace(
182            format!("{}/{}", topic.tenant, topic.namespace),
183            LookupMode::All,
184        )
185        .await?;
186    if !topics_on_broker.contains(&topic.to_string()) {
187        bail!(
188            "topic {} not found on broker, available topics: {:?}",
189            topic,
190            topics_on_broker
191        );
192    }
193
194    Ok(())
195}
196
#[cfg(test)]
mod test {
    use crate::source::pulsar::topic::{get_partition_index, parse_topic};

    #[test]
    fn test_parse_topic() {
        // Short and fully qualified inputs must all render as the fully qualified name.
        let display_cases = [
            ("success", "persistent://public/default/success"),
            (
                "tenant/namespace/success",
                "persistent://tenant/namespace/success",
            ),
            (
                "persistent://tenant/namespace/success",
                "persistent://tenant/namespace/success",
            ),
            (
                "non-persistent://tenant/namespace/success",
                "non-persistent://tenant/namespace/success",
            ),
        ];
        for (input, expected) in display_cases {
            assert_eq!(
                parse_topic(input).unwrap().to_string(),
                expected,
                "input: {}",
                input
            );
        }

        // The partition index comes from the last `-partition-<n>` suffix, if any.
        let partition_cases = [
            ("non-persistent://tenant/namespace/success", None),
            (
                "non-persistent://tenant/namespace/success-partition-1",
                Some(1),
            ),
            (
                "non-persistent://tenant/namespace/success-partition-1-partition-2",
                Some(2),
            ),
        ];
        for (input, expected) in partition_cases {
            assert_eq!(
                parse_topic(input).unwrap().partition_index,
                expected,
                "input: {}",
                input
            );
        }
    }

    #[test]
    fn test_get_partition_index() {
        let cases = [
            ("success", None),
            ("success-partition-1", Some(1)),
            ("success-partition-1-partition-2", Some(2)),
        ];
        for (input, expected) in cases {
            assert_eq!(
                get_partition_index(input).unwrap(),
                expected,
                "input: {}",
                input
            );
        }
    }
}