risingwave_connector/source/google_pubsub/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Context;
18use google_cloud_pubsub::client::{Client, ClientConfig};
19use google_cloud_pubsub::subscription::Subscription;
20use serde::Deserialize;
21
22pub mod enumerator;
23pub mod source;
24pub mod split;
25
26pub use enumerator::*;
27use phf::{Set, phf_set};
28use serde_with::{DisplayFromStr, serde_as};
29pub use source::*;
30pub use split::*;
31use with_options::WithOptions;
32
33use crate::enforce_secret::EnforceSecret;
34use crate::error::ConnectorResult;
35use crate::source::SourceProperties;
36
/// Connector name of the Google Pub/Sub source; reused below as `SourceProperties::SOURCE_NAME`.
37pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";
38
39/// # Implementation Notes
40/// Pub/Sub does not rely on persisted state (`SplitImpl`) to start from a position.
41/// It relies on Pub/Sub to load-balance messages between all Readers.
42/// We `ack` received messages after checkpoint (see `WaitCheckpointWorker`) to achieve at-least-once delivery.
43#[serde_as]
44#[derive(Clone, Debug, Deserialize, WithOptions)]
45pub struct PubsubProperties {
46    /// Pub/Sub subscription to consume messages from.
47    ///
48    /// Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from
49    /// the same subscription. So one `subscription` (i.e., one `Source`) can only be used for one MV
50    /// (shared between the actors of its fragment).
51    /// Otherwise, different MVs on the same Source will each receive only part of the messages.
52    /// TODO: check and enforce this on Meta.
53    #[serde(rename = "pubsub.subscription")]
54    pub subscription: String,
55
56    /// Use the connector with a Pub/Sub emulator.
57    /// <https://cloud.google.com/pubsub/docs/emulator>
58    #[serde(rename = "pubsub.emulator_host")]
59    pub emulator_host: Option<String>,
60
61    /// `credentials` is a JSON string containing the service account credentials.
62    /// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account).
63    /// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
64    #[serde(rename = "pubsub.credentials")]
65    pub credentials: Option<String>,
66
67    /// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message
68    /// in the subscription. If present, the connector will attempt to seek the subscription
69    /// to the timestamp and start consuming from there. Note that the seek operation is
70    /// subject to limitations around the message retention policy of the subscription. See
71    /// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
72    /// more details.
73    #[serde(rename = "pubsub.start_offset.nanos")]
74    pub start_offset: Option<String>,
75
76    /// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek
77    /// to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism
78    /// in pub/sub because they guarantee retention of:
79    /// - All unacknowledged messages at the time of their creation.
80    /// - All messages created after their creation.
    ///
81    /// Besides retention guarantees, snapshots are also more precise than timestamp-based seeks.
82    /// See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_snapshot) for
83    /// more details.
84    #[serde(rename = "pubsub.start_snapshot")]
85    pub start_snapshot: Option<String>,
86
87    /// Deprecated: ignored since adaptive split mode was introduced.
88    /// Split count now adapts automatically to the number of actors.
89    /// Kept for backward compatibility with existing DDL.
90    #[serde_as(as = "Option<DisplayFromStr>")]
91    #[serde(rename = "pubsub.parallelism")]
92    pub parallelism: Option<u32>,
93
94    /// The ack deadline in seconds for the streaming pull subscriber.
95    /// This is the maximum time the server will wait for an ack before redelivering the message.
96    /// Must be between 10 and 600 seconds. Defaults to 60.
97    #[serde_as(as = "Option<DisplayFromStr>")]
98    #[serde(rename = "pubsub.ack_deadline_seconds")]
99    #[with_option(allow_alter_on_fly)]
100    pub ack_deadline_seconds: Option<i32>,
101
    /// Options that matched none of the fields above, gathered via `#[serde(flatten)]`
    /// and handed back by the `UnknownFields` implementation below.
102    #[serde(flatten)]
103    pub unknown_fields: HashMap<String, String>,
104}
105
106impl EnforceSecret for PubsubProperties {
107    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
108        "pubsub.credentials",
109    };
110}
111
112impl SourceProperties for PubsubProperties {
113    type Split = PubsubSplit;
114    type SplitEnumerator = PubsubSplitEnumerator;
115    type SplitReader = PubsubSplitReader;
116
117    const SOURCE_NAME: &'static str = GOOGLE_PUBSUB_CONNECTOR;
118}
119
120impl crate::source::UnknownFields for PubsubProperties {
121    fn unknown_fields(&self) -> HashMap<String, String> {
122        self.unknown_fields.clone()
123    }
124}
125
126impl PubsubProperties {
127    pub(crate) async fn subscription_client(&self) -> ConnectorResult<Subscription> {
128        // initialize env
129        {
130            tracing::debug!("setting pubsub environment variables");
131            if let Some(emulator_host) = &self.emulator_host {
132                // safety: only read in the same thread below in with_auth
133                unsafe { std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host) };
134            }
135            if let Some(credentials) = &self.credentials {
136                // safety: only read in the same thread below in with_auth
137                unsafe { std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials) };
138            }
139        };
140
141        // Validate config
142        let config = ClientConfig::default().with_auth().await?;
143        let client = Client::new(config)
144            .await
145            .context("error initializing pubsub client")?;
146
147        Ok(client.subscription(&self.subscription))
148    }
149}