risingwave_connector/sink/
pulsar.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::sink::Result;
34use crate::sink::encoder::SerTo;
35use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
36use crate::sink::log_store::DeliveryFutureManagerAddFuture;
37use crate::sink::writer::{
38    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
39};
40use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
41
42pub const PULSAR_SINK: &str = "pulsar";
43
/// Capacity of the delivery-future buffer handed to `into_log_sinker`.
///
/// One `SendFuture` is buffered per message given to the producer. Once the number of
/// buffered futures grows beyond this size, a commit is enforced before more are accepted.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
48
/// Default for `properties.retry.max`.
const fn _default_max_retries() -> u32 {
    3
}

/// Default for `properties.retry.interval`: 100 milliseconds.
const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

/// Default for `properties.batch.size`.
const fn _default_batch_size() -> u32 {
    10_000
}

/// Default for `properties.batch.byte.size`: 1 MiB.
const fn _default_batch_byte_size() -> usize {
    1024 * 1024
}
64
65fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
66    SinkError::Pulsar(anyhow!(e))
67}
68
69async fn build_pulsar_producer(
70    pulsar: &Pulsar<TokioExecutor>,
71    config: &PulsarConfig,
72) -> Result<Producer<TokioExecutor>> {
73    // Reduce async state machine size (see `clippy::large_futures`).
74    Box::pin(
75        pulsar
76            .producer()
77            .with_options(ProducerOptions {
78                batch_size: Some(config.producer_properties.batch_size),
79                batch_byte_size: Some(config.producer_properties.batch_byte_size),
80                ..Default::default()
81            })
82            .with_topic(&config.common.topic)
83            .build()
84            .map_err(pulsar_to_sink_err),
85    )
86    .await
87}
88
/// Batching options forwarded to the Pulsar producer's `ProducerOptions`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarPropertiesProducer {
    // Forwarded as `ProducerOptions::batch_size` (`properties.batch.size`, default 10000).
    // The option value is a string and is parsed via `DisplayFromStr`.
    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
    #[serde_as(as = "DisplayFromStr")]
    batch_size: u32,

    // Forwarded as `ProducerOptions::batch_byte_size`
    // (`properties.batch.byte.size`, default 1 MiB).
    #[serde(
        rename = "properties.batch.byte.size",
        default = "_default_batch_byte_size"
    )]
    #[serde_as(as = "DisplayFromStr")]
    batch_byte_size: usize,
}
103
/// Options of a Pulsar sink, deserialized from the sink's property map (see `from_btreemap`).
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarConfig {
    // Upper bound on send attempts per message (`properties.retry.max`, default 3).
    // Only connection, producer and consumer errors are retried.
    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_retry_num: u32,

    // Pause between send attempts (`properties.retry.interval`, default 100 ms).
    // Parsed by `deserialize_duration_from_string`.
    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    // Shared Pulsar options (flattened), including the target `topic`.
    // Also used to build the client.
    #[serde(flatten)]
    pub common: PulsarCommon,

    // Optional OAuth settings (flattened), passed to `build_client`.
    #[serde(flatten)]
    pub oauth: Option<PulsarOauthCommon>,

    // AWS auth properties (flattened), passed to `build_client`.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Producer batching options (`properties.batch.*`).
    #[serde(flatten)]
    pub producer_properties: PulsarPropertiesProducer,
}
130
131impl EnforceSecret for PulsarConfig {
132    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
133        PulsarCommon::enforce_one(prop)?;
134        AwsAuthProps::enforce_one(prop)?;
135        Ok(())
136    }
137}
138impl PulsarConfig {
139    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
140        let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
141            .map_err(|e| SinkError::Config(anyhow!(e)))?;
142
143        Ok(config)
144    }
145}
146
/// Sink that formats stream chunks and publishes them to a Pulsar topic.
#[derive(Debug)]
pub struct PulsarSink {
    /// Parsed connector options.
    pub config: PulsarConfig,
    /// Schema of the rows being written, passed to the formatter.
    schema: Schema,
    /// Downstream primary-key column indices; empty when no primary key was defined.
    downstream_pk: Vec<usize>,
    /// `FORMAT ... ENCODE ...` descriptor used to build the formatter.
    format_desc: SinkFormatDesc,
    /// Database name from `SinkParam`, forwarded to the formatter.
    db_name: String,
    /// `SinkParam::sink_from_name`, forwarded to the formatter.
    sink_from_name: String,
}
156
157impl EnforceSecret for PulsarSink {
158    fn enforce_secret<'a>(
159        prop_iter: impl Iterator<Item = &'a str>,
160    ) -> crate::error::ConnectorResult<()> {
161        for prop in prop_iter {
162            PulsarConfig::enforce_one(prop)?;
163        }
164        Ok(())
165    }
166}
167
168impl TryFrom<SinkParam> for PulsarSink {
169    type Error = SinkError;
170
171    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
172        let schema = param.schema();
173        let downstream_pk = param.downstream_pk_or_empty();
174        let config = PulsarConfig::from_btreemap(param.properties)?;
175        Ok(Self {
176            config,
177            schema,
178            downstream_pk,
179            format_desc: param
180                .format_desc
181                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
182            db_name: param.db_name,
183            sink_from_name: param.sink_from_name,
184        })
185    }
186}
187
188impl Sink for PulsarSink {
189    type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
190
191    const SINK_NAME: &'static str = PULSAR_SINK;
192
193    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
194        // Reduce async state machine size (see `clippy::large_futures`).
195        let writer = Box::pin(PulsarSinkWriter::new(
196            self.config.clone(),
197            self.schema.clone(),
198            self.downstream_pk.clone(),
199            &self.format_desc,
200            self.db_name.clone(),
201            self.sink_from_name.clone(),
202        ))
203        .await?;
204        Ok(writer.into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
205    }
206
207    async fn validate(&self) -> Result<()> {
208        // For non-append-only Pulsar sink, the primary key must be defined.
209        if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
210            return Err(SinkError::Config(anyhow!(
211                "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
212                self.format_desc.format
213            )));
214        }
215        // Check for formatter constructor error, before it is too late for error reporting.
216        SinkFormatterImpl::new(
217            &self.format_desc,
218            self.schema.clone(),
219            self.downstream_pk.clone(),
220            self.db_name.clone(),
221            self.sink_from_name.clone(),
222            &self.config.common.topic,
223        )
224        .await?;
225
226        // Validate pulsar connection.
227        let pulsar = self
228            .config
229            .common
230            .build_client(&self.config.oauth, &self.config.aws_auth_props)
231            .await?;
232        build_pulsar_producer(&pulsar, &self.config).await?;
233
234        Ok(())
235    }
236}
237
/// Writer that formats each chunk and hands the encoded rows to a Pulsar producer.
pub struct PulsarSinkWriter {
    /// Formatter built from the sink's `FORMAT ... ENCODE ...` descriptor.
    formatter: SinkFormatterImpl,
    // Never read after construction; the writer just owns the client alongside the producer.
    // NOTE(review): presumably kept so the client connection outlives the producer — confirm.
    #[expect(dead_code)]
    pulsar: Pulsar<TokioExecutor>,
    /// Producer opened on `config.common.topic`.
    producer: Producer<TokioExecutor>,
    /// Connector options; the retry settings are read when sending.
    config: PulsarConfig,
}
245
/// Short-lived view over a [`PulsarSinkWriter`]'s producer and config, built once per
/// `write_chunk` call.
///
/// It also holds the handle used to register each message's delivery future.
struct PulsarPayloadWriter<'w> {
    /// Producer the messages are sent through.
    producer: &'w mut Producer<TokioExecutor>,
    /// Source of `max_retry_num` and `retry_interval`.
    config: &'w PulsarConfig,
    /// Registers delivery futures via `add_future_may_await`, which may await before returning.
    add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
}
251
252mod opaque_type {
253    use super::*;
254    pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
255
256    #[define_opaque(PulsarDeliveryFuture)]
257    pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
258        future.map(|result| {
259            result
260                .map(|_| ())
261                .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
262        })
263    }
264}
265pub use opaque_type::PulsarDeliveryFuture;
266use opaque_type::may_delivery_future;
267
268impl PulsarSinkWriter {
269    pub async fn new(
270        config: PulsarConfig,
271        schema: Schema,
272        downstream_pk: Vec<usize>,
273        format_desc: &SinkFormatDesc,
274        db_name: String,
275        sink_from_name: String,
276    ) -> Result<Self> {
277        let formatter = SinkFormatterImpl::new(
278            format_desc,
279            schema,
280            downstream_pk,
281            db_name,
282            sink_from_name,
283            &config.common.topic,
284        )
285        .await?;
286        let pulsar = config
287            .common
288            .build_client(&config.oauth, &config.aws_auth_props)
289            .await?;
290        let producer = build_pulsar_producer(&pulsar, &config).await?;
291        Ok(Self {
292            formatter,
293            pulsar,
294            producer,
295            config,
296        })
297    }
298}
299
300impl PulsarPayloadWriter<'_> {
301    async fn send_message(&mut self, message: Message) -> Result<()> {
302        let mut success_flag = false;
303        let mut connection_err = None;
304
305        for retry_num in 0..self.config.max_retry_num {
306            if retry_num > 0 {
307                tracing::warn!("Failed to send message, at retry no. {retry_num}");
308            }
309            match Box::pin(self.producer.send_non_blocking(message.clone())).await {
310                // If the message is sent successfully,
311                // a SendFuture holding the message receipt
312                // or error after sending is returned
313                Ok(send_future) => {
314                    self.add_future
315                        .add_future_may_await(may_delivery_future(send_future))
316                        .await?;
317                    success_flag = true;
318                    break;
319                }
320                // error upon sending
321                Err(e) => match e {
322                    pulsar::Error::Connection(_)
323                    | pulsar::Error::Producer(_)
324                    | pulsar::Error::Consumer(_) => {
325                        connection_err = Some(e);
326                        tokio::time::sleep(self.config.retry_interval).await;
327                        continue;
328                    }
329                    _ => return Err(SinkError::Pulsar(anyhow!(e))),
330                },
331            }
332        }
333
334        if !success_flag {
335            Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
336        } else {
337            Ok(())
338        }
339    }
340
341    async fn write_inner(
342        &mut self,
343        event_key_object: Option<String>,
344        event_object: Option<Vec<u8>>,
345    ) -> Result<()> {
346        let message = Message {
347            partition_key: event_key_object,
348            payload: event_object.unwrap_or_default(),
349            ..Default::default()
350        };
351
352        self.send_message(message).await?;
353        Ok(())
354    }
355}
356
357impl FormattedSink for PulsarPayloadWriter<'_> {
358    type K = String;
359    type V = Vec<u8>;
360
361    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
362        self.write_inner(k, v).await
363    }
364}
365
impl AsyncTruncateSinkWriter for PulsarSinkWriter {
    type DeliveryFuture = PulsarDeliveryFuture;

