risingwave_connector/source/pulsar/enumerator/
client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use itertools::Itertools;
20use pulsar::{Pulsar, TokioExecutor};
21use risingwave_common::bail;
22use serde::{Deserialize, Serialize};
23
24use crate::error::ConnectorResult;
25use crate::source::pulsar::PulsarProperties;
26use crate::source::pulsar::split::PulsarSplit;
27use crate::source::pulsar::topic::{PERSISTENT_DOMAIN, Topic, check_topic_exists, parse_topic};
28use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
29
30pub struct PulsarSplitEnumerator {
31    client: Pulsar<TokioExecutor>,
32    topic: Topic,
33    start_offset: PulsarEnumeratorOffset,
34}
35
36#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
37pub enum PulsarEnumeratorOffset {
38    Earliest,
39    Latest,
40    MessageId(String),
41    Timestamp(i64),
42}
43
44#[async_trait]
45impl SplitEnumerator for PulsarSplitEnumerator {
46    type Properties = PulsarProperties;
47    type Split = PulsarSplit;
48
49    async fn new(
50        properties: PulsarProperties,
51        _context: SourceEnumeratorContextRef,
52    ) -> ConnectorResult<PulsarSplitEnumerator> {
53        let pulsar = properties
54            .common
55            .build_client(&properties.oauth, &properties.aws_auth_props)
56            .await?;
57        let topic = properties.common.topic;
58        let parsed_topic = parse_topic(&topic)?;
59
60        let mut scan_start_offset = match properties
61            .scan_startup_mode
62            .map(|s| s.to_lowercase())
63            .as_deref()
64        {
65            Some("earliest") => PulsarEnumeratorOffset::Earliest,
66            Some("latest") => PulsarEnumeratorOffset::Latest,
67            None => PulsarEnumeratorOffset::Earliest,
68            _ => {
69                bail!(
70                    "properties `startup_mode` only supports earliest and latest or leaving it empty"
71                );
72            }
73        };
74
75        if let Some(s) = properties.time_offset {
76            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
77            scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
78        }
79
80        Ok(PulsarSplitEnumerator {
81            client: pulsar,
82            topic: parsed_topic,
83            start_offset: scan_start_offset,
84        })
85    }
86
87    async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
88        let offset = self.start_offset.clone();
89        // MessageId is only used when recovering from a State
90        assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
91
92        let topic_partitions = self
93            .client
94            .lookup_partitioned_topic_number(&self.topic.to_string())
95            .await
96            .map_err(|e| anyhow!(e))?;
97
98        let splits = if topic_partitions > 0 {
99            // partitioned topic
100            // if we can know the number of partitions, the topic must exist
101            (0..topic_partitions as i32)
102                .map(|p| PulsarSplit {
103                    topic: self.topic.sub_topic(p).unwrap(),
104                    start_offset: offset.clone(),
105                })
106                .collect_vec()
107        } else {
108            // only do existence check for persistent non-partitioned topic
109            //
110            // for non-persistent topic, all metadata is in broker memory
111            // unless there's a live producer/consumer, the broker may not be aware of the non-persistent topic.
112            if self.topic.domain == PERSISTENT_DOMAIN {
113                // if the topic is non-partitioned, we need to check if the topic exists on the broker
114                check_topic_exists(&self.client, &self.topic).await?;
115            }
116            // non partitioned topic
117            vec![PulsarSplit {
118                topic: self.topic.clone(),
119                start_offset: offset.clone(),
120            }]
121        };
122
123        Ok(splits)
124    }
125}