risingwave_connector/source/google_pubsub/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Context;
18use google_cloud_pubsub::client::{Client, ClientConfig};
19use google_cloud_pubsub::subscription::Subscription;
20use serde::Deserialize;
21
22pub mod enumerator;
23pub mod source;
24pub mod split;
25pub use enumerator::*;
26use serde_with::{DisplayFromStr, serde_as};
27pub use source::*;
28pub use split::*;
29use with_options::WithOptions;
30
31use crate::error::ConnectorResult;
32use crate::source::SourceProperties;
33
/// Name of the Google Pub/Sub source connector; used as `SourceProperties::SOURCE_NAME`
/// for [`PubsubProperties`].
pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";
35
36/// # Implementation Notes
37/// Pub/Sub does not rely on persisted state (`SplitImpl`) to start from a position.
38/// It rely on Pub/Sub to load-balance messages between all Readers.
39/// We `ack` received messages after checkpoint (see `WaitCheckpointWorker`) to achieve at-least-once delivery.
40#[serde_as]
41#[derive(Clone, Debug, Deserialize, WithOptions)]
42pub struct PubsubProperties {
43 /// Pub/Sub subscription to consume messages from.
44 ///
45 /// Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from
46 /// the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV
47 /// (shared between the actors of its fragment).
48 /// Otherwise, different MVs on the same Source will both receive part of the messages.
49 /// TODO: check and enforce this on Meta.
50 #[serde(rename = "pubsub.subscription")]
51 pub subscription: String,
52
53 /// use the connector with a pubsub emulator
54 /// <https://cloud.google.com/pubsub/docs/emulator>
55 #[serde(rename = "pubsub.emulator_host")]
56 pub emulator_host: Option<String>,
57
58 /// `credentials` is a JSON string containing the service account credentials.
59 /// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account).
60 /// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
61 #[serde(rename = "pubsub.credentials")]
62 pub credentials: Option<String>,
63
64 /// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message
65 /// in the subscription. If present, the connector will attempt to seek the subscription
66 /// to the timestamp and start consuming from there. Note that the seek operation is
67 /// subject to limitations around the message retention policy of the subscription. See
68 /// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
69 /// more details.
70 #[serde(rename = "pubsub.start_offset.nanos")]
71 pub start_offset: Option<String>,
72
73 /// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek
74 /// to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism
75 /// in pub/sub because they guarantee retention of:
76 /// - All unacknowledged messages at the time of their creation.
77 /// - All messages created after their creation.
78 /// Besides retention guarantees, snapshots are also more precise than timestamp-based seeks.
79 /// See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
80 /// more details.
81 #[serde(rename = "pubsub.start_snapshot")]
82 pub start_snapshot: Option<String>,
83
84 /// `parallelism` is the number of parallel consumers to run for the subscription.
85 /// TODO: use system parallelism if not set
86 #[serde_as(as = "Option<DisplayFromStr>")]
87 #[serde(rename = "pubsub.parallelism")]
88 pub parallelism: Option<u32>,
89
90 #[serde(flatten)]
91 pub unknown_fields: HashMap<String, String>,
92}
93
94impl SourceProperties for PubsubProperties {
95 type Split = PubsubSplit;
96 type SplitEnumerator = PubsubSplitEnumerator;
97 type SplitReader = PubsubSplitReader;
98
99 const SOURCE_NAME: &'static str = GOOGLE_PUBSUB_CONNECTOR;
100}
101
102impl crate::source::UnknownFields for PubsubProperties {
103 fn unknown_fields(&self) -> HashMap<String, String> {
104 self.unknown_fields.clone()
105 }
106}
107
108impl PubsubProperties {
109 pub(crate) async fn subscription_client(&self) -> ConnectorResult<Subscription> {
110 // initialize env
111 {
112 tracing::debug!("setting pubsub environment variables");
113 if let Some(emulator_host) = &self.emulator_host {
114 // safety: only read in the same thread below in with_auth
115 unsafe { std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host) };
116 }
117 if let Some(credentials) = &self.credentials {
118 // safety: only read in the same thread below in with_auth
119 unsafe { std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials) };
120 }
121 };
122
123 // Validate config
124 let config = ClientConfig::default().with_auth().await?;
125 let client = Client::new(config)
126 .await
127 .context("error initializing pubsub client")?;
128
129 Ok(client.subscription(&self.subscription))
130 }
131}