risingwave_connector/sink/
google_pubsub.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41use crate::enforce_secret::EnforceSecret;
42
/// Name of the Google Pub/Sub sink connector; exposed as `Sink::SINK_NAME`.
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Size limit (65536) passed to `into_log_sinker` when the log sinker is built
/// around the writer's delivery futures.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
45
46mod delivery_future {
47    use anyhow::Context;
48    use futures::future::try_join_all;
49    use futures::{FutureExt, TryFuture, TryFutureExt};
50    use google_cloud_pubsub::publisher::Awaiter;
51
52    use crate::sink::SinkError;
53
54    pub type GooglePubSubSinkDeliveryFuture =
55        impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
56
57    pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
58        try_join_all(awaiter.into_iter().map(|awaiter| {
59            awaiter.get().map(|result| {
60                result
61                    .context("Google Pub/Sub sink error")
62                    .map_err(SinkError::GooglePubSub)
63                    .map(|_| ())
64            })
65        }))
66        .map_ok(|_: Vec<()>| ())
67        .boxed()
68    }
69}
70
71use delivery_future::*;
72
/// Options of the Google Pub/Sub sink, deserialized from the `pubsub.*` sink properties.
///
/// At least one of `pubsub.emulator_host` and `pubsub.credentials` has to be set
/// (checked by `GooglePubSubSink::validate`); when both are present the writer
/// authenticates with the credentials.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
    /// The Google Pub/Sub Project ID
    #[serde(rename = "pubsub.project_id")]
    pub project_id: String,

    /// Specifies the Pub/Sub topic to publish messages
    #[serde(rename = "pubsub.topic")]
    pub topic: String,

    /// The Google Pub/Sub endpoint URL
    #[serde(rename = "pubsub.endpoint")]
    pub endpoint: String,

    /// use the connector with a pubsub emulator
    /// <https://cloud.google.com/pubsub/docs/emulator>
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

    /// A JSON string containing the service account credentials for authorization,
    /// see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide.
    /// The provided account credential must have the
    /// `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles)
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,
}
100
// Declares which sink properties are covered by secret enforcement.
impl EnforceSecret for GooglePubSubConfig {
    // `pubsub.credentials` holds the service-account JSON. Presumably listing it
    // here forces it to be supplied as a secret rather than as plain text —
    // confirm against the `EnforceSecret` trait.
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
        "pubsub.credentials",
    };
}
106
107impl GooglePubSubConfig {
108    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
109        serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
110            .map_err(|e| SinkError::Config(anyhow!(e)))
111    }
112}
113
/// Google Pub/Sub sink, constructed from a `SinkParam` through `TryFrom`.
#[derive(Clone, Debug)]
pub struct GooglePubSubSink {
    /// Connection options parsed from the sink properties.
    pub config: GooglePubSubConfig,
    /// Whether the sink type is append-only; `validate` rejects the sink otherwise.
    is_append_only: bool,

