risingwave_connector/sink/
pulsar.rs1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::sink::encoder::SerTo;
33use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
34use crate::sink::log_store::DeliveryFutureManagerAddFuture;
35use crate::sink::writer::{
36 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
37};
38use crate::sink::{DummySinkCommitCoordinator, Result};
39use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
40
/// Connector name of this sink; also exposed as `Sink::SINK_NAME` for `PulsarSink`.
pub const PULSAR_SINK: &str = "pulsar";

/// Size handed to `into_log_sinker` when the log sinker is built (65536).
// NOTE(review): presumably bounds the number of buffered delivery futures — confirm
// against `AsyncTruncateSinkWriterExt::into_log_sinker`.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
47
/// Serde default for `properties.retry.max`: 3.
const fn _default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
51
/// Serde default for `properties.retry.interval`: 100 milliseconds.
const fn _default_retry_backoff() -> Duration {
    const DEFAULT_BACKOFF_MILLIS: u64 = 100;
    Duration::from_millis(DEFAULT_BACKOFF_MILLIS)
}
55
/// Serde default for `properties.batch.size`: 10000.
const fn _default_batch_size() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 10_000;
    DEFAULT_BATCH_SIZE
}
59
/// Serde default for `properties.batch.byte.size`: 1 MiB (1048576 bytes).
const fn _default_batch_byte_size() -> usize {
    const ONE_MIB: usize = 1024 * 1024;
    ONE_MIB
}
63
64fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
65 SinkError::Pulsar(anyhow!(e))
66}
67
68async fn build_pulsar_producer(
69 pulsar: &Pulsar<TokioExecutor>,
70 config: &PulsarConfig,
71) -> Result<Producer<TokioExecutor>> {
72 pulsar
73 .producer()
74 .with_options(ProducerOptions {
75 batch_size: Some(config.producer_properties.batch_size),
76 batch_byte_size: Some(config.producer_properties.batch_byte_size),
77 ..Default::default()
78 })
79 .with_topic(&config.common.topic)
80 .build()
81 .map_err(pulsar_to_sink_err)
82 .await
83}
84
/// Producer batching options read from the sink's `WITH` properties.
///
/// Both values are parsed from their string form (`DisplayFromStr`) and are
/// forwarded to `ProducerOptions` by `build_pulsar_producer`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarPropertiesProducer {
    /// `properties.batch.size`; defaults to `_default_batch_size()` (10000).
    // NOTE(review): presumably the maximum number of messages per batch — confirm
    // against the pulsar crate's `ProducerOptions::batch_size`.
    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
    #[serde_as(as = "DisplayFromStr")]
    batch_size: u32,

    /// `properties.batch.byte.size`; defaults to `_default_batch_byte_size()` (1 << 20).
    #[serde(
        rename = "properties.batch.byte.size",
        default = "_default_batch_byte_size"
    )]
    #[serde_as(as = "DisplayFromStr")]
    batch_byte_size: usize,
}
99
/// Full configuration of a Pulsar sink, deserialized from the sink's string
/// properties (see `PulsarConfig::from_btreemap`).
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarConfig {
    /// `properties.retry.max`; defaults to 3. The writer uses this as the bound of
    /// its send loop, i.e. as the number of send attempts made for one message.
    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_retry_num: u32,

    /// `properties.retry.interval`; defaults to 100 ms. The writer sleeps this long
    /// after a retryable send failure.
    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    /// Shared Pulsar settings; supplies the target `topic` and `build_client`.
    #[serde(flatten)]
    pub common: PulsarCommon,

    /// Optional OAuth settings, passed to `PulsarCommon::build_client`.
    #[serde(flatten)]
    pub oauth: Option<PulsarOauthCommon>,

    /// AWS auth settings, passed to `PulsarCommon::build_client`.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    /// Producer batching limits applied when the producer is built.
    #[serde(flatten)]
    pub producer_properties: PulsarPropertiesProducer,
}
126
127impl PulsarConfig {
128 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
129 let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
130 .map_err(|e| SinkError::Config(anyhow!(e)))?;
131
132 Ok(config)
133 }
134}
135
/// Pulsar sink definition built from a `SinkParam`.
///
/// Holds everything needed to validate the sink and to create a
/// `PulsarSinkWriter` for it.
#[derive(Debug)]
pub struct PulsarSink {
    /// Parsed connector configuration.
    pub config: PulsarConfig,
    /// Schema obtained from `SinkParam::schema()`.
    schema: Schema,
    /// Primary-key column indices; must be non-empty unless the format is append-only.
    downstream_pk: Vec<usize>,
    /// The `FORMAT ... ENCODE ...` description of the sink.
    format_desc: SinkFormatDesc,
    /// Database name, forwarded to the formatter constructor.
    db_name: String,
    /// Name of the sink's source relation, forwarded to the formatter constructor.
    sink_from_name: String,
}
145
146impl TryFrom<SinkParam> for PulsarSink {
147 type Error = SinkError;
148
149 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
150 let schema = param.schema();
151 let config = PulsarConfig::from_btreemap(param.properties)?;
152 Ok(Self {
153 config,
154 schema,
155 downstream_pk: param.downstream_pk,
156 format_desc: param
157 .format_desc
158 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
159 db_name: param.db_name,
160 sink_from_name: param.sink_from_name,
161 })
162 }
163}
164
165impl Sink for PulsarSink {
166 type Coordinator = DummySinkCommitCoordinator;
167 type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
168
169 const SINK_NAME: &'static str = PULSAR_SINK;
170
171 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
172 Ok(PulsarSinkWriter::new(
173 self.config.clone(),
174 self.schema.clone(),
175 self.downstream_pk.clone(),
176 &self.format_desc,
177 self.db_name.clone(),
178 self.sink_from_name.clone(),
179 )
180 .await?
181 .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
182 }
183
184 async fn validate(&self) -> Result<()> {
185 if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
187 return Err(SinkError::Config(anyhow!(
188 "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
189 self.format_desc.format
190 )));
191 }
192 SinkFormatterImpl::new(
194 &self.format_desc,
195 self.schema.clone(),
196 self.downstream_pk.clone(),
197 self.db_name.clone(),
198 self.sink_from_name.clone(),
199 &self.config.common.topic,
200 )
201 .await?;
202
203 let pulsar = self
205 .config
206 .common
207 .build_client(&self.config.oauth, &self.config.aws_auth_props)
208 .await?;
209 build_pulsar_producer(&pulsar, &self.config).await?;
210
211 Ok(())
212 }
213}
214
/// Writer that formats stream chunks and sends them through a Pulsar producer.
pub struct PulsarSinkWriter {
    /// Turns each row of a chunk into an optional key and an optional value.
    formatter: SinkFormatterImpl,
    /// Client the producer was built from; never read after construction.
    // NOTE(review): presumably retained so the client lives as long as the
    // producer — confirm.
    #[expect(dead_code)]
    pulsar: Pulsar<TokioExecutor>,
    /// Producer bound to the configured topic.
    producer: Producer<TokioExecutor>,
    /// Configuration; supplies the retry settings used while sending.
    config: PulsarConfig,
}
222
/// Short-lived view over a `PulsarSinkWriter`, created per `write_chunk` call,
/// that sends individual messages.
struct PulsarPayloadWriter<'w> {
    /// Producer borrowed from the owning writer.
    producer: &'w mut Producer<TokioExecutor>,
    /// Configuration borrowed from the owning writer (retry count and interval).
    config: &'w PulsarConfig,
    /// Handle through which the delivery future of each sent message is registered.
    add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
}
228
229mod opaque_type {
230 use super::*;
231 pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
232
233 pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
234 future.map(|result| {
235 result
236 .map(|_| ())
237 .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
238 })
239 }
240}
241pub use opaque_type::PulsarDeliveryFuture;
242use opaque_type::may_delivery_future;
243
244impl PulsarSinkWriter {
245 pub async fn new(
246 config: PulsarConfig,
247 schema: Schema,
248 downstream_pk: Vec<usize>,
249 format_desc: &SinkFormatDesc,
250 db_name: String,
251 sink_from_name: String,
252 ) -> Result<Self> {
253 let formatter = SinkFormatterImpl::new(
254 format_desc,
255 schema,
256 downstream_pk,
257 db_name,
258 sink_from_name,
259 &config.common.topic,
260 )
261 .await?;
262 let pulsar = config
263 .common
264 .build_client(&config.oauth, &config.aws_auth_props)
265 .await?;
266 let producer = build_pulsar_producer(&pulsar, &config).await?;
267 Ok(Self {
268 formatter,
269 pulsar,
270 producer,
271 config,
272 })
273 }
274}
275
276impl PulsarPayloadWriter<'_> {
277 async fn send_message(&mut self, message: Message) -> Result<()> {
278 let mut success_flag = false;
279 let mut connection_err = None;
280
281 for retry_num in 0..self.config.max_retry_num {
282 if retry_num > 0 {
283 tracing::warn!("Failed to send message, at retry no. {retry_num}");
284 }
285 match self.producer.send_non_blocking(message.clone()).await {
286 Ok(send_future) => {
290 self.add_future
291 .add_future_may_await(may_delivery_future(send_future))
292 .await?;
293 success_flag = true;
294 break;
295 }
296 Err(e) => match e {
298 pulsar::Error::Connection(_)
299 | pulsar::Error::Producer(_)
300 | pulsar::Error::Consumer(_) => {
301 connection_err = Some(e);
302 tokio::time::sleep(self.config.retry_interval).await;
303 continue;
304 }
305 _ => return Err(SinkError::Pulsar(anyhow!(e))),
306 },
307 }
308 }
309
310 if !success_flag {
311 Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
312 } else {
313 Ok(())
314 }
315 }
316
317 async fn write_inner(
318 &mut self,
319 event_key_object: Option<String>,
320 event_object: Option<Vec<u8>>,
321 ) -> Result<()> {
322 let message = Message {
323 partition_key: event_key_object,
324 payload: event_object.unwrap_or_default(),
325 ..Default::default()
326 };
327
328 self.send_message(message).await?;
329 Ok(())
330 }
331}
332
333impl FormattedSink for PulsarPayloadWriter<'_> {
334 type K = String;
335 type V = Vec<u8>;
336
337 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
338 self.write_inner(k, v).await
339 }
340}
341
342impl AsyncTruncateSinkWriter for PulsarSinkWriter {
343 type DeliveryFuture = PulsarDeliveryFuture;
344
345 async fn write_chunk<'a>(
346 &'a mut self,
347 chunk: StreamChunk,
348 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
349 ) -> Result<()> {
350 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
351 let mut payload_writer = PulsarPayloadWriter {
352 producer: &mut self.producer,
353 add_future,
354 config: &self.config,
355 };
356 for r in formatter.format_chunk(&chunk) {
360 let (key, value) = r?;
361 payload_writer
362 .write_inner(
363 key.map(SerTo::ser_to).transpose()?,
364 value.map(SerTo::ser_to).transpose()?,
365 )
366 .await?;
367 }
368 Ok(())
369 })
370 }
371
372 async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
373 if is_checkpoint {
374 self.producer
375 .send_batch()
376 .map_err(pulsar_to_sink_err)
377 .await?;
378 }
379
380 Ok(())
381 }
382}