risingwave_connector/source/google_pubsub/enumerator/
client.rs1use anyhow::Context;
16use async_trait::async_trait;
17use chrono::{TimeZone, Utc};
18use google_cloud_pubsub::subscription::SeekTo;
19use risingwave_common::bail;
20
21use crate::error::ConnectorResult;
22use crate::source::SourceEnumeratorContextRef;
23use crate::source::base::SplitEnumerator;
24use crate::source::google_pubsub::PubsubProperties;
25use crate::source::google_pubsub::split::PubsubSplit;
26
27pub struct PubsubSplitEnumerator {
28 subscription: String,
29}
30
31#[async_trait]
32impl SplitEnumerator for PubsubSplitEnumerator {
33 type Properties = PubsubProperties;
34 type Split = PubsubSplit;
35
36 async fn new(
37 properties: Self::Properties,
38 _context: SourceEnumeratorContextRef,
39 ) -> ConnectorResult<PubsubSplitEnumerator> {
40 if properties.parallelism.is_some() {
41 tracing::warn!(
42 "pubsub.parallelism is deprecated and will be ignored. \
43 Split count now adapts automatically to the number of actors."
44 );
45 }
46
47 if let Some(ack_deadline) = properties.ack_deadline_seconds
48 && !(10..=600).contains(&ack_deadline)
49 {
50 bail!("pubsub.ack_deadline_seconds must be between 10 and 600");
51 }
52
53 if properties.credentials.is_none() && properties.emulator_host.is_none() {
54 bail!("credentials must be set if not using the pubsub emulator")
55 }
56
57 let sub = properties.subscription_client().await?;
58 if !sub
59 .exists(None)
60 .await
61 .context("error checking subscription validity")?
62 {
63 bail!("subscription {} does not exist", &sub.id())
64 }
65
66 let seek_to = match (properties.start_offset, properties.start_snapshot) {
67 (None, None) => None,
68 (Some(start_offset), None) => {
69 let ts = start_offset
70 .parse::<i64>()
71 .context("error parsing start_offset")
72 .map(|nanos| Utc.timestamp_nanos(nanos).into())?;
73 Some(SeekTo::Timestamp(ts))
74 }
75 (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
76 (Some(_), Some(_)) => {
77 bail!("specify at most one of start_offset or start_snapshot")
78 }
79 };
80
81 if let Some(seek_to) = seek_to {
82 sub.seek(seek_to, None)
83 .await
84 .context("error seeking subscription")?;
85 }
86
87 Ok(Self {
88 subscription: properties.subscription,
89 })
90 }
91
92 async fn list_splits(&mut self) -> ConnectorResult<Vec<PubsubSplit>> {
93 tracing::debug!("enumerating pubsub splits (adaptive mode, returning 1 template split)");
94 Ok(vec![PubsubSplit {
95 index: 0,
96 subscription: self.subscription.clone(),
97 __deprecated_start_offset: None,
98 __deprecated_stop_offset: None,
99 }])
100 }
101}