risingwave_connector/sink/
google_pubsub.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41
/// Connector name of this sink; used as `GooglePubSubSink::SINK_NAME`.
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Size handed to `into_log_sinker` when the writer is turned into a log sinker.
/// Each delivery future registered by the payload writer covers the messages of one chunk.
/// NOTE(review): presumably the maximum number of in-flight delivery futures — confirm
/// against `AsyncTruncateSinkWriterExt::into_log_sinker`.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
44
45mod delivery_future {
46    use anyhow::Context;
47    use futures::future::try_join_all;
48    use futures::{FutureExt, TryFuture, TryFutureExt};
49    use google_cloud_pubsub::publisher::Awaiter;
50
51    use crate::sink::SinkError;
52
53    pub type GooglePubSubSinkDeliveryFuture =
54        impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
55
56    pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
57        try_join_all(awaiter.into_iter().map(|awaiter| {
58            awaiter.get().map(|result| {
59                result
60                    .context("Google Pub/Sub sink error")
61                    .map_err(SinkError::GooglePubSub)
62                    .map(|_| ())
63            })
64        }))
65        .map_ok(|_: Vec<()>| ())
66        .boxed()
67    }
68}
69
70use delivery_future::*;
71
/// Options of the Google Pub/Sub sink, deserialized from the sink properties by
/// `GooglePubSubConfig::from_btreemap`. Each field is read from the `pubsub.*` key named in
/// its `serde(rename)` attribute.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
    /// The Google Pub/Sub project ID.
    #[serde(rename = "pubsub.project_id")]
    pub project_id: String,

    /// The Pub/Sub topic that messages are published to.
    /// The sink writer creates the topic if it does not exist yet.
    #[serde(rename = "pubsub.topic")]
    pub topic: String,

    /// The Google Pub/Sub endpoint URL.
    #[serde(rename = "pubsub.endpoint")]
    pub endpoint: String,

    /// Host of a Pub/Sub emulator to connect to instead of Google Cloud,
    /// see <https://cloud.google.com/pubsub/docs/emulator>.
    /// Only used when `pubsub.credentials` is not set.
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

    /// A JSON string containing the service account credentials for authorization,
    /// see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide.
    /// The provided account credential must have the
    /// `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
    /// Takes precedence over `pubsub.emulator_host` when both are set.
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,
}
99
100impl GooglePubSubConfig {
101    fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
102        serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
103            .map_err(|e| SinkError::Config(anyhow!(e)))
104    }
105}
106
/// Sink that publishes formatted rows to a Google Cloud Pub/Sub topic.
///
/// Built from a [`SinkParam`] via `TryFrom`. A [`GooglePubSubSinkWriter`] is created on each
/// `new_log_sinker` call.
#[derive(Clone, Debug)]
pub struct GooglePubSubSink {
    /// Connection and topic options parsed from the sink properties.
    pub config: GooglePubSubConfig,
    /// Whether the sink was declared append-only; `validate` rejects the sink otherwise.
    is_append_only: bool,

