risingwave_connector/sink/
google_pubsub.rs1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41use crate::enforce_secret::EnforceSecret;
42
/// Connector name under which this sink is registered (exposed as `Sink::SINK_NAME`).
pub const PUBSUB_SINK: &str = "google_pubsub";

/// Size limit handed to `into_log_sinker` when the log sinker is built (65536).
// NOTE(review): presumably the maximum number of in-flight delivery futures — confirm
// against `AsyncTruncateSinkWriterExt::into_log_sinker`.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
45
46mod delivery_future {
47 use anyhow::Context;
48 use futures::future::try_join_all;
49 use futures::{FutureExt, TryFuture, TryFutureExt};
50 use google_cloud_pubsub::publisher::Awaiter;
51
52 use crate::sink::SinkError;
53
54 pub type GooglePubSubSinkDeliveryFuture =
55 impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
56
57 #[define_opaque(GooglePubSubSinkDeliveryFuture)]
58 pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
59 try_join_all(awaiter.into_iter().map(|awaiter| {
60 awaiter.get().map(|result| {
61 result
62 .context("Google Pub/Sub sink error")
63 .map_err(SinkError::GooglePubSub)
64 .map(|_| ())
65 })
66 }))
67 .map_ok(|_: Vec<()>| ())
68 .boxed()
69 }
70}
71
72use delivery_future::*;
73
74#[serde_as]
75#[derive(Clone, Debug, Deserialize, WithOptions)]
76pub struct GooglePubSubConfig {
77 #[serde(rename = "pubsub.project_id")]
79 pub project_id: String,
80
81 #[serde(rename = "pubsub.topic")]
83 pub topic: String,
84
85 #[serde(rename = "pubsub.endpoint")]
87 pub endpoint: String,
88
89 #[serde(rename = "pubsub.emulator_host")]
92 pub emulator_host: Option<String>,
93
94 #[serde(rename = "pubsub.credentials")]
99 pub credentials: Option<String>,
100}
101
102impl EnforceSecret for GooglePubSubConfig {
103 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
104 "pubsub.credentials",
105 };
106}
107
108impl GooglePubSubConfig {
109 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
110 serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
111 .map_err(|e| SinkError::Config(anyhow!(e)))
112 }
113}
114
115#[derive(Clone, Debug)]
116pub struct GooglePubSubSink {
117 pub config: GooglePubSubConfig,
118 is_append_only: bool,
119
120 schema: Schema,
121 pk_indices: Vec<usize>,
122 format_desc: SinkFormatDesc,
123 db_name: String,
124 sink_from_name: String,
125}
126
127impl EnforceSecret for GooglePubSubSink {
128 fn enforce_secret<'a>(
129 prop_iter: impl Iterator<Item = &'a str>,
130 ) -> crate::error::ConnectorResult<()> {
131 for prop in prop_iter {
132 GooglePubSubConfig::enforce_one(prop)?;
133 }
134 Ok(())
135 }
136}
137impl Sink for GooglePubSubSink {
138 type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
139
140 const SINK_NAME: &'static str = PUBSUB_SINK;
141
142 async fn validate(&self) -> Result<()> {
143 if !self.is_append_only {
144 return Err(SinkError::GooglePubSub(anyhow!(
145 "Google Pub/Sub sink only support append-only mode"
146 )));
147 }
148
149 let conf = &self.config;
150 if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
151 return Err(SinkError::GooglePubSub(anyhow!(
152 "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
153 )));
154 }
155
156 Ok(())
157 }
158
159 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
160 Ok(GooglePubSubSinkWriter::new(
161 self.config.clone(),
162 self.schema.clone(),
163 self.pk_indices.clone(),
164 &self.format_desc,
165 self.db_name.clone(),
166 self.sink_from_name.clone(),
167 )
168 .await?
169 .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
170 }
171}
172
173impl TryFrom<SinkParam> for GooglePubSubSink {
174 type Error = SinkError;
175
176 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177 let schema = param.schema();
178 let pk_indices = param.downstream_pk_or_empty();
179 let config = GooglePubSubConfig::from_btreemap(param.properties)?;
180 let format_desc = param
181 .format_desc
182 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
183 Ok(Self {
184 config,
185 is_append_only: param.sink_type.is_append_only(),
186 schema,
187 pk_indices,
188 format_desc,
189 db_name: param.db_name,
190 sink_from_name: param.sink_from_name,
191 })
192 }
193}
194
195struct GooglePubSubPayloadWriter<'w> {
196 publisher: &'w mut Publisher,
197 message_vec: Vec<PubsubMessage>,
198 add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
199}
200
201impl GooglePubSubSinkWriter {
202 pub async fn new(
203 config: GooglePubSubConfig,
204 schema: Schema,
205 pk_indices: Vec<usize>,
206 format_desc: &SinkFormatDesc,
207 db_name: String,
208 sink_from_name: String,
209 ) -> Result<Self> {
210 let environment = if let Some(ref cred) = config.credentials {
211 let mut auth_config = project::Config::default();
212 auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
213 auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
214 let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
215 SinkError::GooglePubSub(
216 anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
217 )
218 })?;
219 let provider =
220 DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
221 .await
222 .map_err(|e| {
223 SinkError::GooglePubSub(
224 anyhow!(e).context(
225 "Failed to create Google Cloud Pub/Sub token source provider",
226 ),
227 )
228 })?;
229 Environment::GoogleCloud(Box::new(provider))
230 } else if let Some(emu_host) = config.emulator_host {
231 Environment::Emulator(emu_host)
232 } else {
233 return Err(SinkError::GooglePubSub(anyhow!(
234 "Missing emulator_host or credentials in Google Pub/Sub sink"
235 )));
236 };
237
238 let client_config = ClientConfig {
239 endpoint: config.endpoint,
240 project_id: Some(config.project_id),
241 environment,
242 ..Default::default()
243 };
244 let client = Client::new(client_config)
245 .await
246 .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
247
248 let topic = async {
249 let topic = client.topic(&config.topic);
250 if !topic.exists(None).await? {
251 topic.create(None, None).await?;
252 }
253 Ok(topic)
254 }
255 .await
256 .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
257
258 let formatter = SinkFormatterImpl::new(
259 format_desc,
260 schema,
261 pk_indices,
262 db_name,
263 sink_from_name,
264 topic.fully_qualified_name(),
265 )
266 .await?;
267
268 let publisher = topic.new_publisher(None);
269
270 Ok(Self {
271 formatter,
272 publisher,
273 })
274 }
275}
276
277pub struct GooglePubSubSinkWriter {
278 formatter: SinkFormatterImpl,
279 publisher: Publisher,
280}
281
282impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
283 type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
284
285 async fn write_chunk<'a>(
286 &'a mut self,
287 chunk: StreamChunk,
288 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
289 ) -> Result<()> {
290 let mut payload_writer = GooglePubSubPayloadWriter {
291 publisher: &mut self.publisher,
292 message_vec: Vec::with_capacity(chunk.cardinality()),
293 add_future,
294 };
295 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
296 payload_writer.write_chunk(chunk, formatter).await
297 })?;
298 payload_writer.finish().await
299 }
300}
301
302impl GooglePubSubPayloadWriter<'_> {
303 pub async fn finish(&mut self) -> Result<()> {
304 let message_vec = std::mem::take(&mut self.message_vec);
305 let awaiters = self.publisher.publish_bulk(message_vec).await;
306 self.add_future
307 .add_future_may_await(may_delivery_future(awaiters))
308 .await?;
309 Ok(())
310 }
311}
312
313impl FormattedSink for GooglePubSubPayloadWriter<'_> {
314 type K = String;
315 type V = Vec<u8>;
316
317 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
318 let ordering_key = k.unwrap_or_default();
319 match v {
320 Some(data) => {
321 let msg = PubsubMessage {
322 data,
323 ordering_key,
324 ..Default::default()
325 };
326 self.message_vec.push(msg);
327 Ok(())
328 }
329 None => Err(SinkError::GooglePubSub(anyhow!(
330 "Google Pub/Sub sink error: missing value to publish"
331 ))),
332 }
333 }
334}