risingwave_connector/sink/
google_pubsub.rs1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41use crate::enforce_secret::EnforceSecret;
42
/// Connector name of this sink; used as `Sink::SINK_NAME`.
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Buffer size handed to `into_log_sinker` when building the log sinker
/// (presumably the cap on pending delivery futures — confirm in the writer module).
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
45
46mod delivery_future {
47 use anyhow::Context;
48 use futures::future::try_join_all;
49 use futures::{FutureExt, TryFuture, TryFutureExt};
50 use google_cloud_pubsub::publisher::Awaiter;
51
52 use crate::sink::SinkError;
53
54 pub type GooglePubSubSinkDeliveryFuture =
55 impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
56
57 pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
58 try_join_all(awaiter.into_iter().map(|awaiter| {
59 awaiter.get().map(|result| {
60 result
61 .context("Google Pub/Sub sink error")
62 .map_err(SinkError::GooglePubSub)
63 .map(|_| ())
64 })
65 }))
66 .map_ok(|_: Vec<()>| ())
67 .boxed()
68 }
69}
70
71use delivery_future::*;
72
/// Options of the Google Pub/Sub sink.
///
/// Built from the sink's property map by `GooglePubSubConfig::from_btreemap`; each field is
/// read from the property key given in its `serde(rename)` attribute.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
    /// Google Cloud project ID, passed to the Pub/Sub client as its `project_id`.
    #[serde(rename = "pubsub.project_id")]
    pub project_id: String,

    /// Pub/Sub topic to publish to. The sink writer creates it if it does not exist.
    #[serde(rename = "pubsub.topic")]
    pub topic: String,

    /// Pub/Sub endpoint, passed to the Pub/Sub client as its `endpoint`.
    #[serde(rename = "pubsub.endpoint")]
    pub endpoint: String,

    /// Host of a Pub/Sub emulator to connect to. Only used when `pubsub.credentials` is
    /// not set.
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

    /// Service-account credentials given inline as a string and loaded with
    /// `CredentialsFile::new_from_str` (presumably the JSON key-file contents — confirm).
    /// Takes precedence over `pubsub.emulator_host` and is a secret-enforced property.
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,
}
100
/// Declares `pubsub.credentials` as the only secret-enforced property of this sink.
impl EnforceSecret for GooglePubSubConfig {
    // NOTE(review): what "enforced" means (e.g. requiring a secret reference instead of a
    // plaintext value) is defined by the `EnforceSecret` trait — confirm there.
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
        "pubsub.credentials",
    };
}
106
107impl GooglePubSubConfig {
108 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
109 serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
110 .map_err(|e| SinkError::Config(anyhow!(e)))
111 }
112}
113
/// Google Pub/Sub sink, built from a [`SinkParam`] via `TryFrom`.
#[derive(Clone, Debug)]
pub struct GooglePubSubSink {
    /// Parsed `pubsub.*` options.
    pub config: GooglePubSubConfig,
    /// Whether the sink type is append-only; `validate` rejects the sink when this is false.
    is_append_only: bool,

