risingwave_connector/sink/
google_pubsub.rs1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41use crate::enforce_secret::EnforceSecret;
42
/// Connector name of this sink; also exposed as `Sink::SINK_NAME` for [`GooglePubSubSink`].
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Size handed to `into_log_sinker` when the log sinker is built (65536);
/// presumably the cap on buffered in-flight delivery futures — confirm against the log-store API.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
45
46mod delivery_future {
47 use anyhow::Context;
48 use futures::future::try_join_all;
49 use futures::{FutureExt, TryFuture, TryFutureExt};
50 use google_cloud_pubsub::publisher::Awaiter;
51
52 use crate::sink::SinkError;
53
54 pub type GooglePubSubSinkDeliveryFuture =
55 impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
56
57 #[define_opaque(GooglePubSubSinkDeliveryFuture)]
58 pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
59 try_join_all(awaiter.into_iter().map(|awaiter| {
60 awaiter.get().map(|result| {
61 result
62 .context("Google Pub/Sub sink error")
63 .map_err(SinkError::GooglePubSub)
64 .map(|_| ())
65 })
66 }))
67 .map_ok(|_: Vec<()>| ())
68 .boxed()
69 }
70}
71
72use delivery_future::*;
73
74#[serde_as]
75#[derive(Clone, Debug, Deserialize, WithOptions)]
76pub struct GooglePubSubConfig {
77 #[serde(rename = "pubsub.project_id")]
79 pub project_id: String,
80
81 #[serde(rename = "pubsub.topic")]
83 pub topic: String,
84
85 #[serde(rename = "pubsub.endpoint")]
87 pub endpoint: String,
88
89 #[serde(rename = "pubsub.emulator_host")]
92 pub emulator_host: Option<String>,
93
94 #[serde(rename = "pubsub.credentials")]
99 pub credentials: Option<String>,
100}
101
102impl EnforceSecret for GooglePubSubConfig {
103 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
104 "pubsub.credentials",
105 };
106}
107
108impl GooglePubSubConfig {
109 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
110 serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
111 .map_err(|e| SinkError::Config(anyhow!(e)))
112 }
113}
114
115#[derive(Clone, Debug)]
116pub struct GooglePubSubSink {
117 pub config: GooglePubSubConfig,
118 is_append_only: bool,
119
120 schema: Schema,
121 pk_indices: Vec<usize>,
122 format_desc: SinkFormatDesc,
123 db_name: String,
124 sink_from_name: String,
125}
126
127impl EnforceSecret for GooglePubSubSink {
128 fn enforce_secret<'a>(
129 prop_iter: impl Iterator<Item = &'a str>,
130 ) -> crate::error::ConnectorResult<()> {
131 for prop in prop_iter {
132 GooglePubSubConfig::enforce_one(prop)?;
133 }
134 Ok(())
135 }
136}
137impl Sink for GooglePubSubSink {
138 type Coordinator = DummySinkCommitCoordinator;
139 type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
140
141 const SINK_NAME: &'static str = PUBSUB_SINK;
142
143 async fn validate(&self) -> Result<()> {
144 if !self.is_append_only {
145 return Err(SinkError::GooglePubSub(anyhow!(
146 "Google Pub/Sub sink only support append-only mode"
147 )));
148 }
149
150 let conf = &self.config;
151 if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
152 return Err(SinkError::GooglePubSub(anyhow!(
153 "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
154 )));
155 }
156
157 Ok(())
158 }
159
160 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
161 Ok(GooglePubSubSinkWriter::new(
162 self.config.clone(),
163 self.schema.clone(),
164 self.pk_indices.clone(),
165 &self.format_desc,
166 self.db_name.clone(),
167 self.sink_from_name.clone(),
168 )
169 .await?
170 .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
171 }
172}
173
174impl TryFrom<SinkParam> for GooglePubSubSink {
175 type Error = SinkError;
176
177 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
178 let schema = param.schema();
179 let config = GooglePubSubConfig::from_btreemap(param.properties)?;
180
181 let format_desc = param
182 .format_desc
183 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
184 Ok(Self {
185 config,
186 is_append_only: param.sink_type.is_append_only(),
187
188 schema,
189 pk_indices: param.downstream_pk,
190 format_desc,
191 db_name: param.db_name,
192 sink_from_name: param.sink_from_name,
193 })
194 }
195}
196
197struct GooglePubSubPayloadWriter<'w> {
198 publisher: &'w mut Publisher,
199 message_vec: Vec<PubsubMessage>,
200 add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
201}
202
203impl GooglePubSubSinkWriter {
204 pub async fn new(
205 config: GooglePubSubConfig,
206 schema: Schema,
207 pk_indices: Vec<usize>,
208 format_desc: &SinkFormatDesc,
209 db_name: String,
210 sink_from_name: String,
211 ) -> Result<Self> {
212 let environment = if let Some(ref cred) = config.credentials {
213 let mut auth_config = project::Config::default();
214 auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
215 auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
216 let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
217 SinkError::GooglePubSub(
218 anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
219 )
220 })?;
221 let provider =
222 DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
223 .await
224 .map_err(|e| {
225 SinkError::GooglePubSub(
226 anyhow!(e).context(
227 "Failed to create Google Cloud Pub/Sub token source provider",
228 ),
229 )
230 })?;
231 Environment::GoogleCloud(Box::new(provider))
232 } else if let Some(emu_host) = config.emulator_host {
233 Environment::Emulator(emu_host)
234 } else {
235 return Err(SinkError::GooglePubSub(anyhow!(
236 "Missing emulator_host or credentials in Google Pub/Sub sink"
237 )));
238 };
239
240 let client_config = ClientConfig {
241 endpoint: config.endpoint,
242 project_id: Some(config.project_id),
243 environment,
244 ..Default::default()
245 };
246 let client = Client::new(client_config)
247 .await
248 .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
249
250 let topic = async {
251 let topic = client.topic(&config.topic);
252 if !topic.exists(None).await? {
253 topic.create(None, None).await?;
254 }
255 Ok(topic)
256 }
257 .await
258 .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
259
260 let formatter = SinkFormatterImpl::new(
261 format_desc,
262 schema,
263 pk_indices,
264 db_name,
265 sink_from_name,
266 topic.fully_qualified_name(),
267 )
268 .await?;
269
270 let publisher = topic.new_publisher(None);
271
272 Ok(Self {
273 formatter,
274 publisher,
275 })
276 }
277}
278
279pub struct GooglePubSubSinkWriter {
280 formatter: SinkFormatterImpl,
281 publisher: Publisher,
282}
283
284impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
285 type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
286
287 async fn write_chunk<'a>(
288 &'a mut self,
289 chunk: StreamChunk,
290 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
291 ) -> Result<()> {
292 let mut payload_writer = GooglePubSubPayloadWriter {
293 publisher: &mut self.publisher,
294 message_vec: Vec::with_capacity(chunk.cardinality()),
295 add_future,
296 };
297 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
298 payload_writer.write_chunk(chunk, formatter).await
299 })?;
300 payload_writer.finish().await
301 }
302}
303
304impl GooglePubSubPayloadWriter<'_> {
305 pub async fn finish(&mut self) -> Result<()> {
306 let message_vec = std::mem::take(&mut self.message_vec);
307 let awaiters = self.publisher.publish_bulk(message_vec).await;
308 self.add_future
309 .add_future_may_await(may_delivery_future(awaiters))
310 .await?;
311 Ok(())
312 }
313}
314
315impl FormattedSink for GooglePubSubPayloadWriter<'_> {
316 type K = String;
317 type V = Vec<u8>;
318
319 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
320 let ordering_key = k.unwrap_or_default();
321 match v {
322 Some(data) => {
323 let msg = PubsubMessage {
324 data,
325 ordering_key,
326 ..Default::default()
327 };
328 self.message_vec.push(msg);
329 Ok(())
330 }
331 None => Err(SinkError::GooglePubSub(anyhow!(
332 "Google Pub/Sub sink error: missing value to publish"
333 ))),
334 }
335 }
336}