    // The fields below are cloned into `GooglePubSubSinkWriter::new`, which
    // forwards them to the formatter.
    /// Schema taken from `SinkParam::schema()`.
    schema: Schema,
    /// Taken from `SinkParam::downstream_pk`.
    pk_indices: Vec<usize>,
    /// The `FORMAT ... ENCODE ...` description; required when the sink is built.
    format_desc: SinkFormatDesc,
    /// Taken from `SinkParam::db_name`.
    db_name: String,
    /// Taken from `SinkParam::sink_from_name`.
    sink_from_name: String,
}
125
126impl EnforceSecret for GooglePubSubSink {
127    fn enforce_secret<'a>(
128        prop_iter: impl Iterator<Item = &'a str>,
129    ) -> crate::error::ConnectorResult<()> {
130        for prop in prop_iter {
131            GooglePubSubConfig::enforce_one(prop)?;
132        }
133        Ok(())
134    }
135}
136impl Sink for GooglePubSubSink {
137    type Coordinator = DummySinkCommitCoordinator;
138    type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
139
140    const SINK_NAME: &'static str = PUBSUB_SINK;
141
142    async fn validate(&self) -> Result<()> {
143        if !self.is_append_only {
144            return Err(SinkError::GooglePubSub(anyhow!(
145                "Google Pub/Sub sink only support append-only mode"
146            )));
147        }
148
149        let conf = &self.config;
150        if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
151            return Err(SinkError::GooglePubSub(anyhow!(
152                "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
153            )));
154        }
155
156        Ok(())
157    }
158
159    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
160        Ok(GooglePubSubSinkWriter::new(
161            self.config.clone(),
162            self.schema.clone(),
163            self.pk_indices.clone(),
164            &self.format_desc,
165            self.db_name.clone(),
166            self.sink_from_name.clone(),
167        )
168        .await?
169        .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
170    }
171}
172
173impl TryFrom<SinkParam> for GooglePubSubSink {
174    type Error = SinkError;
175
176    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177        let schema = param.schema();
178        let config = GooglePubSubConfig::from_btreemap(param.properties)?;
179
180        let format_desc = param
181            .format_desc
182            .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
183        Ok(Self {
184            config,
185            is_append_only: param.sink_type.is_append_only(),
186
187            schema,
188            pk_indices: param.downstream_pk,
189            format_desc,
190            db_name: param.db_name,
191            sink_from_name: param.sink_from_name,
192        })
193    }
194}
195
/// Per-chunk helper that buffers formatted messages and publishes them as one batch.
struct GooglePubSubPayloadWriter<'w> {
    /// Publisher borrowed from the sink writer for the duration of one chunk.
    publisher: &'w mut Publisher,
    /// Messages collected by `write_one` until `finish` publishes them.
    message_vec: Vec<PubsubMessage>,
    /// Handle through which `finish` registers the batch's delivery future.
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}
201
202impl GooglePubSubSinkWriter {
203    pub async fn new(
204        config: GooglePubSubConfig,
205        schema: Schema,
206        pk_indices: Vec<usize>,
207        format_desc: &SinkFormatDesc,
208        db_name: String,
209        sink_from_name: String,
210    ) -> Result<Self> {
211        let environment = if let Some(ref cred) = config.credentials {
212            let mut auth_config = project::Config::default();
213            auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
214            auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
215            let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
216                SinkError::GooglePubSub(
217                    anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
218                )
219            })?;
220            let provider =
221                DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
222                    .await
223                    .map_err(|e| {
224                        SinkError::GooglePubSub(
225                            anyhow!(e).context(
226                                "Failed to create Google Cloud Pub/Sub token source provider",
227                            ),
228                        )
229                    })?;
230            Environment::GoogleCloud(Box::new(provider))
231        } else if let Some(emu_host) = config.emulator_host {
232            Environment::Emulator(emu_host)
233        } else {
234            return Err(SinkError::GooglePubSub(anyhow!(
235                "Missing emulator_host or credentials in Google Pub/Sub sink"
236            )));
237        };
238
239        let client_config = ClientConfig {
240            endpoint: config.endpoint,
241            project_id: Some(config.project_id),
242            environment,
243            ..Default::default()
244        };
245        let client = Client::new(client_config)
246            .await
247            .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
248
249        let topic = async {
250            let topic = client.topic(&config.topic);
251            if !topic.exists(None).await? {
252                topic.create(None, None).await?;
253            }
254            Ok(topic)
255        }
256        .await
257        .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
258
259        let formatter = SinkFormatterImpl::new(
260            format_desc,
261            schema,
262            pk_indices,
263            db_name,
264            sink_from_name,
265            topic.fully_qualified_name(),
266        )
267        .await?;
268
269        let publisher = topic.new_publisher(None);
270
271        Ok(Self {
272            formatter,
273            publisher,
274        })
275    }
276}
277
/// Sink writer that formats stream chunks and publishes them to a Pub/Sub topic.
pub struct GooglePubSubSinkWriter {
    /// Formatter built from the sink's format description.
    formatter: SinkFormatterImpl,
    /// Publisher created from the configured topic.
    publisher: Publisher,
}
282
283impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
284    type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
285
286    async fn write_chunk<'a>(
287        &'a mut self,
288        chunk: StreamChunk,
289        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
290    ) -> Result<()> {
291        let mut payload_writer = GooglePubSubPayloadWriter {
292            publisher: &mut self.publisher,
293            message_vec: Vec::with_capacity(chunk.cardinality()),
294            add_future,
295        };
296        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
297            payload_writer.write_chunk(chunk, formatter).await
298        })?;
299        payload_writer.finish().await
300    }
301}
302
303impl GooglePubSubPayloadWriter<'_> {
304    pub async fn finish(&mut self) -> Result<()> {
305        let message_vec = std::mem::take(&mut self.message_vec);
306        let awaiters = self.publisher.publish_bulk(message_vec).await;
307        self.add_future
308            .add_future_may_await(may_delivery_future(awaiters))
309            .await?;
310        Ok(())
311    }
312}
313
314impl FormattedSink for GooglePubSubPayloadWriter<'_> {
315    type K = String;
316    type V = Vec<u8>;
317
318    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
319        let ordering_key = k.unwrap_or_default();
320        match v {
321            Some(data) => {
322                let msg = PubsubMessage {
323                    data,
324                    ordering_key,
325                    ..Default::default()
326                };
327                self.message_vec.push(msg);
328                Ok(())
329            }
330            None => Err(SinkError::GooglePubSub(anyhow!(
331                "Google Pub/Sub sink error: missing value to publish"
332            ))),
333        }
334    }
335}