    /// Formats every row of `chunk` and sends each encoded (key, value) pair to Pulsar.
    ///
    /// One delivery future per message is registered through `add_future`.
    ///
    /// # Errors
    /// Returns the first formatting/serialization error or send failure. Rows after it in
    /// the chunk are not sent.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        // Structured to avoid `clippy::large_stack_frames` and `large_futures`
        let iter = {
            dispatch_sink_formatter_str_key_impl!(
                &self.formatter,
                formatter,
                {
                    // Convert items to owned, concrete types before any `.await`,
                    // so the future doesn't capture formatter/iterator generics.
                    formatter.format_chunk(&chunk).map(|r| {
                        let (key, value) = r?;
                        let key: Option<String> = key.map(SerTo::ser_to).transpose()?;
                        let value: Option<Vec<u8>> = value.map(SerTo::ser_to).transpose()?;
                        Ok((key, value)) as Result<_>
                    })
                },
                // Produce a single iterator type for all formatter variants.
                auto_enums::auto_enum(Iterator)
            )
        };

        // Only concrete state is held across `.await`, keeping the future small.
        let mut payload_writer = PulsarPayloadWriter {
            producer: &mut self.producer,
            add_future,
            config: &self.config,
        };

        // The iterator is lazy: each row is formatted just before it is sent.
        for r in iter {
            let (key, value): (Option<String>, Option<Vec<u8>>) = r?;
            payload_writer.write_inner(key, value).await?;
        }

        Ok(())
    }

    /// On checkpoint barriers, asks the producer to send its current batch (`send_batch`).
    ///
    /// Non-checkpoint barriers do nothing.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint {
            self.producer
                .send_batch()
                .map_err(pulsar_to_sink_err)
                .await?;
        }

        Ok(())
    }
}