risingwave_connector/source/pulsar/
topic.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::bail;
17use serde::{Deserialize, Serialize};
18use urlencoding::encode;
19
20use crate::error::ConnectorResult as Result;
21
/// Domain of persistent topics; also the domain `parse_topic` assumes for short topic names.
22const PERSISTENT_DOMAIN: &str = "persistent";
/// Domain of non-persistent topics.
23const NON_PERSISTENT_DOMAIN: &str = "non-persistent";
/// Tenant `parse_topic` uses when the name is a bare `<topic>`.
24const PUBLIC_TENANT: &str = "public";
/// Namespace `parse_topic` uses when the name is a bare `<topic>`.
25const DEFAULT_NAMESPACE: &str = "default";
/// Separator placed between a topic name and its partition index (`<topic>-partition-<index>`).
26const PARTITIONED_TOPIC_SUFFIX: &str = "-partition-";
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash)]
29/// `ParsedTopic` is a parsed topic name, Generated by `parse_topic`.
30pub struct Topic {
31    pub domain: String,
32    pub tenant: String,
33    pub namespace: String,
34    pub topic: String,
35    pub partition_index: Option<i32>,
36}
37
38impl std::fmt::Display for Topic {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(
41            f,
42            "{}://{}/{}/{}",
43            self.domain, self.tenant, self.namespace, self.topic
44        )
45    }
46}
47
48impl Topic {
49    pub fn is_partitioned_topic(&self) -> bool {
50        self.partition_index.is_none()
51    }
52
53    pub fn rest_path(&self) -> String {
54        format!(
55            "{}/{}/{}/{}",
56            self.domain,
57            self.tenant,
58            self.namespace,
59            encode(&self.topic)
60        )
61    }
62
63    pub fn sub_topic(&self, partition: i32) -> Result<Topic> {
64        if partition < 0 {
65            bail!("invalid partition index number");
66        }
67
68        if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
69            return Ok(self.clone());
70        }
71
72        Ok(Topic {
73            domain: self.domain.clone(),
74            tenant: self.tenant.clone(),
75            namespace: self.namespace.clone(),
76            topic: format!("{}{}{}", self.topic, PARTITIONED_TOPIC_SUFFIX, partition),
77            partition_index: Some(partition),
78        })
79    }
80
81    pub fn topic_str_without_partition(&self) -> Result<String> {
82        if self.topic.contains(PARTITIONED_TOPIC_SUFFIX) {
83            let parts: Vec<&str> = self.topic.split(PARTITIONED_TOPIC_SUFFIX).collect();
84            Ok(parts[0].to_owned())
85        } else {
86            Ok(self.topic.clone())
87        }
88    }
89}
90
91/// `get_partition_index` returns the partition index of the topic.
92pub fn get_partition_index(topic: &str) -> Result<Option<i32>> {
93    if topic.contains(PARTITIONED_TOPIC_SUFFIX) {
94        let partition = topic
95            .split('-')
96            .next_back()
97            .unwrap()
98            .parse::<i32>()
99            .map_err(|e| anyhow!(e))?;
100
101        Ok(Some(partition))
102    } else {
103        Ok(None)
104    }
105}
106
107/// `parse_topic` parses a topic name into its components.
108/// The short topic name can be:
109/// - `<topic>`
110/// - `<tenant>/<namespace>/<topic>`
111///
112/// The fully qualified topic name can be:
113/// `<domain>://<tenant>/<namespace>/<topic>`
114pub fn parse_topic(topic: &str) -> Result<Topic> {
115    let mut complete_topic = topic.to_owned();
116
117    if !topic.contains("://") {
118        let parts: Vec<&str> = topic.split('/').collect();
119        complete_topic = match parts.len() {
120            1 => format!(
121                "{}://{}/{}/{}",
122                PERSISTENT_DOMAIN, PUBLIC_TENANT, DEFAULT_NAMESPACE, parts[0],
123            ),
124            3 => format!("{}://{}", PERSISTENT_DOMAIN, topic),
125            _ => {
126                bail!(
127                    "Invalid short topic name '{}', \
128                it should be in the format of <tenant>/<namespace>/<topic> or <topic>",
129                    topic
130                );
131            }
132        };
133    }
134
135    let parts: Vec<&str> = complete_topic.splitn(2, "://").collect();
136
137    let domain = match parts[0] {
138        PERSISTENT_DOMAIN | NON_PERSISTENT_DOMAIN => parts[0],
139        _ => {
140            bail!(
141                "The domain only can be specified as 'persistent' or 'non-persistent'. Input domain is '{}'",
142                parts[0]
143            );
144        }
145    };
146
147    let rest = parts[1];
148    let parts: Vec<&str> = rest.splitn(3, '/').collect();
149
150    if parts.len() != 3 {
151        bail!(
152            "invalid topic name '{}', it should be in the format of <tenant>/<namespace>/<topic>",
153            rest
154        );
155    }
156
157    let parsed_topic = Topic {
158        domain: domain.to_owned(),
159        tenant: parts[0].to_owned(),
160        namespace: parts[1].to_owned(),
161        topic: parts[2].to_owned(),
162        partition_index: get_partition_index(complete_topic.as_str())?,
163    };
164
165    if parsed_topic.topic.is_empty() {
166        bail!("topic name cannot be empty".to_owned());
167    }
168
169    Ok(parsed_topic)
170}
171
#[cfg(test)]
mod test {
    use crate::source::pulsar::topic::{get_partition_index, parse_topic};

    /// Checks the rendered form and the partition index of parsed topic names.
    #[test]
    fn test_parse_topic() {
        // (input, expected `Display` output)
        let rendered_cases = [
            ("success", "persistent://public/default/success"),
            (
                "tenant/namespace/success",
                "persistent://tenant/namespace/success",
            ),
            (
                "persistent://tenant/namespace/success",
                "persistent://tenant/namespace/success",
            ),
            (
                "non-persistent://tenant/namespace/success",
                "non-persistent://tenant/namespace/success",
            ),
        ];
        for (input, expected) in rendered_cases {
            assert_eq!(parse_topic(input).unwrap().to_string(), expected.to_owned());
        }

        // (input, expected partition index)
        let partition_cases: [(&str, Option<i32>); 3] = [
            ("non-persistent://tenant/namespace/success", None),
            (
                "non-persistent://tenant/namespace/success-partition-1",
                Some(1),
            ),
            (
                "non-persistent://tenant/namespace/success-partition-1-partition-2",
                Some(2),
            ),
        ];
        for (input, expected) in partition_cases {
            assert_eq!(parse_topic(input).unwrap().partition_index, expected);
        }
    }

    /// Checks partition index extraction on bare topic names.
    #[test]
    fn test_get_partition_index() {
        let cases: [(&str, Option<i32>); 3] = [
            ("success", None),
            ("success-partition-1", Some(1)),
            ("success-partition-1-partition-2", Some(2)),
        ];
        for (input, expected) in cases {
            assert_eq!(get_partition_index(input).unwrap(), expected);
        }
    }
}