risingwave_connector/source/pulsar/enumerator/client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use itertools::Itertools;
20use pulsar::{Pulsar, TokioExecutor};
21use risingwave_common::bail;
22use serde::{Deserialize, Serialize};
23
24use crate::error::ConnectorResult;
25use crate::source::pulsar::PulsarProperties;
26use crate::source::pulsar::split::PulsarSplit;
27use crate::source::pulsar::topic::{PERSISTENT_DOMAIN, Topic, check_topic_exists, parse_topic};
28use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
29
/// Enumerates the splits of a single Pulsar topic: one split per partition when the
/// topic is partitioned, otherwise a single split covering the whole topic.
pub struct PulsarSplitEnumerator {
    /// Client used to look up the partition count and to check topic existence.
    client: Pulsar<TokioExecutor>,
    /// The topic configured on the source, already parsed.
    topic: Topic,
    /// Offset that every listed split starts from. Construction only ever sets
    /// `Earliest`, `Latest` or `Timestamp` here, never `MessageId`.
    start_offset: PulsarEnumeratorOffset,
}
35
36#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
37pub enum PulsarEnumeratorOffset {
38    Earliest,
39    Latest,
40    MessageId(String),
41    Timestamp(i64),
42}
43
44#[async_trait]
45impl SplitEnumerator for PulsarSplitEnumerator {
46    type Properties = PulsarProperties;
47    type Split = PulsarSplit;
48
49    async fn new(
50        properties: PulsarProperties,
51        _context: SourceEnumeratorContextRef,
52    ) -> ConnectorResult<PulsarSplitEnumerator> {
53        let pulsar = properties
54            .common
55            .build_client(
56                &properties.oauth,
57                &properties.aws_auth_props,
58                properties.operation_retry.to_pulsar_options(),
59            )
60            .await?;
61        let topic = properties.common.topic;
62        let parsed_topic = parse_topic(&topic)?;
63
64        let mut scan_start_offset = match properties
65            .scan_startup_mode
66            .map(|s| s.to_lowercase())
67            .as_deref()
68        {
69            Some("earliest") => PulsarEnumeratorOffset::Earliest,
70            Some("latest") => PulsarEnumeratorOffset::Latest,
71            None => PulsarEnumeratorOffset::Earliest,
72            _ => {
73                bail!(
74                    "properties `startup_mode` only supports earliest and latest or leaving it empty"
75                );
76            }
77        };
78
79        if let Some(s) = properties.time_offset {
80            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
81            scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset)
82        }
83
84        Ok(PulsarSplitEnumerator {
85            client: pulsar,
86            topic: parsed_topic,
87            start_offset: scan_start_offset,
88        })
89    }
90
91    async fn list_splits(&mut self) -> ConnectorResult<Vec<PulsarSplit>> {
92        let offset = self.start_offset.clone();
93        // MessageId is only used when recovering from a State
94        assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
95
96        // Reduce async state machine size (see `clippy::large_futures`).
97        let topic_partitions = Box::pin(
98            self.client
99                .lookup_partitioned_topic_number(&self.topic.to_string()),
100        )
101        .await
102        .map_err(|e| anyhow!(e))?;
103
104        let splits = if topic_partitions > 0 {
105            // partitioned topic
106            // if we can know the number of partitions, the topic must exist
107            (0..topic_partitions as i32)
108                .map(|p| PulsarSplit {
109                    topic: self.topic.sub_topic(p).unwrap(),
110                    start_offset: offset.clone(),
111                })
112                .collect_vec()
113        } else {
114            // only do existence check for persistent non-partitioned topic
115            //
116            // for non-persistent topic, all metadata is in broker memory
117            // unless there's a live producer/consumer, the broker may not be aware of the non-persistent topic.
118            if self.topic.domain == PERSISTENT_DOMAIN {
119                // if the topic is non-partitioned, we need to check if the topic exists on the broker
120                // Reduce async state machine size (see `clippy::large_futures`).
121                Box::pin(check_topic_exists(&self.client, &self.topic)).await?;
122            }
123            // non partitioned topic
124            vec![PulsarSplit {
125                topic: self.topic.clone(),
126                start_offset: offset.clone(),
127            }]
128        };
129
130        Ok(splits)
131    }
132}