risingwave_connector/sink/
google_pubsub.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41use crate::enforce_secret::EnforceSecret;
42
/// Connector name under which the Google Pub/Sub sink is registered
/// (used as `Sink::SINK_NAME`).
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Size handed to `into_log_sinker` when the log sinker is built.
/// NOTE(review): presumably the cap on buffered in-flight delivery futures — confirm.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
45
46mod delivery_future {
47    use anyhow::Context;
48    use futures::future::try_join_all;
49    use futures::{FutureExt, TryFuture, TryFutureExt};
50    use google_cloud_pubsub::publisher::Awaiter;
51
52    use crate::sink::SinkError;
53
54    pub type GooglePubSubSinkDeliveryFuture =
55        impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
56
57    #[define_opaque(GooglePubSubSinkDeliveryFuture)]
58    pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
59        try_join_all(awaiter.into_iter().map(|awaiter| {
60            awaiter.get().map(|result| {
61                result
62                    .context("Google Pub/Sub sink error")
63                    .map_err(SinkError::GooglePubSub)
64                    .map(|_| ())
65            })
66        }))
67        .map_ok(|_: Vec<()>| ())
68        .boxed()
69    }
70}
71
72use delivery_future::*;
73
/// User-facing options of the Google Pub/Sub sink, deserialized from the
/// `pubsub.*` entries of the sink's property map (see `from_btreemap`).
///
/// `GooglePubSubSink::validate` rejects a configuration that sets neither
/// `pubsub.emulator_host` nor `pubsub.credentials`; when both are set, the
/// writer uses the credentials.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
    /// The Google Pub/Sub Project ID
    #[serde(rename = "pubsub.project_id")]
    pub project_id: String,

    /// Specifies the Pub/Sub topic to publish messages
    #[serde(rename = "pubsub.topic")]
    pub topic: String,

    /// The Google Pub/Sub endpoint URL
    #[serde(rename = "pubsub.endpoint")]
    pub endpoint: String,

    /// use the connector with a pubsub emulator
    /// <https://cloud.google.com/pubsub/docs/emulator>
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

    /// A JSON string containing the service account credentials for authorization,
    /// see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide.
    /// The provided account credential must have the
    /// `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles)
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,
}
101
// Declares which option names are subject to secret enforcement; the check
// itself is driven through `enforce_one` from `GooglePubSubSink::enforce_secret`.
// NOTE(review): presumably a listed property must be supplied as a secret
// reference rather than plaintext — confirm against the `EnforceSecret` trait.
impl EnforceSecret for GooglePubSubConfig {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
        "pubsub.credentials",
    };
}
107
108impl GooglePubSubConfig {
109    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
110        serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
111            .map_err(|e| SinkError::Config(anyhow!(e)))
112    }
113}
114
/// Google Pub/Sub sink definition, built from a `SinkParam` via `TryFrom`.
///
/// It holds everything needed to later construct a `GooglePubSubSinkWriter`
/// in `new_log_sinker`.
#[derive(Clone, Debug)]
pub struct GooglePubSubSink {
    /// Parsed `pubsub.*` options.
    pub config: GooglePubSubConfig,
    /// Taken from `param.sink_type.is_append_only()`; `validate` fails when false.
    is_append_only: bool,

