risingwave_connector/sink/
pulsar.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::sink::encoder::SerTo;
34use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
35use crate::sink::log_store::DeliveryFutureManagerAddFuture;
36use crate::sink::writer::{
37    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
38};
39use crate::sink::{DummySinkCommitCoordinator, Result};
40use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
41
/// Name of this sink connector (used as `Sink::SINK_NAME` for [`PulsarSink`]).
pub const PULSAR_SINK: &str = "pulsar";

/// Capacity of the delivery-future buffer handed to the log sinker.
///
/// When the number of `SendFuture`s held in the send-future buffer exceeds this size,
/// a commit is enforced once before more messages are buffered.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65_536;

/// Default for `properties.retry.max`.
const fn _default_max_retries() -> u32 {
    3
}

/// Default for `properties.retry.interval` (100 ms).
const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

/// Default for `properties.batch.size`.
const fn _default_batch_size() -> u32 {
    10_000
}

/// Default for `properties.batch.byte.size` (1 MiB).
const fn _default_batch_byte_size() -> usize {
    1024 * 1024
}
64
65fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
66    SinkError::Pulsar(anyhow!(e))
67}
68
69async fn build_pulsar_producer(
70    pulsar: &Pulsar<TokioExecutor>,
71    config: &PulsarConfig,
72) -> Result<Producer<TokioExecutor>> {
73    pulsar
74        .producer()
75        .with_options(ProducerOptions {
76            batch_size: Some(config.producer_properties.batch_size),
77            batch_byte_size: Some(config.producer_properties.batch_byte_size),
78            ..Default::default()
79        })
80        .with_topic(&config.common.topic)
81        .build()
82        .map_err(pulsar_to_sink_err)
83        .await
84}
85
// Producer batching options. Flattened into `PulsarConfig` and applied to the
// producer's `ProducerOptions` in `build_pulsar_producer`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarPropertiesProducer {
    // `properties.batch.size`, parsed from its string form via `DisplayFromStr`.
    // Passed as `ProducerOptions::batch_size`; defaults to 10000 (`_default_batch_size`).
    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
    #[serde_as(as = "DisplayFromStr")]
    batch_size: u32,

    // `properties.batch.byte.size`, parsed from its string form via `DisplayFromStr`.
    // Passed as `ProducerOptions::batch_byte_size`; defaults to `1 << 20`
    // (`_default_batch_byte_size`).
    #[serde(
        rename = "properties.batch.byte.size",
        default = "_default_batch_byte_size"
    )]
    #[serde_as(as = "DisplayFromStr")]
    batch_byte_size: usize,
}
100
// Sink-level Pulsar configuration, deserialized from the sink properties by
// `PulsarConfig::from_btreemap`. The option groups marked `#[serde(flatten)]` read
// their keys from the top level of the same property map.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarConfig {
    // `properties.retry.max` (default 3, parsed via `DisplayFromStr`). Bounds the number of
    // send attempts per message in the payload writer's retry loop.
    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_retry_num: u32,

    // `properties.retry.interval` (default 100 ms). Time slept between send attempts.
    // The accepted string format is whatever `deserialize_duration_from_string` parses.
    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    // Common Pulsar settings from `connector_common`. Supplies the target `topic` and
    // `build_client`.
    #[serde(flatten)]
    pub common: PulsarCommon,

    // Optional OAuth settings, forwarded to `build_client`.
    // NOTE(review): presumably `None` when no OAuth keys are present — confirm.
    #[serde(flatten)]
    pub oauth: Option<PulsarOauthCommon>,

    // AWS authentication options, forwarded to `build_client`.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Producer batching options (`properties.batch.size`, `properties.batch.byte.size`).
    #[serde(flatten)]
    pub producer_properties: PulsarPropertiesProducer,
}
127
128impl EnforceSecret for PulsarConfig {
129    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
130        PulsarCommon::enforce_one(prop)?;
131        AwsAuthProps::enforce_one(prop)?;
132        Ok(())
133    }
134}
135impl PulsarConfig {
136    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
137        let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
138            .map_err(|e| SinkError::Config(anyhow!(e)))?;
139
140        Ok(config)
141    }
142}
143
/// Pulsar sink definition built from a [`SinkParam`].
///
/// Validates the sink definition and creates [`PulsarSinkWriter`]s.
#[derive(Debug)]
pub struct PulsarSink {
    /// Parsed sink properties (see `PulsarConfig::from_btreemap`).
    pub config: PulsarConfig,
    /// Input schema, passed to the row formatter.
    schema: Schema,
    /// Downstream primary key (presumably column indices — confirm). `validate` requires
    /// it to be non-empty for every format other than append-only.
    downstream_pk: Vec<usize>,
    /// `FORMAT ... ENCODE ...` description; `try_from` rejects a sink without one.
    format_desc: SinkFormatDesc,
    /// Forwarded to the row formatter.
    db_name: String,
    /// Forwarded to the row formatter.
    sink_from_name: String,
}
153
154impl EnforceSecret for PulsarSink {
155    fn enforce_secret<'a>(
156        prop_iter: impl Iterator<Item = &'a str>,
157    ) -> crate::error::ConnectorResult<()> {
158        for prop in prop_iter {
159            PulsarConfig::enforce_one(prop)?;
160        }
161        Ok(())
162    }
163}
164
165impl TryFrom<SinkParam> for PulsarSink {
166    type Error = SinkError;
167
168    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
169        let schema = param.schema();
170        let config = PulsarConfig::from_btreemap(param.properties)?;
171        Ok(Self {
172            config,
173            schema,
174            downstream_pk: param.downstream_pk,
175            format_desc: param
176                .format_desc
177                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
178            db_name: param.db_name,
179            sink_from_name: param.sink_from_name,
180        })
181    }
182}
183
184impl Sink for PulsarSink {
185    type Coordinator = DummySinkCommitCoordinator;
186    type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
187
188    const SINK_NAME: &'static str = PULSAR_SINK;
189
190    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
191        Ok(PulsarSinkWriter::new(
192            self.config.clone(),
193            self.schema.clone(),
194            self.downstream_pk.clone(),
195            &self.format_desc,
196            self.db_name.clone(),
197            self.sink_from_name.clone(),
198        )
199        .await?
200        .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
201    }
202
203    async fn validate(&self) -> Result<()> {
204        // For upsert Pulsar sink, the primary key must be defined.
205        if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
206            return Err(SinkError::Config(anyhow!(
207                "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
208                self.format_desc.format
209            )));
210        }
211        // Check for formatter constructor error, before it is too late for error reporting.
212        SinkFormatterImpl::new(
213            &self.format_desc,
214            self.schema.clone(),
215            self.downstream_pk.clone(),
216            self.db_name.clone(),
217            self.sink_from_name.clone(),
218            &self.config.common.topic,
219        )
220        .await?;
221
222        // Validate pulsar connection.
223        let pulsar = self
224            .config
225            .common
226            .build_client(&self.config.oauth, &self.config.aws_auth_props)
227            .await?;
228        build_pulsar_producer(&pulsar, &self.config).await?;
229
230        Ok(())
231    }
232}
233
/// Writer side of the Pulsar sink.
///
/// Owns the row formatter, the Pulsar client and a producer for the configured topic.
pub struct PulsarSinkWriter {
    /// Turns stream chunks into key/value payloads.
    formatter: SinkFormatterImpl,
    /// Client the producer was built from. It is never read after construction (hence
    /// `#[expect(dead_code)]`).
    /// NOTE(review): presumably kept so the client outlives `producer` — confirm.
    #[expect(dead_code)]
    pulsar: Pulsar<TokioExecutor>,
    /// Producer created by `build_pulsar_producer` for `config.common.topic`.
    producer: Producer<TokioExecutor>,
    /// Full sink configuration; the payload writer reads the retry settings from it.
    config: PulsarConfig,
}
241
/// Per-`write_chunk` view over a [`PulsarSinkWriter`]'s producer and config.
///
/// Sends messages and registers their delivery futures with `add_future`.
struct PulsarPayloadWriter<'w> {
    /// Producer every message is sent through.
    producer: &'w mut Producer<TokioExecutor>,
    /// Supplies `max_retry_num` and `retry_interval` for `send_message`.
    config: &'w PulsarConfig,
    /// Receives the delivery future of each message accepted by the producer.
    add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
}
247
248mod opaque_type {
249    use super::*;
250    pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
251
252    pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
253        future.map(|result| {
254            result
255                .map(|_| ())
256                .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
257        })
258    }
259}
260pub use opaque_type::PulsarDeliveryFuture;
261use opaque_type::may_delivery_future;
262
263impl PulsarSinkWriter {
264    pub async fn new(
265        config: PulsarConfig,
266        schema: Schema,
267        downstream_pk: Vec<usize>,
268        format_desc: &SinkFormatDesc,
269        db_name: String,
270        sink_from_name: String,
271    ) -> Result<Self> {
272        let formatter = SinkFormatterImpl::new(
273            format_desc,
274            schema,
275            downstream_pk,
276            db_name,
277            sink_from_name,
278            &config.common.topic,
279        )
280        .await?;
281        let pulsar = config
282            .common
283            .build_client(&config.oauth, &config.aws_auth_props)
284            .await?;
285        let producer = build_pulsar_producer(&pulsar, &config).await?;
286        Ok(Self {
287            formatter,
288            pulsar,
289            producer,
290            config,
291        })
292    }
293}
294
295impl PulsarPayloadWriter<'_> {
296    async fn send_message(&mut self, message: Message) -> Result<()> {
297        let mut success_flag = false;
298        let mut connection_err = None;
299
300        for retry_num in 0..self.config.max_retry_num {
301            if retry_num > 0 {
302                tracing::warn!("Failed to send message, at retry no. {retry_num}");
303            }
304            match self.producer.send_non_blocking(message.clone()).await {
305                // If the message is sent successfully,
306                // a SendFuture holding the message receipt
307                // or error after sending is returned
308                Ok(send_future) => {
309                    self.add_future
310                        .add_future_may_await(may_delivery_future(send_future))
311                        .await?;
312                    success_flag = true;
313                    break;
314                }
315                // error upon sending
316                Err(e) => match e {
317                    pulsar::Error::Connection(_)
318                    | pulsar::Error::Producer(_)
319                    | pulsar::Error::Consumer(_) => {
320                        connection_err = Some(e);
321                        tokio::time::sleep(self.config.retry_interval).await;
322                        continue;
323                    }
324                    _ => return Err(SinkError::Pulsar(anyhow!(e))),
325                },
326            }
327        }
328
329        if !success_flag {
330            Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
331        } else {
332            Ok(())
333        }
334    }
335
336    async fn write_inner(
337        &mut self,
338        event_key_object: Option<String>,
339        event_object: Option<Vec<u8>>,
340    ) -> Result<()> {
341        let message = Message {
342            partition_key: event_key_object,
343            payload: event_object.unwrap_or_default(),
344            ..Default::default()
345        };
346
347        self.send_message(message).await?;
348        Ok(())
349    }
350}
351
352impl FormattedSink for PulsarPayloadWriter<'_> {
353    type K = String;
354    type V = Vec<u8>;
355
356    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
357        self.write_inner(k, v).await
358    }
359}
360
/// Sends formatted rows to Pulsar and flushes the producer's pending batch on
/// checkpoint barriers.
impl AsyncTruncateSinkWriter for PulsarSinkWriter {
    type DeliveryFuture = PulsarDeliveryFuture;

