risingwave_connector/source/pulsar/enumerator/client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use itertools::Itertools;
18use pulsar::{Pulsar, TokioExecutor};
19use risingwave_common::bail;
20use serde::{Deserialize, Serialize};
21
22use crate::error::ConnectorResult;
23use crate::source::pulsar::PulsarProperties;
24use crate::source::pulsar::split::PulsarSplit;
25use crate::source::pulsar::topic::{Topic, parse_topic};
26use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
27
28pub struct PulsarSplitEnumerator {
29    client: Pulsar<TokioExecutor>,
30    topic: Topic,
31    start_offset: PulsarEnumeratorOffset,
32}
33
34#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
35pub enum PulsarEnumeratorOffset {
36    Earliest,
37    Latest,
38    MessageId(String),
39    Timestamp(i64),
40}
41
42#[async_trait]
43impl SplitEnumerator for PulsarSplitEnumerator {
44    type Properties = PulsarProperties;
45    type Split = PulsarSplit;
46
47    async fn new(
48        properties: PulsarProperties,
49        _context: SourceEnumeratorContextRef,
50    ) -> ConnectorResult<PulsarSplitEnumerator> {
51        let pulsar = properties
52            .common
53            .build_client(&properties.oauth, &properties.aws_auth_props)
54            .await?;
55        let topic = properties.common.topic;
56        let parsed_topic = parse_topic(&topic)?;
57
58        let mut scan_start_offset = match properties
59            .scan_startup_mode
60            .map(|s| s.to_lowercase())
61            .as_deref()
62        {
63            Some("earliest") => PulsarEnumeratorOffset::Earliest,
64            Some("latest") => PulsarEnumeratorOffset::Latest,
65            None => PulsarEnumeratorOffset::Earliest,
66            _ => {
67                bail!(
68                    "properties `startup_mode` only supports earliest and latest or leaving it empty"
69                );
70            }
71        };
72
73        if let Some(s) = properties.time_offset {
74            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
75            scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
76        }
77
78        Ok(PulsarSplitEnumerator {
79            client: pulsar,
80            topic: parsed_topic,
81            start_offset: scan_start_offset,
82        })
83    }
84
85    async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
86        let offset = self.start_offset.clone();
87        // MessageId is only used when recovering from a State
88        assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
89
90        let topic_partitions = self
91            .client
92            .lookup_partitioned_topic_number(&self.topic.to_string())
93            .await
94            .map_err(|e| anyhow!(e))?;
95
96        let splits = if topic_partitions > 0 {
97            // partitioned topic
98            (0..topic_partitions as i32)
99                .map(|p| PulsarSplit {
100                    topic: self.topic.sub_topic(p).unwrap(),
101                    start_offset: offset.clone(),
102                })
103                .collect_vec()
104        } else {
105            // non partitioned topic
106            vec![PulsarSplit {
107                topic: self.topic.clone(),
108                start_offset: offset.clone(),
109            }]
110        };
111
112        Ok(splits)
113    }
114}