    /// Schema from `SinkParam::schema()`, forwarded to the formatter.
    schema: Schema,
    /// Downstream primary-key indices, forwarded to the formatter.
    pk_indices: Vec<usize>,
    /// `FORMAT ... ENCODE ...` description used to build the formatter.
    format_desc: SinkFormatDesc,
    /// Database name, forwarded to the formatter.
    db_name: String,
    /// Presumably the name of the upstream relation; forwarded to the formatter.
    sink_from_name: String,
}
125
126impl EnforceSecret for GooglePubSubSink {
127 fn enforce_secret<'a>(
128 prop_iter: impl Iterator<Item = &'a str>,
129 ) -> crate::error::ConnectorResult<()> {
130 for prop in prop_iter {
131 GooglePubSubConfig::enforce_one(prop)?;
132 }
133 Ok(())
134 }
135}
136impl Sink for GooglePubSubSink {
137 type Coordinator = DummySinkCommitCoordinator;
138 type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
139
140 const SINK_NAME: &'static str = PUBSUB_SINK;
141
142 async fn validate(&self) -> Result<()> {
143 if !self.is_append_only {
144 return Err(SinkError::GooglePubSub(anyhow!(
145 "Google Pub/Sub sink only support append-only mode"
146 )));
147 }
148
149 let conf = &self.config;
150 if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
151 return Err(SinkError::GooglePubSub(anyhow!(
152 "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
153 )));
154 }
155
156 Ok(())
157 }
158
159 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
160 Ok(GooglePubSubSinkWriter::new(
161 self.config.clone(),
162 self.schema.clone(),
163 self.pk_indices.clone(),
164 &self.format_desc,
165 self.db_name.clone(),
166 self.sink_from_name.clone(),
167 )
168 .await?
169 .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
170 }
171}
172
173impl TryFrom<SinkParam> for GooglePubSubSink {
174 type Error = SinkError;
175
176 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177 let schema = param.schema();
178 let config = GooglePubSubConfig::from_btreemap(param.properties)?;
179
180 let format_desc = param
181 .format_desc
182 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
183 Ok(Self {
184 config,
185 is_append_only: param.sink_type.is_append_only(),
186
187 schema,
188 pk_indices: param.downstream_pk,
189 format_desc,
190 db_name: param.db_name,
191 sink_from_name: param.sink_from_name,
192 })
193 }
194}
195
/// Per-chunk helper: collects the formatted rows of one chunk as [`PubsubMessage`]s and,
/// in `finish`, publishes them with a single bulk call.
struct GooglePubSubPayloadWriter<'w> {
    /// Publisher borrowed from the owning [`GooglePubSubSinkWriter`].
    publisher: &'w mut Publisher,
    /// Messages buffered by `write_one` and drained by `finish`.
    message_vec: Vec<PubsubMessage>,
    /// Handle used to register the delivery future of the published batch.
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}
201
202impl GooglePubSubSinkWriter {
203 pub async fn new(
204 config: GooglePubSubConfig,
205 schema: Schema,
206 pk_indices: Vec<usize>,
207 format_desc: &SinkFormatDesc,
208 db_name: String,
209 sink_from_name: String,
210 ) -> Result<Self> {
211 let environment = if let Some(ref cred) = config.credentials {
212 let mut auth_config = project::Config::default();
213 auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
214 auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
215 let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
216 SinkError::GooglePubSub(
217 anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
218 )
219 })?;
220 let provider =
221 DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
222 .await
223 .map_err(|e| {
224 SinkError::GooglePubSub(
225 anyhow!(e).context(
226 "Failed to create Google Cloud Pub/Sub token source provider",
227 ),
228 )
229 })?;
230 Environment::GoogleCloud(Box::new(provider))
231 } else if let Some(emu_host) = config.emulator_host {
232 Environment::Emulator(emu_host)
233 } else {
234 return Err(SinkError::GooglePubSub(anyhow!(
235 "Missing emulator_host or credentials in Google Pub/Sub sink"
236 )));
237 };
238
239 let client_config = ClientConfig {
240 endpoint: config.endpoint,
241 project_id: Some(config.project_id),
242 environment,
243 ..Default::default()
244 };
245 let client = Client::new(client_config)
246 .await
247 .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
248
249 let topic = async {
250 let topic = client.topic(&config.topic);
251 if !topic.exists(None).await? {
252 topic.create(None, None).await?;
253 }
254 Ok(topic)
255 }
256 .await
257 .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
258
259 let formatter = SinkFormatterImpl::new(
260 format_desc,
261 schema,
262 pk_indices,
263 db_name,
264 sink_from_name,
265 topic.fully_qualified_name(),
266 )
267 .await?;
268
269 let publisher = topic.new_publisher(None);
270
271 Ok(Self {
272 formatter,
273 publisher,
274 })
275 }
276}
277
/// Sink writer that formats chunks and publishes them to a single Pub/Sub topic.
pub struct GooglePubSubSinkWriter {
    /// Formatter producing `String` keys (used as ordering keys) and byte payloads.
    formatter: SinkFormatterImpl,
    /// Publisher for the topic resolved in `new`.
    publisher: Publisher,
}
282
283impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
284 type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
285
286 async fn write_chunk<'a>(
287 &'a mut self,
288 chunk: StreamChunk,
289 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
290 ) -> Result<()> {
291 let mut payload_writer = GooglePubSubPayloadWriter {
292 publisher: &mut self.publisher,
293 message_vec: Vec::with_capacity(chunk.cardinality()),
294 add_future,
295 };
296 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
297 payload_writer.write_chunk(chunk, formatter).await
298 })?;
299 payload_writer.finish().await
300 }
301}
302
303impl GooglePubSubPayloadWriter<'_> {
304 pub async fn finish(&mut self) -> Result<()> {
305 let message_vec = std::mem::take(&mut self.message_vec);
306 let awaiters = self.publisher.publish_bulk(message_vec).await;
307 self.add_future
308 .add_future_may_await(may_delivery_future(awaiters))
309 .await?;
310 Ok(())
311 }
312}
313
314impl FormattedSink for GooglePubSubPayloadWriter<'_> {
315 type K = String;
316 type V = Vec<u8>;
317
318 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
319 let ordering_key = k.unwrap_or_default();
320 match v {
321 Some(data) => {
322 let msg = PubsubMessage {
323 data,
324 ordering_key,
325 ..Default::default()
326 };
327 self.message_vec.push(msg);
328 Ok(())
329 }
330 None => Err(SinkError::GooglePubSub(anyhow!(
331 "Google Pub/Sub sink error: missing value to publish"
332 ))),
333 }
334 }
335}