risingwave_connector/sink/
google_pubsub.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41use crate::enforce_secret::EnforceSecret;
42
/// Connector name of the Google Pub/Sub sink; also used as `Sink::SINK_NAME`.
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Limit passed to `into_log_sinker` when the log sinker is built (2^16 = 65536).
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
45
46mod delivery_future {
47    use anyhow::Context;
48    use futures::future::try_join_all;
49    use futures::{FutureExt, TryFuture, TryFutureExt};
50    use google_cloud_pubsub::publisher::Awaiter;
51
52    use crate::sink::SinkError;
53
54    pub type GooglePubSubSinkDeliveryFuture =
55        impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
56
57    #[define_opaque(GooglePubSubSinkDeliveryFuture)]
58    pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
59        try_join_all(awaiter.into_iter().map(|awaiter| {
60            awaiter.get().map(|result| {
61                result
62                    .context("Google Pub/Sub sink error")
63                    .map_err(SinkError::GooglePubSub)
64                    .map(|_| ())
65            })
66        }))
67        .map_ok(|_: Vec<()>| ())
68        .boxed()
69    }
70}
71
72use delivery_future::*;
73
/// User-facing options of the Google Pub/Sub sink, deserialized from the `pubsub.*`
/// properties of the sink definition.
///
/// `pubsub.project_id`, `pubsub.topic` and `pubsub.endpoint` are required by deserialization;
/// `pubsub.emulator_host` and `pubsub.credentials` are optional.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
    /// The Google Pub/Sub Project ID
    #[serde(rename = "pubsub.project_id")]
    pub project_id: String,

    /// Specifies the Pub/Sub topic to publish messages
    #[serde(rename = "pubsub.topic")]
    pub topic: String,

    /// The Google Pub/Sub endpoint URL
    #[serde(rename = "pubsub.endpoint")]
    pub endpoint: String,

    /// use the connector with a pubsub emulator
    /// <https://cloud.google.com/pubsub/docs/emulator>
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

    /// A JSON string containing the service account credentials for authorization,
    /// see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide.
    /// The provided account credential must have the
    /// `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles)
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,
}
101
102impl EnforceSecret for GooglePubSubConfig {
103    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
104        "pubsub.credentials",
105    };
106}
107
108impl GooglePubSubConfig {
109    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
110        serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
111            .map_err(|e| SinkError::Config(anyhow!(e)))
112    }
113}
114
/// Google Pub/Sub sink definition, built from a `SinkParam` via `TryFrom`.
///
/// It keeps everything needed to construct a `GooglePubSubSinkWriter` later on.
#[derive(Clone, Debug)]
pub struct GooglePubSubSink {
    /// Parsed `pubsub.*` options.
    pub config: GooglePubSubConfig,
    /// Taken from `param.sink_type.is_append_only()`; `validate` rejects the sink when false.
    is_append_only: bool,

