risingwave_connector/sink/
pulsar.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::{FutureExt, TryFuture, TryFutureExt};
21use pulsar::producer::{Message, SendFuture};
22use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::{DisplayFromStr, serde_as};
27use with_options::WithOptions;
28
29use super::catalog::{SinkFormat, SinkFormatDesc};
30use super::{Sink, SinkError, SinkParam, SinkWriterParam};
31use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon};
32use crate::enforce_secret::EnforceSecret;
33use crate::sink::Result;
34use crate::sink::encoder::SerTo;
35use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl};
36use crate::sink::log_store::DeliveryFutureManagerAddFuture;
37use crate::sink::writer::{
38    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
39};
40use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl};
41
/// Connector name of the Pulsar sink; exposed as `Sink::SINK_NAME` for [`PulsarSink`].
pub const PULSAR_SINK: &str = "pulsar";
43
/// The delivery buffer queue size.
///
/// When the number of `SendFuture`s being buffered exceeds this size, a commit is
/// enforced once. The value is handed to `into_log_sinker` when the log sinker is
/// created in `PulsarSink::new_log_sinker`.
const PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
48
/// Serde default for `properties.retry.max` (`PulsarConfig::max_retry_num`).
const fn _default_max_retries() -> u32 {
    const MAX_RETRIES: u32 = 3;
    MAX_RETRIES
}
52
/// Serde default for `properties.retry.interval` (`PulsarConfig::retry_interval`).
const fn _default_retry_backoff() -> Duration {
    const BACKOFF_MILLIS: u64 = 100;
    Duration::from_millis(BACKOFF_MILLIS)
}
56
/// Serde default for `properties.batch.size` (`PulsarPropertiesProducer::batch_size`).
const fn _default_batch_size() -> u32 {
    const BATCH_SIZE: u32 = 10_000;
    BATCH_SIZE
}
60
/// Serde default for `properties.batch.byte.size`
/// (`PulsarPropertiesProducer::batch_byte_size`): 1 MiB.
const fn _default_batch_byte_size() -> usize {
    const ONE_MIB: usize = 1024 * 1024;
    ONE_MIB
}
64
65fn pulsar_to_sink_err(e: pulsar::Error) -> SinkError {
66    SinkError::Pulsar(anyhow!(e))
67}
68
69async fn build_pulsar_producer(
70    pulsar: &Pulsar<TokioExecutor>,
71    config: &PulsarConfig,
72) -> Result<Producer<TokioExecutor>> {
73    // Reduce async state machine size (see `clippy::large_futures`).
74    Box::pin(
75        pulsar
76            .producer()
77            .with_options(ProducerOptions {
78                batch_size: Some(config.producer_properties.batch_size),
79                batch_byte_size: Some(config.producer_properties.batch_byte_size),
80                ..Default::default()
81            })
82            .with_topic(&config.common.topic)
83            .build()
84            .map_err(pulsar_to_sink_err),
85    )
86    .await
87}
88
/// Producer-side batching options of the Pulsar sink.
///
/// Both values are parsed from their string form (`DisplayFromStr`) and are
/// forwarded to `ProducerOptions` in `build_pulsar_producer`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarPropertiesProducer {
    // Option `properties.batch.size`; falls back to `_default_batch_size()` (10000)
    // when absent. Used as `ProducerOptions::batch_size`.
    #[serde(rename = "properties.batch.size", default = "_default_batch_size")]
    #[serde_as(as = "DisplayFromStr")]
    batch_size: u32,

    // Option `properties.batch.byte.size`; falls back to `_default_batch_byte_size()`
    // (1 << 20) when absent. Used as `ProducerOptions::batch_byte_size`.
    #[serde(
        rename = "properties.batch.byte.size",
        default = "_default_batch_byte_size"
    )]
    #[serde_as(as = "DisplayFromStr")]
    batch_byte_size: usize,
}
103
/// Full configuration of a Pulsar sink, deserialized from the sink's string
/// properties (see `PulsarConfig::from_btreemap`).
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct PulsarConfig {
    // Option `properties.retry.max`; defaults to `_default_max_retries()` (3).
    // Bounds the number of send attempts made by `PulsarPayloadWriter::send_message`.
    #[serde(rename = "properties.retry.max", default = "_default_max_retries")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_retry_num: u32,

    // Option `properties.retry.interval`; defaults to `_default_retry_backoff()`
    // (100 ms) and is parsed by `deserialize_duration_from_string`.
    // `send_message` sleeps for this long after a retryable send error.
    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    // Shared Pulsar settings (flattened). Supplies the target `topic` and
    // `build_client`, which creates the Pulsar client.
    #[serde(flatten)]
    pub common: PulsarCommon,

    // Optional OAuth settings (flattened); passed to `PulsarCommon::build_client`.
    #[serde(flatten)]
    pub oauth: Option<PulsarOauthCommon>,

    // AWS auth settings (flattened); passed to `PulsarCommon::build_client`.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Producer batching settings (flattened); applied in `build_pulsar_producer`.
    #[serde(flatten)]
    pub producer_properties: PulsarPropertiesProducer,
}
130
131impl EnforceSecret for PulsarConfig {
132    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
133        PulsarCommon::enforce_one(prop)?;
134        AwsAuthProps::enforce_one(prop)?;
135        Ok(())
136    }
137}
138impl PulsarConfig {
139    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
140        let config = serde_json::from_value::<PulsarConfig>(serde_json::to_value(values).unwrap())
141            .map_err(|e| SinkError::Config(anyhow!(e)))?;
142
143        Ok(config)
144    }
145}
146
/// Pulsar sink definition, built from a `SinkParam` via `TryFrom`.
///
/// Holds everything needed to validate the sink and to create a
/// [`PulsarSinkWriter`] for it.
#[derive(Debug)]
pub struct PulsarSink {
    /// Parsed sink options.
    pub config: PulsarConfig,
    /// Schema of the sink, as returned by `SinkParam::schema`.
    schema: Schema,
    /// Downstream primary key, as returned by `SinkParam::downstream_pk_or_empty`.
    /// Must be non-empty unless the format is append-only (checked in `validate`).
    downstream_pk: Vec<usize>,
    /// The `FORMAT ... ENCODE ...` description; required at construction time.
    format_desc: SinkFormatDesc,
    /// Database name, forwarded to `SinkFormatterImpl::new`.
    db_name: String,
    /// Name of the object the sink reads from, forwarded to `SinkFormatterImpl::new`.
    sink_from_name: String,
}
156
157impl EnforceSecret for PulsarSink {
158    fn enforce_secret<'a>(
159        prop_iter: impl Iterator<Item = &'a str>,
160    ) -> crate::error::ConnectorResult<()> {
161        for prop in prop_iter {
162            PulsarConfig::enforce_one(prop)?;
163        }
164        Ok(())
165    }
166}
167
168impl TryFrom<SinkParam> for PulsarSink {
169    type Error = SinkError;
170
171    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
172        let schema = param.schema();
173        let downstream_pk = param.downstream_pk_or_empty();
174        let config = PulsarConfig::from_btreemap(param.properties)?;
175        Ok(Self {
176            config,
177            schema,
178            downstream_pk,
179            format_desc: param
180                .format_desc
181                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
182            db_name: param.db_name,
183            sink_from_name: param.sink_from_name,
184        })
185    }
186}
187
188impl Sink for PulsarSink {
189    type LogSinker = AsyncTruncateLogSinkerOf<PulsarSinkWriter>;
190
191    const SINK_NAME: &'static str = PULSAR_SINK;
192
193    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
194        // Reduce async state machine size (see `clippy::large_futures`).
195        let writer = Box::pin(PulsarSinkWriter::new(
196            self.config.clone(),
197            self.schema.clone(),
198            self.downstream_pk.clone(),
199            &self.format_desc,
200            self.db_name.clone(),
201            self.sink_from_name.clone(),
202        ))
203        .await?;
204        Ok(writer.into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE))
205    }
206
207    async fn validate(&self) -> Result<()> {
208        // For non-append-only Pulsar sink, the primary key must be defined.
209        if self.format_desc.format != SinkFormat::AppendOnly && self.downstream_pk.is_empty() {
210            return Err(SinkError::Config(anyhow!(
211                "primary key not defined for {:?} pulsar sink (please define in `primary_key` field)",
212                self.format_desc.format
213            )));
214        }
215        // Check for formatter constructor error, before it is too late for error reporting.
216        SinkFormatterImpl::new(
217            &self.format_desc,
218            self.schema.clone(),
219            self.downstream_pk.clone(),
220            self.db_name.clone(),
221            self.sink_from_name.clone(),
222            &self.config.common.topic,
223        )
224        .await?;
225
226        // Validate pulsar connection.
227        let pulsar = self
228            .config
229            .common
230            // Source-side Pulsar client retry overrides are intentionally not applied to sinks.
231            .build_client(&self.config.oauth, &self.config.aws_auth_props, None)
232            .await?;
233        build_pulsar_producer(&pulsar, &self.config).await?;
234
235        Ok(())
236    }
237}
238
/// Writer that formats stream chunks and publishes them to a Pulsar topic.
pub struct PulsarSinkWriter {
    /// Turns stream chunks into key/value pairs (see `write_chunk`).
    formatter: SinkFormatterImpl,
    /// The client `producer` was built from. It is never read after
    /// construction (hence `expect(dead_code)`).
    // NOTE(review): presumably retained so the client outlives the producer — confirm.
    #[expect(dead_code)]
    pulsar: Pulsar<TokioExecutor>,
    /// Producer bound to the configured topic.
    producer: Producer<TokioExecutor>,
    /// Sink options; supplies the retry settings used when sending.
    config: PulsarConfig,
}
246
/// Short-lived helper borrowed from a [`PulsarSinkWriter`] for the duration of
/// one `write_chunk` call; it performs the actual sends.
struct PulsarPayloadWriter<'w> {
    /// Producer used to send messages.
    producer: &'w mut Producer<TokioExecutor>,
    /// Sink options; provides `max_retry_num` and `retry_interval`.
    config: &'w PulsarConfig,
    /// Handle through which the delivery future of every sent message is registered.
    add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>,
}
252
253mod opaque_type {
254    use super::*;
255    pub type PulsarDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
256
257    #[define_opaque(PulsarDeliveryFuture)]
258    pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture {
259        future.map(|result| {
260            result
261                .map(|_| ())
262                .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e)))
263        })
264    }
265}
266pub use opaque_type::PulsarDeliveryFuture;
267use opaque_type::may_delivery_future;
268
269impl PulsarSinkWriter {
270    pub async fn new(
271        config: PulsarConfig,
272        schema: Schema,
273        downstream_pk: Vec<usize>,
274        format_desc: &SinkFormatDesc,
275        db_name: String,
276        sink_from_name: String,
277    ) -> Result<Self> {
278        let formatter = SinkFormatterImpl::new(
279            format_desc,
280            schema,
281            downstream_pk,
282            db_name,
283            sink_from_name,
284            &config.common.topic,
285        )
286        .await?;
287        let pulsar = config
288            .common
289            // Source-side Pulsar client retry overrides are intentionally not applied to sinks.
290            .build_client(&config.oauth, &config.aws_auth_props, None)
291            .await?;
292        let producer = build_pulsar_producer(&pulsar, &config).await?;
293        Ok(Self {
294            formatter,
295            pulsar,
296            producer,
297            config,
298        })
299    }
300}
301
302impl PulsarPayloadWriter<'_> {
303    async fn send_message(&mut self, message: Message) -> Result<()> {
304        let mut success_flag = false;
305        let mut connection_err = None;
306
307        for retry_num in 0..self.config.max_retry_num {
308            if retry_num > 0 {
309                tracing::warn!("Failed to send message, at retry no. {retry_num}");
310            }
311            match Box::pin(self.producer.send_non_blocking(message.clone())).await {
312                // If the message is sent successfully,
313                // a SendFuture holding the message receipt
314                // or error after sending is returned
315                Ok(send_future) => {
316                    self.add_future
317                        .add_future_may_await(may_delivery_future(send_future))
318                        .await?;
319                    success_flag = true;
320                    break;
321                }
322                // error upon sending
323                Err(e) => match e {
324                    pulsar::Error::Connection(_)
325                    | pulsar::Error::Producer(_)
326                    | pulsar::Error::Consumer(_) => {
327                        connection_err = Some(e);
328                        tokio::time::sleep(self.config.retry_interval).await;
329                        continue;
330                    }
331                    _ => return Err(SinkError::Pulsar(anyhow!(e))),
332                },
333            }
334        }
335
336        if !success_flag {
337            Err(SinkError::Pulsar(anyhow!(connection_err.unwrap())))
338        } else {
339            Ok(())
340        }
341    }
342
343    async fn write_inner(
344        &mut self,
345        event_key_object: Option<String>,
346        event_object: Option<Vec<u8>>,
347    ) -> Result<()> {
348        let message = Message {
349            partition_key: event_key_object,
350            payload: event_object.unwrap_or_default(),
351            ..Default::default()
352        };
353
354        self.send_message(message).await?;
355        Ok(())
356    }
357}
358
359impl FormattedSink for PulsarPayloadWriter<'_> {
360    type K = String;
361    type V = Vec<u8>;
362
363    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
364        self.write_inner(k, v).await
365    }
366}
367
impl AsyncTruncateSinkWriter for PulsarSinkWriter {
    type DeliveryFuture = PulsarDeliveryFuture;

