risingwave_connector/sink/
google_pubsub.rs1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use google_cloud_gax::conn::Environment;
19use google_cloud_googleapis::pubsub::v1::PubsubMessage;
20use google_cloud_pubsub::apiv1;
21use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
22use google_cloud_pubsub::client::google_cloud_auth::project;
23use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
24use google_cloud_pubsub::client::{Client, ClientConfig};
25use google_cloud_pubsub::publisher::Publisher;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::catalog::Schema;
28use serde_derive::Deserialize;
29use serde_with::serde_as;
30use tonic::Status;
31use with_options::WithOptions;
32
33use super::catalog::SinkFormatDesc;
34use super::formatter::SinkFormatterImpl;
35use super::log_store::DeliveryFutureManagerAddFuture;
36use super::writer::{
37 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
40use crate::dispatch_sink_formatter_str_key_impl;
41
/// Connector name of this sink; used as `Sink::SINK_NAME` for [`GooglePubSubSink`].
pub const PUBSUB_SINK: &str = "google_pubsub";
/// Bound passed to `into_log_sinker` when the log sinker is built. Presumably the maximum
/// number of buffered in-flight delivery futures (one is added per written chunk) — TODO confirm.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
44
45mod delivery_future {
46 use anyhow::Context;
47 use futures::future::try_join_all;
48 use futures::{FutureExt, TryFuture, TryFutureExt};
49 use google_cloud_pubsub::publisher::Awaiter;
50
51 use crate::sink::SinkError;
52
53 pub type GooglePubSubSinkDeliveryFuture =
54 impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
55
56 pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
57 try_join_all(awaiter.into_iter().map(|awaiter| {
58 awaiter.get().map(|result| {
59 result
60 .context("Google Pub/Sub sink error")
61 .map_err(SinkError::GooglePubSub)
62 .map(|_| ())
63 })
64 }))
65 .map_ok(|_: Vec<()>| ())
66 .boxed()
67 }
68}
69
70use delivery_future::*;
71
/// Connection and destination settings of the Google Pub/Sub sink, deserialized from the
/// sink's properties (see `GooglePubSubConfig::from_btreemap`).
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
    /// ID of the Google Cloud project; passed to the client as `ClientConfig::project_id`.
    #[serde(rename = "pubsub.project_id")]
    pub project_id: String,

    /// Name of the topic to publish to. The writer creates the topic if it does not exist.
    #[serde(rename = "pubsub.topic")]
    pub topic: String,

    /// Pub/Sub service endpoint; passed to the client as `ClientConfig::endpoint`.
    #[serde(rename = "pubsub.endpoint")]
    pub endpoint: String,

    /// Host of a Pub/Sub emulator to connect to instead of Google Cloud.
    /// Only used when `credentials` is not set.
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

    /// Contents of a Google Cloud credentials file (parsed with
    /// `CredentialsFile::new_from_str`), used to authenticate against Google Cloud.
    /// Takes precedence over `emulator_host` when both are set.
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,
}
99
100impl GooglePubSubConfig {
101 fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
102 serde_json::from_value::<GooglePubSubConfig>(serde_json::to_value(values).unwrap())
103 .map_err(|e| SinkError::Config(anyhow!(e)))
104 }
105}
106
/// Sink that publishes formatted rows to a Google Cloud Pub/Sub topic.
#[derive(Clone, Debug)]
pub struct GooglePubSubSink {
    /// Parsed connector properties.
    pub config: GooglePubSubConfig,
    /// Whether the sink was declared append-only; `validate` rejects sinks where this is false.
    is_append_only: bool,