    /// Schema returned by `SinkParam::schema`, forwarded to the formatter.
    schema: Schema,
    /// Result of `SinkParam::downstream_pk_or_empty`, forwarded to the formatter.
    pk_indices: Vec<usize>,
    /// The sink's format description; construction fails if it is absent.
    format_desc: SinkFormatDesc,
    /// `SinkParam::db_name`, forwarded to the formatter.
    db_name: String,
    /// `SinkParam::sink_from_name`, forwarded to the formatter.
    sink_from_name: String,
}
126
127impl EnforceSecret for GooglePubSubSink {
128    fn enforce_secret<'a>(
129        prop_iter: impl Iterator<Item = &'a str>,
130    ) -> crate::error::ConnectorResult<()> {
131        for prop in prop_iter {
132            GooglePubSubConfig::enforce_one(prop)?;
133        }
134        Ok(())
135    }
136}
137impl Sink for GooglePubSubSink {
138    type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
139
140    const SINK_NAME: &'static str = PUBSUB_SINK;
141
142    async fn validate(&self) -> Result<()> {
143        if !self.is_append_only {
144            return Err(SinkError::GooglePubSub(anyhow!(
145                "Google Pub/Sub sink only support append-only mode"
146            )));
147        }
148
149        let conf = &self.config;
150        if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
151            return Err(SinkError::GooglePubSub(anyhow!(
152                "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
153            )));
154        }
155
156        Ok(())
157    }
158
159    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
160        Ok(GooglePubSubSinkWriter::new(
161            self.config.clone(),
162            self.schema.clone(),
163            self.pk_indices.clone(),
164            &self.format_desc,
165            self.db_name.clone(),
166            self.sink_from_name.clone(),
167        )
168        .await?
169        .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
170    }
171}
172
173impl TryFrom<SinkParam> for GooglePubSubSink {
174    type Error = SinkError;
175
176    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177        let schema = param.schema();
178        let pk_indices = param.downstream_pk_or_empty();
179        let config = GooglePubSubConfig::from_btreemap(param.properties)?;
180        let format_desc = param
181            .format_desc
182            .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
183        Ok(Self {
184            config,
185            is_append_only: param.sink_type.is_append_only(),
186            schema,
187            pk_indices,
188            format_desc,
189            db_name: param.db_name,
190            sink_from_name: param.sink_from_name,
191        })
192    }
193}
194
/// Per-chunk helper created in `GooglePubSubSinkWriter::write_chunk`.
///
/// `write_one` buffers formatted rows into `message_vec`; `finish` then
/// publishes the buffer in bulk and registers the resulting delivery future.
struct GooglePubSubPayloadWriter<'w> {
    /// Publisher borrowed from the owning `GooglePubSubSinkWriter`.
    publisher: &'w mut Publisher,
    /// Messages collected for the current chunk, not yet published.
    message_vec: Vec<PubsubMessage>,
    /// Handle used by `finish` to register the batch's delivery future.
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}
200
201impl GooglePubSubSinkWriter {
202    pub async fn new(
203        config: GooglePubSubConfig,
204        schema: Schema,
205        pk_indices: Vec<usize>,
206        format_desc: &SinkFormatDesc,
207        db_name: String,
208        sink_from_name: String,
209    ) -> Result<Self> {
210        let environment = if let Some(ref cred) = config.credentials {
211            let mut auth_config = project::Config::default();
212            auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
213            auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
214            let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
215                SinkError::GooglePubSub(
216                    anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
217                )
218            })?;
219            let provider =
220                DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
221                    .await
222                    .map_err(|e| {
223                        SinkError::GooglePubSub(
224                            anyhow!(e).context(
225                                "Failed to create Google Cloud Pub/Sub token source provider",
226                            ),
227                        )
228                    })?;
229            Environment::GoogleCloud(Box::new(provider))
230        } else if let Some(emu_host) = config.emulator_host {
231            Environment::Emulator(emu_host)
232        } else {
233            return Err(SinkError::GooglePubSub(anyhow!(
234                "Missing emulator_host or credentials in Google Pub/Sub sink"
235            )));
236        };
237
238        let client_config = ClientConfig {
239            endpoint: config.endpoint,
240            project_id: Some(config.project_id),
241            environment,
242            ..Default::default()
243        };
244        let client = Client::new(client_config)
245            .await
246            .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
247
248        let topic = async {
249            let topic = client.topic(&config.topic);
250            if !topic.exists(None).await? {
251                topic.create(None, None).await?;
252            }
253            Ok(topic)
254        }
255        .await
256        .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
257
258        let formatter = SinkFormatterImpl::new(
259            format_desc,
260            schema,
261            pk_indices,
262            db_name,
263            sink_from_name,
264            topic.fully_qualified_name(),
265        )
266        .await?;
267
268        let publisher = topic.new_publisher(None);
269
270        Ok(Self {
271            formatter,
272            publisher,
273        })
274    }
275}
276
/// Writer that formats stream chunks and publishes them to one Pub/Sub topic.
pub struct GooglePubSubSinkWriter {
    /// Turns each chunk into key/value payloads; built in `new` from the
    /// sink's format description.
    formatter: SinkFormatterImpl,
    /// Publisher obtained in `new` from the configured topic.
    publisher: Publisher,
}
281
282impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
283    type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
284
285    async fn write_chunk<'a>(
286        &'a mut self,
287        chunk: StreamChunk,
288        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
289    ) -> Result<()> {
290        let mut payload_writer = GooglePubSubPayloadWriter {
291            publisher: &mut self.publisher,
292            message_vec: Vec::with_capacity(chunk.cardinality()),
293            add_future,
294        };
295        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
296            payload_writer.write_chunk(chunk, formatter).await
297        })?;
298        payload_writer.finish().await
299    }
300}
301
302impl GooglePubSubPayloadWriter<'_> {
303    pub async fn finish(&mut self) -> Result<()> {
304        let message_vec = std::mem::take(&mut self.message_vec);
305        let awaiters = self.publisher.publish_bulk(message_vec).await;
306        self.add_future
307            .add_future_may_await(may_delivery_future(awaiters))
308            .await?;
309        Ok(())
310    }
311}
312
313impl FormattedSink for GooglePubSubPayloadWriter<'_> {
314    type K = String;
315    type V = Vec<u8>;
316
317    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
318        let ordering_key = k.unwrap_or_default();
319        match v {
320            Some(data) => {
321                let msg = PubsubMessage {
322                    data,
323                    ordering_key,
324                    ..Default::default()
325                };
326                self.message_vec.push(msg);
327                Ok(())
328            }
329            None => Err(SinkError::GooglePubSub(anyhow!(
330                "Google Pub/Sub sink error: missing value to publish"
331            ))),
332        }
333    }
334}