risingwave_connector/source/google_pubsub/enumerator/
client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::Context;
16use async_trait::async_trait;
17use chrono::{TimeZone, Utc};
18use google_cloud_pubsub::subscription::SeekTo;
19use risingwave_common::bail;
20
21use crate::error::ConnectorResult;
22use crate::source::SourceEnumeratorContextRef;
23use crate::source::base::SplitEnumerator;
24use crate::source::google_pubsub::PubsubProperties;
25use crate::source::google_pubsub::split::PubsubSplit;
26
/// Split enumerator for the Google Pub/Sub source.
///
/// Holds only the subscription name; every split it produces refers to this
/// subscription.
#[derive(Debug)]
pub struct PubsubSplitEnumerator {
    /// Subscription name copied from the connector properties at construction time.
    subscription: String,
}
30
31#[async_trait]
32impl SplitEnumerator for PubsubSplitEnumerator {
33    type Properties = PubsubProperties;
34    type Split = PubsubSplit;
35
36    async fn new(
37        properties: Self::Properties,
38        _context: SourceEnumeratorContextRef,
39    ) -> ConnectorResult<PubsubSplitEnumerator> {
40        if properties.parallelism.is_some() {
41            tracing::warn!(
42                "pubsub.parallelism is deprecated and will be ignored. \
43                 Split count now adapts automatically to the number of actors."
44            );
45        }
46
47        if let Some(ack_deadline) = properties.ack_deadline_seconds
48            && !(10..=600).contains(&ack_deadline)
49        {
50            bail!("pubsub.ack_deadline_seconds must be between 10 and 600");
51        }
52
53        if properties.credentials.is_none() && properties.emulator_host.is_none() {
54            bail!("credentials must be set if not using the pubsub emulator")
55        }
56
57        let sub = properties.subscription_client().await?;
58        if !sub
59            .exists(None)
60            .await
61            .context("error checking subscription validity")?
62        {
63            bail!("subscription {} does not exist", &sub.id())
64        }
65
66        let seek_to = match (properties.start_offset, properties.start_snapshot) {
67            (None, None) => None,
68            (Some(start_offset), None) => {
69                let ts = start_offset
70                    .parse::<i64>()
71                    .context("error parsing start_offset")
72                    .map(|nanos| Utc.timestamp_nanos(nanos).into())?;
73                Some(SeekTo::Timestamp(ts))
74            }
75            (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
76            (Some(_), Some(_)) => {
77                bail!("specify at most one of start_offset or start_snapshot")
78            }
79        };
80
81        if let Some(seek_to) = seek_to {
82            sub.seek(seek_to, None)
83                .await
84                .context("error seeking subscription")?;
85        }
86
87        Ok(Self {
88            subscription: properties.subscription,
89        })
90    }
91
92    async fn list_splits(&mut self) -> ConnectorResult<Vec<PubsubSplit>> {
93        tracing::debug!("enumerating pubsub splits (adaptive mode, returning 1 template split)");
94        Ok(vec![PubsubSplit {
95            index: 0,
96            subscription: self.subscription.clone(),
97            __deprecated_start_offset: None,
98            __deprecated_stop_offset: None,
99        }])
100    }
101}