risingwave_connector/source/google_pubsub/enumerator/
client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::Context;
16use async_trait::async_trait;
17use chrono::{TimeZone, Utc};
18use google_cloud_pubsub::subscription::SeekTo;
19use risingwave_common::bail;
20
21use crate::error::ConnectorResult;
22use crate::source::SourceEnumeratorContextRef;
23use crate::source::base::SplitEnumerator;
24use crate::source::google_pubsub::PubsubProperties;
25use crate::source::google_pubsub::split::PubsubSplit;
26
/// Split enumerator for a Google Cloud Pub/Sub source.
///
/// Every split produced by this enumerator reads from the same subscription;
/// the number of splits is fixed when the enumerator is constructed.
#[derive(Debug)]
pub struct PubsubSplitEnumerator {
    /// Subscription name shared by every split.
    subscription: String,
    /// Number of splits to emit; always at least 1.
    split_count: u32,
}
31
32#[async_trait]
33impl SplitEnumerator for PubsubSplitEnumerator {
34    type Properties = PubsubProperties;
35    type Split = PubsubSplit;
36
37    async fn new(
38        properties: Self::Properties,
39        _context: SourceEnumeratorContextRef,
40    ) -> ConnectorResult<PubsubSplitEnumerator> {
41        let split_count = properties.parallelism.unwrap_or(1);
42        if split_count < 1 {
43            bail!("parallelism must be >= 1");
44        };
45
46        if properties.credentials.is_none() && properties.emulator_host.is_none() {
47            bail!("credentials must be set if not using the pubsub emulator")
48        }
49
50        let sub = properties.subscription_client().await?;
51        if !sub
52            .exists(None)
53            .await
54            .context("error checking subscription validity")?
55        {
56            bail!("subscription {} does not exist", &sub.id())
57        }
58
59        let seek_to = match (properties.start_offset, properties.start_snapshot) {
60            (None, None) => None,
61            (Some(start_offset), None) => {
62                let ts = start_offset
63                    .parse::<i64>()
64                    .context("error parsing start_offset")
65                    .map(|nanos| Utc.timestamp_nanos(nanos).into())?;
66                Some(SeekTo::Timestamp(ts))
67            }
68            (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
69            (Some(_), Some(_)) => {
70                bail!("specify at most one of start_offset or start_snapshot")
71            }
72        };
73
74        if let Some(seek_to) = seek_to {
75            sub.seek(seek_to, None)
76                .await
77                .context("error seeking subscription")?;
78        }
79
80        Ok(Self {
81            subscription: properties.subscription.to_owned(),
82            split_count,
83        })
84    }
85
86    async fn list_splits(&mut self) -> ConnectorResult<Vec<PubsubSplit>> {
87        tracing::debug!("enumerating pubsub splits");
88        let splits: Vec<PubsubSplit> = (0..self.split_count)
89            .map(|i| PubsubSplit {
90                index: i,
91                subscription: self.subscription.to_owned(),
92                __deprecated_start_offset: None,
93                __deprecated_stop_offset: None,
94            })
95            .collect();
96
97        Ok(splits)
98    }
99}