    /// Schema returned by `SinkParam::schema`.
    schema: Schema,
    /// Taken from `SinkParam::downstream_pk`.
    pk_indices: Vec<usize>,
    /// The `FORMAT ... ENCODE ...` description; mandatory for this sink.
    format_desc: SinkFormatDesc,
    /// Forwarded to `SinkFormatterImpl::new` when a writer is created.
    db_name: String,
    /// Forwarded to `SinkFormatterImpl::new` when a writer is created.
    sink_from_name: String,
}
126
127impl EnforceSecret for GooglePubSubSink {
128    fn enforce_secret<'a>(
129        prop_iter: impl Iterator<Item = &'a str>,
130    ) -> crate::error::ConnectorResult<()> {
131        for prop in prop_iter {
132            GooglePubSubConfig::enforce_one(prop)?;
133        }
134        Ok(())
135    }
136}
137impl Sink for GooglePubSubSink {
138    type Coordinator = DummySinkCommitCoordinator;
139    type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
140
141    const SINK_NAME: &'static str = PUBSUB_SINK;
142
143    async fn validate(&self) -> Result<()> {
144        if !self.is_append_only {
145            return Err(SinkError::GooglePubSub(anyhow!(
146                "Google Pub/Sub sink only support append-only mode"
147            )));
148        }
149
150        let conf = &self.config;
151        if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
152            return Err(SinkError::GooglePubSub(anyhow!(
153                "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
154            )));
155        }
156
157        Ok(())
158    }
159
160    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
161        Ok(GooglePubSubSinkWriter::new(
162            self.config.clone(),
163            self.schema.clone(),
164            self.pk_indices.clone(),
165            &self.format_desc,
166            self.db_name.clone(),
167            self.sink_from_name.clone(),
168        )
169        .await?
170        .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
171    }
172}
173
174impl TryFrom<SinkParam> for GooglePubSubSink {
175    type Error = SinkError;
176
177    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
178        let schema = param.schema();
179        let config = GooglePubSubConfig::from_btreemap(param.properties)?;
180
181        let format_desc = param
182            .format_desc
183            .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
184        Ok(Self {
185            config,
186            is_append_only: param.sink_type.is_append_only(),
187
188            schema,
189            pk_indices: param.downstream_pk,
190            format_desc,
191            db_name: param.db_name,
192            sink_from_name: param.sink_from_name,
193        })
194    }
195}
196
/// Short-lived helper created for each `write_chunk` call: it buffers the formatted
/// messages of one chunk and publishes them as a single bulk request in `finish`.
struct GooglePubSubPayloadWriter<'w> {
    /// Publisher borrowed from the owning `GooglePubSubSinkWriter`.
    publisher: &'w mut Publisher,
    /// Messages collected by `write_one`, drained by `finish`.
    message_vec: Vec<PubsubMessage>,
    /// Handle used to register the delivery future of the published batch.
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}
202
203impl GooglePubSubSinkWriter {
204    pub async fn new(
205        config: GooglePubSubConfig,
206        schema: Schema,
207        pk_indices: Vec<usize>,
208        format_desc: &SinkFormatDesc,
209        db_name: String,
210        sink_from_name: String,
211    ) -> Result<Self> {
212        let environment = if let Some(ref cred) = config.credentials {
213            let mut auth_config = project::Config::default();
214            auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
215            auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
216            let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
217                SinkError::GooglePubSub(
218                    anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
219                )
220            })?;
221            let provider =
222                DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
223                    .await
224                    .map_err(|e| {
225                        SinkError::GooglePubSub(
226                            anyhow!(e).context(
227                                "Failed to create Google Cloud Pub/Sub token source provider",
228                            ),
229                        )
230                    })?;
231            Environment::GoogleCloud(Box::new(provider))
232        } else if let Some(emu_host) = config.emulator_host {
233            Environment::Emulator(emu_host)
234        } else {
235            return Err(SinkError::GooglePubSub(anyhow!(
236                "Missing emulator_host or credentials in Google Pub/Sub sink"
237            )));
238        };
239
240        let client_config = ClientConfig {
241            endpoint: config.endpoint,
242            project_id: Some(config.project_id),
243            environment,
244            ..Default::default()
245        };
246        let client = Client::new(client_config)
247            .await
248            .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
249
250        let topic = async {
251            let topic = client.topic(&config.topic);
252            if !topic.exists(None).await? {
253                topic.create(None, None).await?;
254            }
255            Ok(topic)
256        }
257        .await
258        .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
259
260        let formatter = SinkFormatterImpl::new(
261            format_desc,
262            schema,
263            pk_indices,
264            db_name,
265            sink_from_name,
266            topic.fully_qualified_name(),
267        )
268        .await?;
269
270        let publisher = topic.new_publisher(None);
271
272        Ok(Self {
273            formatter,
274            publisher,
275        })
276    }
277}
278
/// Writer that formats stream chunks and publishes them to a Google Pub/Sub topic.
pub struct GooglePubSubSinkWriter {
    /// Formatter built from the sink's `FORMAT ... ENCODE ...` description.
    formatter: SinkFormatterImpl,
    /// Publisher obtained from the target topic via `new_publisher`.
    publisher: Publisher,
}
283
284impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
285    type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
286
287    async fn write_chunk<'a>(
288        &'a mut self,
289        chunk: StreamChunk,
290        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
291    ) -> Result<()> {
292        let mut payload_writer = GooglePubSubPayloadWriter {
293            publisher: &mut self.publisher,
294            message_vec: Vec::with_capacity(chunk.cardinality()),
295            add_future,
296        };
297        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
298            payload_writer.write_chunk(chunk, formatter).await
299        })?;
300        payload_writer.finish().await
301    }
302}
303
304impl GooglePubSubPayloadWriter<'_> {
305    pub async fn finish(&mut self) -> Result<()> {
306        let message_vec = std::mem::take(&mut self.message_vec);
307        let awaiters = self.publisher.publish_bulk(message_vec).await;
308        self.add_future
309            .add_future_may_await(may_delivery_future(awaiters))
310            .await?;
311        Ok(())
312    }
313}
314
315impl FormattedSink for GooglePubSubPayloadWriter<'_> {
316    type K = String;
317    type V = Vec<u8>;
318
319    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
320        let ordering_key = k.unwrap_or_default();
321        match v {
322            Some(data) => {
323                let msg = PubsubMessage {
324                    data,
325                    ordering_key,
326                    ..Default::default()
327                };
328                self.message_vec.push(msg);
329                Ok(())
330            }
331            None => Err(SinkError::GooglePubSub(anyhow!(
332                "Google Pub/Sub sink error: missing value to publish"
333            ))),
334        }
335    }
336}