risingwave_connector/sink/
pulsar.rs1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::sink::encoder::SerTo;
34use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
35use crate::sink::log_store::DeliveryFutureManagerAddFuture;
36use crate::sink::writer::{
37 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use crate::sink::{DummySinkCommitCoordinator, Result};
40use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
41
/// Name of this sink connector; used as `Sink::SINK_NAME` for `PulsarSink`.
pub const PULSAR_SINK: &str = "pulsar";

/// Maximum size of the send-future buffer handed to `into_log_sinker`
/// (`1 << 16` == 65536).
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
48
/// Default for `properties.retry.max`.
const fn _default_max_retries() -> u32 {
    3
}

/// Default for `properties.retry.interval`: 100 ms.
const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

/// Default for `properties.batch.size`, forwarded to `ProducerOptions::batch_size`.
const fn _default_batch_size() -> u32 {
    10_000
}

/// Default for `properties.batch.byte.size` (1 MiB), forwarded to
/// `ProducerOptions::batch_byte_size`.
const fn _default_batch_byte_size() -> usize {
    1024 * 1024
}
64
65fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
66 SinkError::Pulsar(anyhow!(e))
67}
68
69async fn build_pulsar_producer(
70 pulsar: &Pulsar<TokioExecutor>,
71 config: &PulsarConfig,
72) -> Result<Producer<TokioExecutor>> {
73 pulsar
74 .producer()
75 .with_options(ProducerOptions {
76 batch_size: Some(config.producer_properties.batch_size),
77 batch_byte_size: Some(config.producer_properties.batch_byte_size),
78 ..Default::default()
79 })
80 .with_topic(&config.common.topic)
81 .build()
82 .map_err(pulsar_to_sink_err)
83 .await
84}
85
86#[serde_as]
87#[derive(Debug, Clone, Deserialize, WithOptions)]
88pub struct PulsarPropertiesProducer {
89 #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
90 #[serde_as(as = "DisplayFromStr")]
91 batch_size: u32,
92
93 #[serde(
94 rename = "properties.batch.byte.size",
95 default = "_default_batch_byte_size"
96 )]
97 #[serde_as(as = "DisplayFromStr")]
98 batch_byte_size: usize,
99}
100
101#[serde_as]
102#[derive(Debug, Clone, Deserialize, WithOptions)]
103pub struct PulsarConfig {
104 #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
105 #[serde_as(as = "DisplayFromStr")]
106 pub max_retry_num: u32,
107
108 #[serde(
109 rename = "properties.retry.interval",
110 default = "_default_retry_backoff",
111 deserialize_with = "deserialize_duration_from_string"
112 )]
113 pub retry_interval: Duration,
114
115 #[serde(flatten)]
116 pub common: PulsarCommon,
117
118 #[serde(flatten)]
119 pub oauth: Option<PulsarOauthCommon>,
120
121 #[serde(flatten)]
122 pub aws_auth_props: AwsAuthProps,
123
124 #[serde(flatten)]
125 pub producer_properties: PulsarPropertiesProducer,
126}
127
128impl EnforceSecret for PulsarConfig {
129 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
130 PulsarCommon::enforce_one(prop)?;
131 AwsAuthProps::enforce_one(prop)?;
132 Ok(())
133 }
134}
135impl PulsarConfig {
136 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
137 let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
138 .map_err(|e| SinkError::Config(anyhow!(e)))?;
139
140 Ok(config)
141 }
142}
143
144#[derive(Debug)]
145pub struct PulsarSink {
146 pub config: PulsarConfig,
147 schema: Schema,
148 downstream_pk: Vec<usize>,
149 format_desc: SinkFormatDesc,
150 db_name: String,
151 sink_from_name: String,
152}
153
154impl EnforceSecret for PulsarSink {
155 fn enforce_secret<'a>(
156 prop_iter: impl Iterator<Item = &'a str>,
157 ) -> crate::error::ConnectorResult<()> {
158 for prop in prop_iter {
159 PulsarConfig::enforce_one(prop)?;
160 }
161 Ok(())
162 }
163}
164
165impl TryFrom<SinkParam> for PulsarSink {
166 type Error = SinkError;
167
168 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
169 let schema = param.schema();
170 let config = PulsarConfig::from_btreemap(param.properties)?;
171 Ok(Self {
172 config,
173 schema,
174 downstream_pk: param.downstream_pk,
175 format_desc: param
176 .format_desc
177 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
178 db_name: param.db_name,
179 sink_from_name: param.sink_from_name,
180 })
181 }
182}
183
184impl Sink for PulsarSink {
185 type Coordinator = DummySinkCommitCoordinator;
186 type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
187
188 const SINK_NAME: &'static str = PULSAR_SINK;
189
190 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
191 Ok(PulsarSinkWriter::new(
192 self.config.clone(),
193 self.schema.clone(),
194 self.downstream_pk.clone(),
195 &self.format_desc,
196 self.db_name.clone(),
197 self.sink_from_name.clone(),
198 )
199 .await?
200 .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
201 }
202
203 async fn validate(&self) -> Result<()> {
204 if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
206 return Err(SinkError::Config(anyhow!(
207 "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
208 self.format_desc.format
209 )));
210 }
211 SinkFormatterImpl::new(
213 &self.format_desc,
214 self.schema.clone(),
215 self.downstream_pk.clone(),
216 self.db_name.clone(),
217 self.sink_from_name.clone(),
218 &self.config.common.topic,
219 )
220 .await?;
221
222 let pulsar = self
224 .config
225 .common
226 .build_client(&self.config.oauth, &self.config.aws_auth_props)
227 .await?;
228 build_pulsar_producer(&pulsar, &self.config).await?;
229
230 Ok(())
231 }
232}
233
234pub struct PulsarSinkWriter {
235 formatter: SinkFormatterImpl,
236 #[expect(dead_code)]
237 pulsar: Pulsar<TokioExecutor>,
238 producer: Producer<TokioExecutor>,
239 config: PulsarConfig,
240}
241
242struct PulsarPayloadWriter<'w> {
243 producer: &'w mut Producer<TokioExecutor>,
244 config: &'w PulsarConfig,
245 add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
246}
247
248mod opaque_type {
249 use super::*;
250 pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
251
252 #[define_opaque(PulsarDeliveryFuture)]
253 pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
254 future.map(|result| {
255 result
256 .map(|_| ())
257 .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
258 })
259 }
260}
261pub use opaque_type::PulsarDeliveryFuture;
262use opaque_type::may_delivery_future;
263
264impl PulsarSinkWriter {
265 pub async fn new(
266 config: PulsarConfig,
267 schema: Schema,
268 downstream_pk: Vec<usize>,
269 format_desc: &SinkFormatDesc,
270 db_name: String,
271 sink_from_name: String,
272 ) -> Result<Self> {
273 let formatter = SinkFormatterImpl::new(
274 format_desc,
275 schema,
276 downstream_pk,
277 db_name,
278 sink_from_name,
279 &config.common.topic,
280 )
281 .await?;
282 let pulsar = config
283 .common
284 .build_client(&config.oauth, &config.aws_auth_props)
285 .await?;
286 let producer = build_pulsar_producer(&pulsar, &config).await?;
287 Ok(Self {
288 formatter,
289 pulsar,
290 producer,
291 config,
292 })
293 }
294}
295
296impl PulsarPayloadWriter<'_> {
297 async fn send_message(&mut self, message: Message) -> Result<()> {
298 let mut success_flag = false;
299 let mut connection_err = None;
300
301 for retry_num in 0..self.config.max_retry_num {
302 if retry_num > 0 {
303 tracing::warn!("Failed to send message, at retry no. {retry_num}");
304 }
305 match self.producer.send_non_blocking(message.clone()).await {
306 Ok(send_future) => {
310 self.add_future
311 .add_future_may_await(may_delivery_future(send_future))
312 .await?;
313 success_flag = true;
314 break;
315 }
316 Err(e) => match e {
318 pulsar::Error::Connection(_)
319 | pulsar::Error::Producer(_)
320 | pulsar::Error::Consumer(_) => {
321 connection_err = Some(e);
322 tokio::time::sleep(self.config.retry_interval).await;
323 continue;
324 }
325 _ => return Err(SinkError::Pulsar(anyhow!(e))),
326 },
327 }
328 }
329
330 if !success_flag {
331 Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
332 } else {
333 Ok(())
334 }
335 }
336
337 async fn write_inner(
338 &mut self,
339 event_key_object: Option<String>,
340 event_object: Option<Vec<u8>>,
341 ) -> Result<()> {
342 let message = Message {
343 partition_key: event_key_object,
344 payload: event_object.unwrap_or_default(),
345 ..Default::default()
346 };
347
348 self.send_message(message).await?;
349 Ok(())
350 }
351}
352
353impl FormattedSink for PulsarPayloadWriter<'_> {
354 type K = String;
355 type V = Vec<u8>;
356
357 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
358 self.write_inner(k, v).await
359 }
360}
361
362impl AsyncTruncateSinkWriter for PulsarSinkWriter {
363 type DeliveryFuture = PulsarDeliveryFuture;
364
365 async fn write_chunk<'a>(
366 &'a mut self,
367 chunk: StreamChunk,
368 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
369 ) -> Result<()> {
370 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
371 let mut payload_writer = PulsarPayloadWriter {
372 producer: &mut self.producer,
373 add_future,
374 config: &self.config,
375 };
376 for r in formatter.format_chunk(&chunk) {
380 let (key, value) = r?;
381 payload_writer
382 .write_inner(
383 key.map(SerTo::ser_to).transpose()?,
384 value.map(SerTo::ser_to).transpose()?,
385 )
386 .await?;
387 }
388 Ok(())
389 })
390 }
391
392 async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
393 if is_checkpoint {
394 self.producer
395 .send_batch()
396 .map_err(pulsar_to_sink_err)
397 .await?;
398 }
399
400 Ok(())
401 }
402}