risingwave_connector/source/google_pubsub/enumerator/
client.rs
1use anyhow::Context;
16use async_trait::async_trait;
17use chrono::{TimeZone, Utc};
18use google_cloud_pubsub::subscription::SeekTo;
19use risingwave_common::bail;
20
21use crate::error::ConnectorResult;
22use crate::source::SourceEnumeratorContextRef;
23use crate::source::base::SplitEnumerator;
24use crate::source::google_pubsub::PubsubProperties;
25use crate::source::google_pubsub::split::PubsubSplit;
26
/// Split enumerator for the Google Pub/Sub source.
///
/// Stores the subscription name together with the number of splits that
/// `list_splits` hands out.
#[derive(Debug, Clone)]
pub struct PubsubSplitEnumerator {
    /// Subscription name taken from the source properties; copied into every
    /// split produced by `list_splits`.
    subscription: String,
    /// Number of splits to enumerate. Comes from the `parallelism` property
    /// and defaults to 1 when that property is unset.
    split_count: u32,
}
31
32#[async_trait]
33impl SplitEnumerator for PubsubSplitEnumerator {
34 type Properties = PubsubProperties;
35 type Split = PubsubSplit;
36
37 async fn new(
38 properties: Self::Properties,
39 _context: SourceEnumeratorContextRef,
40 ) -> ConnectorResult<PubsubSplitEnumerator> {
41 let split_count = properties.parallelism.unwrap_or(1);
42 if split_count < 1 {
43 bail!("parallelism must be >= 1");
44 };
45
46 if properties.credentials.is_none() && properties.emulator_host.is_none() {
47 bail!("credentials must be set if not using the pubsub emulator")
48 }
49
50 let sub = properties.subscription_client().await?;
51 if !sub
52 .exists(None)
53 .await
54 .context("error checking subscription validity")?
55 {
56 bail!("subscription {} does not exist", &sub.id())
57 }
58
59 let seek_to = match (properties.start_offset, properties.start_snapshot) {
60 (None, None) => None,
61 (Some(start_offset), None) => {
62 let ts = start_offset
63 .parse::<i64>()
64 .context("error parsing start_offset")
65 .map(|nanos| Utc.timestamp_nanos(nanos).into())?;
66 Some(SeekTo::Timestamp(ts))
67 }
68 (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
69 (Some(_), Some(_)) => {
70 bail!("specify at most one of start_offset or start_snapshot")
71 }
72 };
73
74 if let Some(seek_to) = seek_to {
75 sub.seek(seek_to, None)
76 .await
77 .context("error seeking subscription")?;
78 }
79
80 Ok(Self {
81 subscription: properties.subscription.to_owned(),
82 split_count,
83 })
84 }
85
86 async fn list_splits(&mut self) -> ConnectorResult<Vec<PubsubSplit>> {
87 tracing::debug!("enumerating pubsub splits");
88 let splits: Vec<PubsubSplit> = (0..self.split_count)
89 .map(|i| PubsubSplit {
90 index: i,
91 subscription: self.subscription.to_owned(),
92 __deprecated_start_offset: None,
93 __deprecated_stop_offset: None,
94 })
95 .collect();
96
97 Ok(splits)
98 }
99}