// risingwave_connector/sink/pulsar.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::sink::encoder::SerTo;
33use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
34use crate::sink::log_store::DeliveryFutureManagerAddFuture;
35use crate::sink::writer::{
36    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
37};
38use crate::sink::{DummySinkCommitCoordinator, Result};
39use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
40
/// Connector name of the Pulsar sink (used as `PulsarSink::SINK_NAME`).
pub const PULSAR_SINK: &str = "pulsar";

/// Capacity of the delivery-future buffer handed to `into_log_sinker` when the log sinker
/// is created.
///
/// Every message the producer accepts yields one pending `SendFuture`, wrapped as a
/// delivery future and registered through `add_future_may_await`. Once more futures than
/// this are buffered, the sinker forces earlier deliveries to be awaited before accepting
/// new ones. NOTE(review): confirm the exact behavior in `AsyncTruncateLogSinkerOf`.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
47
/// Default value of `properties.retry.max`.
const fn _default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
51
/// Default value of `properties.retry.interval`: 100 milliseconds.
const fn _default_retry_backoff() -> Duration {
    const BACKOFF_MILLIS: u64 = 100;
    Duration::from_millis(BACKOFF_MILLIS)
}
55
/// Default value of `properties.batch.size`.
const fn _default_batch_size() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 10_000;
    DEFAULT_BATCH_SIZE
}
59
/// Default value of `properties.batch.byte.size`: 1 MiB.
const fn _default_batch_byte_size() -> usize {
    1024 * 1024
}
63
64fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
65    SinkError::Pulsar(anyhow!(e))
66}
67
68async fn build_pulsar_producer(
69    pulsar: &Pulsar<TokioExecutor>,
70    config: &PulsarConfig,
71) -> Result<Producer<TokioExecutor>> {
72    pulsar
73        .producer()
74        .with_options(ProducerOptions {
75            batch_size: Some(config.producer_properties.batch_size),
76            batch_byte_size: Some(config.producer_properties.batch_byte_size),
77            ..Default::default()
78        })
79        .with_topic(&config.common.topic)
80        .build()
81        .map_err(pulsar_to_sink_err)
82        .await
83}
84
85#[serde_as]
86#[derive(Debug, Clone, Deserialize, WithOptions)]
87pub struct PulsarPropertiesProducer {
88    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
89    #[serde_as(as = "DisplayFromStr")]
90    batch_size: u32,
91
92    #[serde(
93        rename = "properties.batch.byte.size",
94        default = "_default_batch_byte_size"
95    )]
96    #[serde_as(as = "DisplayFromStr")]
97    batch_byte_size: usize,
98}
99
100#[serde_as]
101#[derive(Debug, Clone, Deserialize, WithOptions)]
102pub struct PulsarConfig {
103    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
104    #[serde_as(as = "DisplayFromStr")]
105    pub max_retry_num: u32,
106
107    #[serde(
108        rename = "properties.retry.interval",
109        default = "_default_retry_backoff",
110        deserialize_with = "deserialize_duration_from_string"
111    )]
112    pub retry_interval: Duration,
113
114    #[serde(flatten)]
115    pub common: PulsarCommon,
116
117    #[serde(flatten)]
118    pub oauth: Option<PulsarOauthCommon>,
119
120    #[serde(flatten)]
121    pub aws_auth_props: AwsAuthProps,
122
123    #[serde(flatten)]
124    pub producer_properties: PulsarPropertiesProducer,
125}
126
127impl PulsarConfig {
128    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
129        let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
130            .map_err(|e| SinkError::Config(anyhow!(e)))?;
131
132        Ok(config)
133    }
134}
135
136#[derive(Debug)]
137pub struct PulsarSink {
138    pub config: PulsarConfig,
139    schema: Schema,
140    downstream_pk: Vec<usize>,
141    format_desc: SinkFormatDesc,
142    db_name: String,
143    sink_from_name: String,
144}
145
146impl TryFrom<SinkParam> for PulsarSink {
147    type Error = SinkError;
148
149    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
150        let schema = param.schema();
151        let config = PulsarConfig::from_btreemap(param.properties)?;
152        Ok(Self {
153            config,
154            schema,
155            downstream_pk: param.downstream_pk,
156            format_desc: param
157                .format_desc
158                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
159            db_name: param.db_name,
160            sink_from_name: param.sink_from_name,
161        })
162    }
163}
164
165impl Sink for PulsarSink {
166    type Coordinator = DummySinkCommitCoordinator;
167    type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
168
169    const SINK_NAME: &'static str = PULSAR_SINK;
170
171    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
172        Ok(PulsarSinkWriter::new(
173            self.config.clone(),
174            self.schema.clone(),
175            self.downstream_pk.clone(),
176            &self.format_desc,
177            self.db_name.clone(),
178            self.sink_from_name.clone(),
179        )
180        .await?
181        .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
182    }
183
184    async fn validate(&self) -> Result<()> {
185        // For upsert Pulsar sink, the primary key must be defined.
186        if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
187            return Err(SinkError::Config(anyhow!(
188                "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
189                self.format_desc.format
190            )));
191        }
192        // Check for formatter constructor error, before it is too late for error reporting.
193        SinkFormatterImpl::new(
194            &self.format_desc,
195            self.schema.clone(),
196            self.downstream_pk.clone(),
197            self.db_name.clone(),
198            self.sink_from_name.clone(),
199            &self.config.common.topic,
200        )
201        .await?;
202
203        // Validate pulsar connection.
204        let pulsar = self
205            .config
206            .common
207            .build_client(&self.config.oauth, &self.config.aws_auth_props)
208            .await?;
209        build_pulsar_producer(&pulsar, &self.config).await?;
210
211        Ok(())
212    }
213}
214
215pub struct PulsarSinkWriter {
216    formatter: SinkFormatterImpl,
217    #[expect(dead_code)]
218    pulsar: Pulsar<TokioExecutor>,
219    producer: Producer<TokioExecutor>,
220    config: PulsarConfig,
221}
222
223struct PulsarPayloadWriter<'w> {
224    producer: &'w mut Producer<TokioExecutor>,
225    config: &'w PulsarConfig,
226    add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
227}
228
229mod opaque_type {
230    use super::*;
231    pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
232
233    pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
234        future.map(|result| {
235            result
236                .map(|_| ())
237                .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
238        })
239    }
240}
241pub use opaque_type::PulsarDeliveryFuture;
242use opaque_type::may_delivery_future;
243
244impl PulsarSinkWriter {
245    pub async fn new(
246        config: PulsarConfig,
247        schema: Schema,
248        downstream_pk: Vec<usize>,
249        format_desc: &SinkFormatDesc,
250        db_name: String,
251        sink_from_name: String,
252    ) -> Result<Self> {
253        let formatter = SinkFormatterImpl::new(
254            format_desc,
255            schema,
256            downstream_pk,
257            db_name,
258            sink_from_name,
259            &config.common.topic,
260        )
261        .await?;
262        let pulsar = config
263            .common
264            .build_client(&config.oauth, &config.aws_auth_props)
265            .await?;
266        let producer = build_pulsar_producer(&pulsar, &config).await?;
267        Ok(Self {
268            formatter,
269            pulsar,
270            producer,
271            config,
272        })
273    }
274}
275
276impl PulsarPayloadWriter<'_> {
277    async fn send_message(&mut self, message: Message) -> Result<()> {
278        let mut success_flag = false;
279        let mut connection_err = None;
280
281        for retry_num in 0..self.config.max_retry_num {
282            if retry_num > 0 {
283                tracing::warn!("Failed to send message, at retry no. {retry_num}");
284            }
285            match self.producer.send_non_blocking(message.clone()).await {
286                // If the message is sent successfully,
287                // a SendFuture holding the message receipt
288                // or error after sending is returned
289                Ok(send_future) => {
290                    self.add_future
291                        .add_future_may_await(may_delivery_future(send_future))
292                        .await?;
293                    success_flag = true;
294                    break;
295                }
296                // error upon sending
297                Err(e) => match e {
298                    pulsar::Error::Connection(_)
299                    | pulsar::Error::Producer(_)
300                    | pulsar::Error::Consumer(_) => {
301                        connection_err = Some(e);
302                        tokio::time::sleep(self.config.retry_interval).await;
303                        continue;
304                    }
305                    _ => return Err(SinkError::Pulsar(anyhow!(e))),
306                },
307            }
308        }
309
310        if !success_flag {
311            Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
312        } else {
313            Ok(())
314        }
315    }
316
317    async fn write_inner(
318        &mut self,
319        event_key_object: Option<String>,
320        event_object: Option<Vec<u8>>,
321    ) -> Result<()> {
322        let message = Message {
323            partition_key: event_key_object,
324            payload: event_object.unwrap_or_default(),
325            ..Default::default()
326        };
327
328        self.send_message(message).await?;
329        Ok(())
330    }
331}
332
333impl FormattedSink for PulsarPayloadWriter<'_> {
334    type K = String;
335    type V = Vec<u8>;
336
337    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
338        self.write_inner(k, v).await
339    }
340}
341
342impl AsyncTruncateSinkWriter for PulsarSinkWriter {
343    type DeliveryFuture = PulsarDeliveryFuture;
344
345    async fn write_chunk<'a>(
346        &'a mut self,
347        chunk: StreamChunk,
348        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
349    ) -> Result<()> {
350        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
351            let mut payload_writer = PulsarPayloadWriter {
352                producer: &mut self.producer,
353                add_future,
354                config: &self.config,
355            };
356            // TODO: we can call `payload_writer.write_chunk(chunk, formatter)`,
357            // but for an unknown reason, this will greatly increase the compile time,
358            // by nearly 4x. May investigate it later.
359            for r in formatter.format_chunk(&chunk) {
360                let (key, value) = r?;
361                payload_writer
362                    .write_inner(
363                        key.map(SerTo::ser_to).transpose()?,
364                        value.map(SerTo::ser_to).transpose()?,
365                    )
366                    .await?;
367            }
368            Ok(())
369        })
370    }
371
372    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
373        if is_checkpoint {
374            self.producer
375                .send_batch()
376                .map_err(pulsar_to_sink_err)
377                .await?;
378        }
379
380        Ok(())
381    }
382}