    // The remaining fields are forwarded to `GooglePubSubSinkWriter::new`, which uses them to
    // build the row formatter.
    schema: Schema,
    /// Downstream primary-key column indices.
    pk_indices: Vec<usize>,
    /// The `FORMAT ... ENCODE ...` description of the sink.
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}
118
119impl Sink for GooglePubSubSink {
120    type Coordinator = DummySinkCommitCoordinator;
121    type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
122
123    const SINK_NAME: &'static str = PUBSUB_SINK;
124
125    async fn validate(&self) -> Result<()> {
126        if !self.is_append_only {
127            return Err(SinkError::GooglePubSub(anyhow!(
128                "Google Pub/Sub sink only support append-only mode"
129            )));
130        }
131
132        let conf = &self.config;
133        if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
134            return Err(SinkError::GooglePubSub(anyhow!(
135                "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
136            )));
137        }
138
139        Ok(())
140    }
141
142    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
143        Ok(GooglePubSubSinkWriter::new(
144            self.config.clone(),
145            self.schema.clone(),
146            self.pk_indices.clone(),
147            &self.format_desc,
148            self.db_name.clone(),
149            self.sink_from_name.clone(),
150        )
151        .await?
152        .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
153    }
154}
155
156impl TryFrom<SinkParam> for GooglePubSubSink {
157    type Error = SinkError;
158
159    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
160        let schema = param.schema();
161        let config = GooglePubSubConfig::from_btreemap(param.properties)?;
162
163        let format_desc = param
164            .format_desc
165            .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
166        Ok(Self {
167            config,
168            is_append_only: param.sink_type.is_append_only(),
169
170            schema,
171            pk_indices: param.downstream_pk,
172            format_desc,
173            db_name: param.db_name,
174            sink_from_name: param.sink_from_name,
175        })
176    }
177}
178
/// Per-chunk helper: collects the encoded messages of one stream chunk and, in `finish`,
/// publishes them in bulk and registers the resulting delivery future.
struct GooglePubSubPayloadWriter<'w> {
    /// Publisher borrowed from the owning `GooglePubSubSinkWriter`.
    publisher: &'w mut Publisher,
    /// Messages accumulated by `write_one`, drained by `finish`.
    message_vec: Vec<PubsubMessage>,
    /// Handle used by `finish` to hand the batch's delivery future to the log sinker.
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}
184
185impl GooglePubSubSinkWriter {
186    pub async fn new(
187        config: GooglePubSubConfig,
188        schema: Schema,
189        pk_indices: Vec<usize>,
190        format_desc: &SinkFormatDesc,
191        db_name: String,
192        sink_from_name: String,
193    ) -> Result<Self> {
194        let environment = if let Some(ref cred) = config.credentials {
195            let mut auth_config = project::Config::default();
196            auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
197            auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
198            let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
199                SinkError::GooglePubSub(
200                    anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
201                )
202            })?;
203            let provider =
204                DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
205                    .await
206                    .map_err(|e| {
207                        SinkError::GooglePubSub(
208                            anyhow!(e).context(
209                                "Failed to create Google Cloud Pub/Sub token source provider",
210                            ),
211                        )
212                    })?;
213            Environment::GoogleCloud(Box::new(provider))
214        } else if let Some(emu_host) = config.emulator_host {
215            Environment::Emulator(emu_host)
216        } else {
217            return Err(SinkError::GooglePubSub(anyhow!(
218                "Missing emulator_host or credentials in Google Pub/Sub sink"
219            )));
220        };
221
222        let client_config = ClientConfig {
223            endpoint: config.endpoint,
224            project_id: Some(config.project_id),
225            environment,
226            ..Default::default()
227        };
228        let client = Client::new(client_config)
229            .await
230            .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
231
232        let topic = async {
233            let topic = client.topic(&config.topic);
234            if !topic.exists(None).await? {
235                topic.create(None, None).await?;
236            }
237            Ok(topic)
238        }
239        .await
240        .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
241
242        let formatter = SinkFormatterImpl::new(
243            format_desc,
244            schema,
245            pk_indices,
246            db_name,
247            sink_from_name,
248            topic.fully_qualified_name(),
249        )
250        .await?;
251
252        let publisher = topic.new_publisher(None);
253
254        Ok(Self {
255            formatter,
256            publisher,
257        })
258    }
259}
260
/// Writer that encodes stream chunks with `formatter` and publishes them through `publisher`.
pub struct GooglePubSubSinkWriter {
    /// Encodes each row into an optional string key (used as the message ordering key) and a
    /// byte payload.
    formatter: SinkFormatterImpl,
    /// Publisher bound to the configured topic.
    publisher: Publisher,
}
265
266impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
267    type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
268
269    async fn write_chunk<'a>(
270        &'a mut self,
271        chunk: StreamChunk,
272        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
273    ) -> Result<()> {
274        let mut payload_writer = GooglePubSubPayloadWriter {
275            publisher: &mut self.publisher,
276            message_vec: Vec::with_capacity(chunk.cardinality()),
277            add_future,
278        };
279        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
280            payload_writer.write_chunk(chunk, formatter).await
281        })?;
282        payload_writer.finish().await
283    }
284}
285
286impl GooglePubSubPayloadWriter<'_> {
287    pub async fn finish(&mut self) -> Result<()> {
288        let message_vec = std::mem::take(&mut self.message_vec);
289        let awaiters = self.publisher.publish_bulk(message_vec).await;
290        self.add_future
291            .add_future_may_await(may_delivery_future(awaiters))
292            .await?;
293        Ok(())
294    }
295}
296
297impl FormattedSink for GooglePubSubPayloadWriter<'_> {
298    type K = String;
299    type V = Vec<u8>;
300
301    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
302        let ordering_key = k.unwrap_or_default();
303        match v {
304            Some(data) => {
305                let msg = PubsubMessage {
306                    data,
307                    ordering_key,
308                    ..Default::default()
309                };
310                self.message_vec.push(msg);
311                Ok(())
312            }
313            None => Err(SinkError::GooglePubSub(anyhow!(
314                "Google Pub/Sub sink error: missing value to publish"
315            ))),
316        }
317    }
318}