    /// Schema of the sink input, forwarded to the formatter.
    schema: Schema,
    /// Downstream primary-key indices (`SinkParam::downstream_pk`), forwarded to the formatter.
    pk_indices: Vec<usize>,
    /// The sink's `FORMAT ... ENCODE ...` description.
    format_desc: SinkFormatDesc,
    /// `SinkParam::db_name`, forwarded to the formatter.
    db_name: String,
    /// `SinkParam::sink_from_name`, forwarded to the formatter.
    sink_from_name: String,
}
118
119impl Sink for GooglePubSubSink {
120 type Coordinator = DummySinkCommitCoordinator;
121 type LogSinker = AsyncTruncateLogSinkerOf<GooglePubSubSinkWriter>;
122
123 const SINK_NAME: &'static str = PUBSUB_SINK;
124
125 async fn validate(&self) -> Result<()> {
126 if !self.is_append_only {
127 return Err(SinkError::GooglePubSub(anyhow!(
128 "Google Pub/Sub sink only support append-only mode"
129 )));
130 }
131
132 let conf = &self.config;
133 if matches!((&conf.emulator_host, &conf.credentials), (None, None)) {
134 return Err(SinkError::GooglePubSub(anyhow!(
135 "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink"
136 )));
137 }
138
139 Ok(())
140 }
141
142 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
143 Ok(GooglePubSubSinkWriter::new(
144 self.config.clone(),
145 self.schema.clone(),
146 self.pk_indices.clone(),
147 &self.format_desc,
148 self.db_name.clone(),
149 self.sink_from_name.clone(),
150 )
151 .await?
152 .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
153 }
154}
155
156impl TryFrom<SinkParam> for GooglePubSubSink {
157 type Error = SinkError;
158
159 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
160 let schema = param.schema();
161 let config = GooglePubSubConfig::from_btreemap(param.properties)?;
162
163 let format_desc = param
164 .format_desc
165 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?;
166 Ok(Self {
167 config,
168 is_append_only: param.sink_type.is_append_only(),
169
170 schema,
171 pk_indices: param.downstream_pk,
172 format_desc,
173 db_name: param.db_name,
174 sink_from_name: param.sink_from_name,
175 })
176 }
177}
178
/// Per-chunk helper that buffers the messages produced by the formatter.
/// `finish` then publishes them in a single bulk request.
struct GooglePubSubPayloadWriter<'w> {
    /// Publisher borrowed from the owning `GooglePubSubSinkWriter`.
    publisher: &'w mut Publisher,
    /// Messages pushed by `write_one` and drained by `finish`.
    message_vec: Vec<PubsubMessage>,
    /// Handle used to register the batch's delivery future with the log sinker.
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}
184
185impl GooglePubSubSinkWriter {
186 pub async fn new(
187 config: GooglePubSubConfig,
188 schema: Schema,
189 pk_indices: Vec<usize>,
190 format_desc: &SinkFormatDesc,
191 db_name: String,
192 sink_from_name: String,
193 ) -> Result<Self> {
194 let environment = if let Some(ref cred) = config.credentials {
195 let mut auth_config = project::Config::default();
196 auth_config = auth_config.with_audience(apiv1::conn_pool::AUDIENCE);
197 auth_config = auth_config.with_scopes(&apiv1::conn_pool::SCOPES);
198 let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| {
199 SinkError::GooglePubSub(
200 anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"),
201 )
202 })?;
203 let provider =
204 DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file))
205 .await
206 .map_err(|e| {
207 SinkError::GooglePubSub(
208 anyhow!(e).context(
209 "Failed to create Google Cloud Pub/Sub token source provider",
210 ),
211 )
212 })?;
213 Environment::GoogleCloud(Box::new(provider))
214 } else if let Some(emu_host) = config.emulator_host {
215 Environment::Emulator(emu_host)
216 } else {
217 return Err(SinkError::GooglePubSub(anyhow!(
218 "Missing emulator_host or credentials in Google Pub/Sub sink"
219 )));
220 };
221
222 let client_config = ClientConfig {
223 endpoint: config.endpoint,
224 project_id: Some(config.project_id),
225 environment,
226 ..Default::default()
227 };
228 let client = Client::new(client_config)
229 .await
230 .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?;
231
232 let topic = async {
233 let topic = client.topic(&config.topic);
234 if !topic.exists(None).await? {
235 topic.create(None, None).await?;
236 }
237 Ok(topic)
238 }
239 .await
240 .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?;
241
242 let formatter = SinkFormatterImpl::new(
243 format_desc,
244 schema,
245 pk_indices,
246 db_name,
247 sink_from_name,
248 topic.fully_qualified_name(),
249 )
250 .await?;
251
252 let publisher = topic.new_publisher(None);
253
254 Ok(Self {
255 formatter,
256 publisher,
257 })
258 }
259}
260
/// Writer that formats stream chunks and publishes them to a Pub/Sub topic.
pub struct GooglePubSubSinkWriter {
    /// Formatter built from the sink's `FORMAT ... ENCODE ...` description.
    formatter: SinkFormatterImpl,
    /// Publisher bound to the destination topic.
    publisher: Publisher,
}
265
266impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
267 type DeliveryFuture = GooglePubSubSinkDeliveryFuture;
268
269 async fn write_chunk<'a>(
270 &'a mut self,
271 chunk: StreamChunk,
272 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
273 ) -> Result<()> {
274 let mut payload_writer = GooglePubSubPayloadWriter {
275 publisher: &mut self.publisher,
276 message_vec: Vec::with_capacity(chunk.cardinality()),
277 add_future,
278 };
279 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
280 payload_writer.write_chunk(chunk, formatter).await
281 })?;
282 payload_writer.finish().await
283 }
284}
285
286impl GooglePubSubPayloadWriter<'_> {
287 pub async fn finish(&mut self) -> Result<()> {
288 let message_vec = std::mem::take(&mut self.message_vec);
289 let awaiters = self.publisher.publish_bulk(message_vec).await;
290 self.add_future
291 .add_future_may_await(may_delivery_future(awaiters))
292 .await?;
293 Ok(())
294 }
295}
296
297impl FormattedSink for GooglePubSubPayloadWriter<'_> {
298 type K = String;
299 type V = Vec<u8>;
300
301 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
302 let ordering_key = k.unwrap_or_default();
303 match v {
304 Some(data) => {
305 let msg = PubsubMessage {
306 data,
307 ordering_key,
308 ..Default::default()
309 };
310 self.message_vec.push(msg);
311 Ok(())
312 }
313 None => Err(SinkError::GooglePubSub(anyhow!(
314 "Google Pub/Sub sink error: missing value to publish"
315 ))),
316 }
317 }
318}