    /// Formats `chunk` with the configured formatter and sends every resulting key/value
    /// pair through a [`PulsarPayloadWriter`].
    ///
    /// Delivery futures are handed to `add_future` via `add_future_may_await`, which may
    /// await earlier deliveries; its errors are propagated. Formatting or serialization
    /// errors abort the chunk at the offending row.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        // The macro dispatches on the concrete formatter and binds it as `formatter` inside
        // the block. NOTE(review): the "str_key" variant presumably yields keys that
        // serialize to `String` — confirm against the macro definition.
        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
            // Borrows the producer and config separately from `self.formatter`.
            let mut payload_writer = PulsarPayloadWriter {
                producer: &mut self.producer,
                add_future,
                config: &self.config,
            };
            // TODO: we can call `payload_writer.write_chunk(chunk, formatter)`,
            // but for an unknown reason, this will greatly increase the compile time,
            // by nearly 4x. May investigate it later.
            for r in formatter.format_chunk(&chunk) {
                let (key, value) = r?;
                // Serialize the optional key/value into the `String` / `Vec<u8>` forms
                // expected by `write_inner`; a serialization error aborts the chunk.
                payload_writer
                    .write_inner(
                        key.map(SerTo::ser_to).transpose()?,
                        value.map(SerTo::ser_to).transpose()?,
                    )
                    .await?;
            }
            Ok(())
        })
    }

    /// On a checkpoint barrier, asks the producer to send its current batch. Other
    /// barriers are a no-op.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint {
            // Presumably flushes messages accumulated under the batching options set in
            // `build_pulsar_producer`, so they are not held past the checkpoint — confirm
            // against pulsar-rs `Producer::send_batch`.
            self.producer
                .send_batch()
                .map_err(pulsar_to_sink_err)
                .await?;
        }

        Ok(())
    }
}