    /// Formats `chunk` and sends every resulting key/value pair as one Pulsar message.
    ///
    /// Keys are serialized to `String` and values to `Vec<u8>` before sending. The
    /// delivery future of each sent message is registered through `add_future`.
    /// Processing stops at the first formatting, serialization or send error,
    /// which is returned to the caller.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        // Structured to avoid `clippy::large_stack_frames` and `large_futures`
        let iter = {
            dispatch_sink_formatter_str_key_impl!(
                &self.formatter,
                formatter,
                {
                    // Convert items to owned, concrete types before any `.await`,
                    // so the future doesn't capture formatter/iterator generics.
                    formatter.format_chunk(&chunk).map(|r| {
                        let (key, value) = r?;
                        let key: Option<String> = key.map(SerTo::ser_to).transpose()?;
                        let value: Option<Vec<u8>> = value.map(SerTo::ser_to).transpose()?;
                        Ok((key, value)) as Result<_>
                    })
                },
                // Produce a single iterator type for all formatter variants.
                auto_enums::auto_enum(Iterator)
            )
        };

        // Only concrete state is held across `.await`, keeping the future small.
        let mut payload_writer = PulsarPayloadWriter {
            producer: &mut self.producer,
            add_future,
            config: &self.config,
        };

        // Send the pairs one by one, in the order the formatter yields them.
        for r in iter {
            let (key, value): (Option<String>, Option<Vec<u8>>) = r?;
            payload_writer.write_inner(key, value).await?;
        }

        Ok(())
    }

    /// Handles a barrier: on a checkpoint barrier, calls `send_batch` on the
    /// producer and converts a failure with `pulsar_to_sink_err`; for any other
    /// barrier this does nothing.
    // NOTE(review): `send_batch` presumably flushes the producer's pending batch —
    // confirm against the pulsar crate documentation.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint {
            self.producer
                .send_batch()
                .map_err(pulsar_to_sink_err)
                .await?;
        }

        Ok(())
    }
}