risingwave_connector/sink/
pulsar.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::sink::encoder::SerTo;
34use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
35use crate::sink::log_store::DeliveryFutureManagerAddFuture;
36use crate::sink::writer::{
37    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use crate::sink::{DummySinkCommitCoordinator, Result};
40use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
41
/// Connector name of the Pulsar sink; exposed as `PulsarSink::SINK_NAME`.
pub const PULSAR_SINK: &str = "pulsar";

/// The delivery buffer queue size.
/// When the number of `SendFuture`s currently held in the delivery-future buffer
/// exceeds this size, a commit is enforced once.
/// The value is handed to `into_log_sinker` when the log sinker is created.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
48
/// Serde default for the `properties.retry.max` option.
const fn _default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
52
/// Serde default for the `properties.retry.interval` option: 100 milliseconds.
const fn _default_retry_backoff() -> Duration {
    const DEFAULT_BACKOFF_MILLIS: u64 = 100;
    Duration::from_millis(DEFAULT_BACKOFF_MILLIS)
}
56
/// Serde default for the `properties.batch.size` option.
const fn _default_batch_size() -> u32 {
    10_000
}
60
/// Serde default for the `properties.batch.byte.size` option: 1 MiB.
const fn _default_batch_byte_size() -> usize {
    1024 * 1024
}
64
65fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
66    SinkError::Pulsar(anyhow!(e))
67}
68
69async fn build_pulsar_producer(
70    pulsar: &Pulsar<TokioExecutor>,
71    config: &PulsarConfig,
72) -> Result<Producer<TokioExecutor>> {
73    pulsar
74        .producer()
75        .with_options(ProducerOptions {
76            batch_size: Some(config.producer_properties.batch_size),
77            batch_byte_size: Some(config.producer_properties.batch_byte_size),
78            ..Default::default()
79        })
80        .with_topic(&config.common.topic)
81        .build()
82        .map_err(pulsar_to_sink_err)
83        .await
84}
85
/// Batching options of the Pulsar producer.
///
/// Both values are parsed from their string form and forwarded to `ProducerOptions`
/// in `build_pulsar_producer`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarPropertiesProducer {
    // Forwarded as `ProducerOptions::batch_size`; defaults to 10000.
    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
    #[serde_as(as = "DisplayFromStr")]
    batch_size: u32,

    // Forwarded as `ProducerOptions::batch_byte_size`; defaults to `1 << 20`.
    #[serde(
        rename = "properties.batch.byte.size",
        default = "_default_batch_byte_size"
    )]
    #[serde_as(as = "DisplayFromStr")]
    batch_byte_size: usize,
}
100
/// Options of the Pulsar sink, deserialized from the sink's string properties
/// (see `PulsarConfig::from_btreemap`).
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarConfig {
    // Upper bound of the send loop in `PulsarPayloadWriter::send_message`; defaults to 3.
    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_retry_num: u32,

    // Pause after a retryable send failure; defaults to 100 ms.
    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    // Shared Pulsar options; supplies the topic and builds the client.
    #[serde(flatten)]
    pub common: PulsarCommon,

    // Optional OAuth settings, passed to `PulsarCommon::build_client`.
    #[serde(flatten)]
    pub oauth: Option<PulsarOauthCommon>,

    // AWS auth settings, passed to `PulsarCommon::build_client`.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Producer batching limits, applied in `build_pulsar_producer`.
    #[serde(flatten)]
    pub producer_properties: PulsarPropertiesProducer,
}
127
128impl EnforceSecret for PulsarConfig {
129    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
130        PulsarCommon::enforce_one(prop)?;
131        AwsAuthProps::enforce_one(prop)?;
132        Ok(())
133    }
134}
135impl PulsarConfig {
136    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
137        let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
138            .map_err(|e| SinkError::Config(anyhow!(e)))?;
139
140        Ok(config)
141    }
142}
143
/// Pulsar sink definition, built from a `SinkParam` via `TryFrom`.
///
/// Every field except `config` is forwarded to `SinkFormatterImpl::new`, either directly
/// in `validate` or through `PulsarSinkWriter::new`.
#[derive(Debug)]
pub struct PulsarSink {
    /// Parsed connector options.
    pub config: PulsarConfig,
    /// Schema obtained from `SinkParam::schema`.
    schema: Schema,
    /// Downstream primary key column indices; must be non-empty unless the format is
    /// append-only (checked in `validate`).
    downstream_pk: Vec<usize>,
    /// The `FORMAT ... ENCODE ...` description of the sink.
    format_desc: SinkFormatDesc,
    /// Taken from `SinkParam::db_name`.
    db_name: String,
    /// Taken from `SinkParam::sink_from_name`.
    sink_from_name: String,
}
153
154impl EnforceSecret for PulsarSink {
155    fn enforce_secret<'a>(
156        prop_iter: impl Iterator<Item = &'a str>,
157    ) -> crate::error::ConnectorResult<()> {
158        for prop in prop_iter {
159            PulsarConfig::enforce_one(prop)?;
160        }
161        Ok(())
162    }
163}
164
165impl TryFrom<SinkParam> for PulsarSink {
166    type Error = SinkError;
167
168    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
169        let schema = param.schema();
170        let config = PulsarConfig::from_btreemap(param.properties)?;
171        Ok(Self {
172            config,
173            schema,
174            downstream_pk: param.downstream_pk,
175            format_desc: param
176                .format_desc
177                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
178            db_name: param.db_name,
179            sink_from_name: param.sink_from_name,
180        })
181    }
182}
183
184impl Sink for PulsarSink {
185    type Coordinator = DummySinkCommitCoordinator;
186    type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
187
188    const SINK_NAME: &'static str = PULSAR_SINK;
189
190    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
191        Ok(PulsarSinkWriter::new(
192            self.config.clone(),
193            self.schema.clone(),
194            self.downstream_pk.clone(),
195            &self.format_desc,
196            self.db_name.clone(),
197            self.sink_from_name.clone(),
198        )
199        .await?
200        .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
201    }
202
203    async fn validate(&self) -> Result<()> {
204        // For upsert Pulsar sink, the primary key must be defined.
205        if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
206            return Err(SinkError::Config(anyhow!(
207                "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
208                self.format_desc.format
209            )));
210        }
211        // Check for formatter constructor error, before it is too late for error reporting.
212        SinkFormatterImpl::new(
213            &self.format_desc,
214            self.schema.clone(),
215            self.downstream_pk.clone(),
216            self.db_name.clone(),
217            self.sink_from_name.clone(),
218            &self.config.common.topic,
219        )
220        .await?;
221
222        // Validate pulsar connection.
223        let pulsar = self
224            .config
225            .common
226            .build_client(&self.config.oauth, &self.config.aws_auth_props)
227            .await?;
228        build_pulsar_producer(&pulsar, &self.config).await?;
229
230        Ok(())
231    }
232}
233
/// Writer that formats stream chunks and sends the resulting messages through a Pulsar
/// producer.
pub struct PulsarSinkWriter {
    /// Formatter that turns a chunk into key/value pairs (used in `write_chunk`).
    formatter: SinkFormatterImpl,
    /// Client the producer was built from. It is never read after construction, hence the
    /// `#[expect(dead_code)]`.
    /// NOTE(review): presumably retained so the client lives as long as the producer — confirm.
    #[expect(dead_code)]
    pulsar: Pulsar<TokioExecutor>,
    /// Producer bound to the configured topic.
    producer: Producer<TokioExecutor>,
    /// Connector options; supplies the retry settings used while sending.
    config: PulsarConfig,
}
241
/// Short-lived helper borrowed from a `PulsarSinkWriter` for the duration of one
/// `write_chunk` call; it sends individual messages and registers their delivery futures.
struct PulsarPayloadWriter<'w> {
    /// Producer borrowed from the owning writer.
    producer: &'w mut Producer<TokioExecutor>,
    /// Connector options; `max_retry_num` and `retry_interval` drive the send retries.
    config: &'w PulsarConfig,
    /// Handle through which the delivery future of each accepted message is registered.
    add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
}
247
248mod opaque_type {
249    use super::*;
250    pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
251
252    #[define_opaque(PulsarDeliveryFuture)]
253    pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
254        future.map(|result| {
255            result
256                .map(|_| ())
257                .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
258        })
259    }
260}
261pub use opaque_type::PulsarDeliveryFuture;
262use opaque_type::may_delivery_future;
263
264impl PulsarSinkWriter {
265    pub async fn new(
266        config: PulsarConfig,
267        schema: Schema,
268        downstream_pk: Vec<usize>,
269        format_desc: &SinkFormatDesc,
270        db_name: String,
271        sink_from_name: String,
272    ) -> Result<Self> {
273        let formatter = SinkFormatterImpl::new(
274            format_desc,
275            schema,
276            downstream_pk,
277            db_name,
278            sink_from_name,
279            &config.common.topic,
280        )
281        .await?;
282        let pulsar = config
283            .common
284            .build_client(&config.oauth, &config.aws_auth_props)
285            .await?;
286        let producer = build_pulsar_producer(&pulsar, &config).await?;
287        Ok(Self {
288            formatter,
289            pulsar,
290            producer,
291            config,
292        })
293    }
294}
295
296impl PulsarPayloadWriter<'_> {
297    async fn send_message(&mut self, message: Message) -> Result<()> {
298        let mut success_flag = false;
299        let mut connection_err = None;
300
301        for retry_num in 0..self.config.max_retry_num {
302            if retry_num > 0 {
303                tracing::warn!("Failed to send message, at retry no. {retry_num}");
304            }
305            match self.producer.send_non_blocking(message.clone()).await {
306                // If the message is sent successfully,
307                // a SendFuture holding the message receipt
308                // or error after sending is returned
309                Ok(send_future) => {
310                    self.add_future
311                        .add_future_may_await(may_delivery_future(send_future))
312                        .await?;
313                    success_flag = true;
314                    break;
315                }
316                // error upon sending
317                Err(e) => match e {
318                    pulsar::Error::Connection(_)
319                    | pulsar::Error::Producer(_)
320                    | pulsar::Error::Consumer(_) => {
321                        connection_err = Some(e);
322                        tokio::time::sleep(self.config.retry_interval).await;
323                        continue;
324                    }
325                    _ => return Err(SinkError::Pulsar(anyhow!(e))),
326                },
327            }
328        }
329
330        if !success_flag {
331            Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
332        } else {
333            Ok(())
334        }
335    }
336
337    async fn write_inner(
338        &mut self,
339        event_key_object: Option<String>,
340        event_object: Option<Vec<u8>>,
341    ) -> Result<()> {
342        let message = Message {
343            partition_key: event_key_object,
344            payload: event_object.unwrap_or_default(),
345            ..Default::default()
346        };
347
348        self.send_message(message).await?;
349        Ok(())
350    }
351}
352
353impl FormattedSink for PulsarPayloadWriter<'_> {
354    type K = String;
355    type V = Vec<u8>;
356
357    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
358        self.write_inner(k, v).await
359    }
360}
361
362impl AsyncTruncateSinkWriter for PulsarSinkWriter {
363    type DeliveryFuture = PulsarDeliveryFuture;
364
365    async fn write_chunk<'a>(
366        &'a mut self,
367        chunk: StreamChunk,
368        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
369    ) -> Result<()> {
370        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
371            let mut payload_writer = PulsarPayloadWriter {
372                producer: &mut self.producer,
373                add_future,
374                config: &self.config,
375            };
376            // TODO: we can call `payload_writer.write_chunk(chunk, formatter)`,
377            // but for an unknown reason, this will greatly increase the compile time,
378            // by nearly 4x. May investigate it later.
379            for r in formatter.format_chunk(&chunk) {
380                let (key, value) = r?;
381                payload_writer
382                    .write_inner(
383                        key.map(SerTo::ser_to).transpose()?,
384                        value.map(SerTo::ser_to).transpose()?,
385                    )
386                    .await?;
387            }
388            Ok(())
389        })
390    }
391
392    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
393        if is_checkpoint {
394            self.producer
395                .send_batch()
396                .map_err(pulsar_to_sink_err)
397                .await?;
398        }
399
400        Ok(())
401    }
402}