risingwave_connector/sink/
pulsar.rs1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::sink::Result;
34use crate::sink::encoder::SerTo;
35use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
36use crate::sink::log_store::DeliveryFutureManagerAddFuture;
37use crate::sink::writer::{
38 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
39};
40use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
41
/// Connector name of the Pulsar sink; exposed as `SINK_NAME` on the `Sink` impl.
pub const PULSAR_SINK: &str = "pulsar";

/// Size handed to `into_log_sinker` when the log sinker is created (65536).
// NOTE(review): presumably bounds the number of buffered in-flight send
// futures — confirm against `AsyncTruncateSinkWriterExt::into_log_sinker`.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
48
/// Serde default for `properties.retry.max`: three send attempts.
const fn _default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
52
/// Serde default for `properties.retry.interval`: 100 milliseconds.
const fn _default_retry_backoff() -> Duration {
    const DEFAULT_BACKOFF_MILLIS: u64 = 100;
    Duration::from_millis(DEFAULT_BACKOFF_MILLIS)
}
56
/// Serde default for `properties.batch.size`: 10000.
const fn _default_batch_size() -> u32 {
    10_000
}
60
/// Serde default for `properties.batch.byte.size`: 1 MiB (1048576 bytes).
const fn _default_batch_byte_size() -> usize {
    1024 * 1024
}
64
65fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
66 SinkError::Pulsar(anyhow!(e))
67}
68
69async fn build_pulsar_producer(
70 pulsar: &Pulsar<TokioExecutor>,
71 config: &PulsarConfig,
72) -> Result<Producer<TokioExecutor>> {
73 Box::pin(
75 pulsar
76 .producer()
77 .with_options(ProducerOptions {
78 batch_size: Some(config.producer_properties.batch_size),
79 batch_byte_size: Some(config.producer_properties.batch_byte_size),
80 ..Default::default()
81 })
82 .with_topic(&config.common.topic)
83 .build()
84 .map_err(pulsar_to_sink_err),
85 )
86 .await
87}
88
89#[serde_as]
90#[derive(Debug, Clone, Deserialize, WithOptions)]
91pub struct PulsarPropertiesProducer {
92 #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
93 #[serde_as(as = "DisplayFromStr")]
94 batch_size: u32,
95
96 #[serde(
97 rename = "properties.batch.byte.size",
98 default = "_default_batch_byte_size"
99 )]
100 #[serde_as(as = "DisplayFromStr")]
101 batch_byte_size: usize,
102}
103
104#[serde_as]
105#[derive(Debug, Clone, Deserialize, WithOptions)]
106pub struct PulsarConfig {
107 #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
108 #[serde_as(as = "DisplayFromStr")]
109 pub max_retry_num: u32,
110
111 #[serde(
112 rename = "properties.retry.interval",
113 default = "_default_retry_backoff",
114 deserialize_with = "deserialize_duration_from_string"
115 )]
116 pub retry_interval: Duration,
117
118 #[serde(flatten)]
119 pub common: PulsarCommon,
120
121 #[serde(flatten)]
122 pub oauth: Option<PulsarOauthCommon>,
123
124 #[serde(flatten)]
125 pub aws_auth_props: AwsAuthProps,
126
127 #[serde(flatten)]
128 pub producer_properties: PulsarPropertiesProducer,
129}
130
131impl EnforceSecret for PulsarConfig {
132 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
133 PulsarCommon::enforce_one(prop)?;
134 AwsAuthProps::enforce_one(prop)?;
135 Ok(())
136 }
137}
138impl PulsarConfig {
139 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
140 let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
141 .map_err(|e| SinkError::Config(anyhow!(e)))?;
142
143 Ok(config)
144 }
145}
146
147#[derive(Debug)]
148pub struct PulsarSink {
149 pub config: PulsarConfig,
150 schema: Schema,
151 downstream_pk: Vec<usize>,
152 format_desc: SinkFormatDesc,
153 db_name: String,
154 sink_from_name: String,
155}
156
157impl EnforceSecret for PulsarSink {
158 fn enforce_secret<'a>(
159 prop_iter: impl Iterator<Item = &'a str>,
160 ) -> crate::error::ConnectorResult<()> {
161 for prop in prop_iter {
162 PulsarConfig::enforce_one(prop)?;
163 }
164 Ok(())
165 }
166}
167
168impl TryFrom<SinkParam> for PulsarSink {
169 type Error = SinkError;
170
171 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
172 let schema = param.schema();
173 let downstream_pk = param.downstream_pk_or_empty();
174 let config = PulsarConfig::from_btreemap(param.properties)?;
175 Ok(Self {
176 config,
177 schema,
178 downstream_pk,
179 format_desc: param
180 .format_desc
181 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
182 db_name: param.db_name,
183 sink_from_name: param.sink_from_name,
184 })
185 }
186}
187
188impl Sink for PulsarSink {
189 type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
190
191 const SINK_NAME: &'static str = PULSAR_SINK;
192
193 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
194 let writer = Box::pin(PulsarSinkWriter::new(
196 self.config.clone(),
197 self.schema.clone(),
198 self.downstream_pk.clone(),
199 &self.format_desc,
200 self.db_name.clone(),
201 self.sink_from_name.clone(),
202 ))
203 .await?;
204 Ok(writer.into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
205 }
206
207 async fn validate(&self) -> Result<()> {
208 if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
210 return Err(SinkError::Config(anyhow!(
211 "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
212 self.format_desc.format
213 )));
214 }
215 SinkFormatterImpl::new(
217 &self.format_desc,
218 self.schema.clone(),
219 self.downstream_pk.clone(),
220 self.db_name.clone(),
221 self.sink_from_name.clone(),
222 &self.config.common.topic,
223 )
224 .await?;
225
226 let pulsar = self
228 .config
229 .common
230 .build_client(&self.config.oauth, &self.config.aws_auth_props, None)
232 .await?;
233 build_pulsar_producer(&pulsar, &self.config).await?;
234
235 Ok(())
236 }
237}
238
239pub struct PulsarSinkWriter {
240 formatter: SinkFormatterImpl,
241 #[expect(dead_code)]
242 pulsar: Pulsar<TokioExecutor>,
243 producer: Producer<TokioExecutor>,
244 config: PulsarConfig,
245}
246
247struct PulsarPayloadWriter<'w> {
248 producer: &'w mut Producer<TokioExecutor>,
249 config: &'w PulsarConfig,
250 add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
251}
252
253mod opaque_type {
254 use super::*;
255 pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
256
257 #[define_opaque(PulsarDeliveryFuture)]
258 pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
259 future.map(|result| {
260 result
261 .map(|_| ())
262 .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
263 })
264 }
265}
266pub use opaque_type::PulsarDeliveryFuture;
267use opaque_type::may_delivery_future;
268
269impl PulsarSinkWriter {
270 pub async fn new(
271 config: PulsarConfig,
272 schema: Schema,
273 downstream_pk: Vec<usize>,
274 format_desc: &SinkFormatDesc,
275 db_name: String,
276 sink_from_name: String,
277 ) -> Result<Self> {
278 let formatter = SinkFormatterImpl::new(
279 format_desc,
280 schema,
281 downstream_pk,
282 db_name,
283 sink_from_name,
284 &config.common.topic,
285 )
286 .await?;
287 let pulsar = config
288 .common
289 .build_client(&config.oauth, &config.aws_auth_props, None)
291 .await?;
292 let producer = build_pulsar_producer(&pulsar, &config).await?;
293 Ok(Self {
294 formatter,
295 pulsar,
296 producer,
297 config,
298 })
299 }
300}
301
302impl PulsarPayloadWriter<'_> {
303 async fn send_message(&mut self, message: Message) -> Result<()> {
304 let mut success_flag = false;
305 let mut connection_err = None;
306
307 for retry_num in 0..self.config.max_retry_num {
308 if retry_num > 0 {
309 tracing::warn!("Failed to send message, at retry no. {retry_num}");
310 }
311 match Box::pin(self.producer.send_non_blocking(message.clone())).await {
312 Ok(send_future) => {
316 self.add_future
317 .add_future_may_await(may_delivery_future(send_future))
318 .await?;
319 success_flag = true;
320 break;
321 }
322 Err(e) => match e {
324 pulsar::Error::Connection(_)
325 | pulsar::Error::Producer(_)
326 | pulsar::Error::Consumer(_) => {
327 connection_err = Some(e);
328 tokio::time::sleep(self.config.retry_interval).await;
329 continue;
330 }
331 _ => return Err(SinkError::Pulsar(anyhow!(e))),
332 },
333 }
334 }
335
336 if !success_flag {
337 Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
338 } else {
339 Ok(())
340 }
341 }
342
343 async fn write_inner(
344 &mut self,
345 event_key_object: Option<String>,
346 event_object: Option<Vec<u8>>,
347 ) -> Result<()> {
348 let message = Message {
349 partition_key: event_key_object,
350 payload: event_object.unwrap_or_default(),
351 ..Default::default()
352 };
353
354 self.send_message(message).await?;
355 Ok(())
356 }
357}
358
359impl FormattedSink for PulsarPayloadWriter<'_> {
360 type K = String;
361 type V = Vec<u8>;
362
363 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
364 self.write_inner(k, v).await
365 }
366}
367
368impl AsyncTruncateSinkWriter for PulsarSinkWriter {
369 type DeliveryFuture = PulsarDeliveryFuture;
370
371 async fn write_chunk<'a>(
372 &'a mut self,
373 chunk: StreamChunk,
374 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
375 ) -> Result<()> {
376 let iter = {
378 dispatch_sink_formatter_str_key_impl!(
379 &self.formatter,
380 formatter,
381 {
382 formatter.format_chunk(&chunk).map(|r| {
385 let (key, value) = r?;
386 let key: Option<String> = key.map(SerTo::ser_to).transpose()?;
387 let value: Option<Vec<u8>> = value.map(SerTo::ser_to).transpose()?;
388 Ok((key, value)) as Result<_>
389 })
390 },
391 auto_enums::auto_enum(Iterator)
393 )
394 };
395
396 let mut payload_writer = PulsarPayloadWriter {
398 producer: &mut self.producer,
399 add_future,
400 config: &self.config,
401 };
402
403 for r in iter {
404 let (key, value): (Option<String>, Option<Vec<u8>>) = r?;
405 payload_writer.write_inner(key, value).await?;
406 }
407
408 Ok(())
409 }
410
411 async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
412 if is_checkpoint {
413 self.producer
414 .send_batch()
415 .map_err(pulsar_to_sink_err)
416 .await?;
417 }
418
419 Ok(())
420 }
421}