risingwave_connector/sink/kafka.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
    AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
    RdKafkaPropertiesCommon,
};
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
    KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
    deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};

pub const KAFKA_SINK: &str = "kafka";

const fn _default_max_retries() -> u32 {
    3
}

const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
    5
}

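/// Compression codec for the Kafka producer, mapped to librdkafka's `compression.codec`
/// property. Values are parsed from their snake_case string form (e.g. `zstd`, `lz4`).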
#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
/// for the detailed meaning of these librdkafka producer properties.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
    /// Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
    #[serde(rename = "properties.allow.auto.create.topics")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub allow_auto_create_topics: Option<bool>,

    /// Maximum number of messages allowed on the producer queue. This queue is shared by all
    /// topics and partitions. A value of 0 disables this limit.
    #[serde(rename = "properties.queue.buffering.max.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queue_buffering_max_messages: Option<usize>,

    /// Maximum total message size sum allowed on the producer queue. This queue is shared by all
    /// topics and partitions. This property has higher priority than queue.buffering.max.messages.
    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_kbytes: Option<usize>,

    /// Delay in milliseconds to wait for messages in the producer queue to accumulate before
    /// constructing message batches (MessageSets) to transmit to brokers. A higher value allows
    /// larger and more effective (less overhead, improved compression) batches of messages to
    /// accumulate at the expense of increased message delivery latency.
    #[serde(rename = "properties.queue.buffering.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_ms: Option<f64>,

    /// When set to true, the producer will ensure that messages are successfully produced exactly
    /// once and in the original produce order. The following configuration properties are adjusted
    /// automatically (if not modified by the user) when idempotence is enabled:
    /// max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
    /// retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer
    /// will fail if user-supplied configuration is incompatible.
    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    enable_idempotence: Option<bool>,

    /// How many times to retry sending a failing Message.
    #[serde(rename = "properties.message.send.max.retries")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_send_max_retries: Option<usize>,

    /// The backoff time in milliseconds before retrying a protocol request.
    #[serde(rename = "properties.retry.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    retry_backoff_ms: Option<usize>,

    /// Maximum number of messages batched in one MessageSet.
    #[serde(rename = "properties.batch.num.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_num_messages: Option<usize>,

    /// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
    /// framing overhead. This limit is applied after the first message has been added to the
    /// batch, regardless of the first message's size; this ensures that messages exceeding
    /// batch.size are still produced.
    #[serde(rename = "properties.batch.size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_size: Option<usize>,

    /// Compression codec to use for compressing message sets.
    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,

    /// Produce message timeout.
    /// This value limits the time a produced message waits for
    /// successful delivery (including retries).
    #[serde(rename = "properties.message.timeout.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_timeout_ms: Option<usize>,

    /// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
    #[serde(
        rename = "properties.max.in.flight.requests.per.connection",
        default = "_default_max_in_flight_requests_per_connection"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_in_flight_requests_per_connection: usize,

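    /// The number of acknowledgements the leader broker must receive from in-sync replica
    /// brokers before responding to a produce request: 0 = no acknowledgement,
    /// 1 = leader only, -1 (all) = all in-sync replicas.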
    #[serde(rename = "properties.request.required.acks")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    request_required_acks: Option<i32>,
}

impl RdKafkaPropertiesProducer {
    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = self.allow_auto_create_topics {
            c.set("allow.auto.create.topics", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_messages {
            c.set("queue.buffering.max.messages", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_kbytes {
            c.set("queue.buffering.max.kbytes", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_ms {
            c.set("queue.buffering.max.ms", v.to_string());
        }
        if let Some(v) = self.enable_idempotence {
            c.set("enable.idempotence", v.to_string());
        }
        if let Some(v) = self.message_send_max_retries {
            c.set("message.send.max.retries", v.to_string());
        }
        if let Some(v) = self.retry_backoff_ms {
            c.set("retry.backoff.ms", v.to_string());
        }
        if let Some(v) = self.batch_num_messages {
            c.set("batch.num.messages", v.to_string());
        }
        if let Some(v) = self.batch_size {
            c.set("batch.size", v.to_string());
        }
        if let Some(v) = &self.compression_codec {
            c.set("compression.codec", v.to_string());
        }
        if let Some(v) = self.request_required_acks {
            c.set("request.required.acks", v.to_string());
        }
        if let Some(v) = self.message_timeout_ms {
            c.set("message.timeout.ms", v.to_string());
        }
        c.set(
            "max.in.flight.requests.per.connection",
            self.max_in_flight_requests_per_connection.to_string(),
        );
    }
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(
        rename = "properties.retry.max",
        default = "_default_max_retries",
        deserialize_with = "deserialize_u32_from_string"
    )]
    pub max_retry_num: u32,

    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    /// We have parsed the primary key for an upsert kafka sink into a `usize` vector representing
    /// the indices of the pk columns in the frontend, so we simply store the primary key here
    /// as a string.
    pub primary_key: Option<String>,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}

impl KafkaConfig {
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
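        // The `WITH` options all arrive as strings; round-tripping them through
        // `serde_json::Value` lets the `serde_as` / `deserialize_with` annotations parse each
        // property into its typed field.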
        let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_producer.set_client(c);
    }
}

impl From<KafkaConfig> for KafkaProperties {
    fn from(val: KafkaConfig) -> Self {
        KafkaProperties {
            bytes_per_second: None,
            max_num_messages: None,
            scan_startup_mode: None,
            time_offset: None,
            upsert: None,
            common: val.common,
            connection: val.connection,
            rdkafka_properties_common: val.rdkafka_properties_common,
            rdkafka_properties_consumer: Default::default(),
            privatelink_common: val.privatelink_common,
            aws_auth_props: val.aws_auth_props,
            group_id_prefix: None,
            unknown_fields: Default::default(),
        }
    }
}

#[derive(Debug)]
pub struct KafkaSink {
    pub config: KafkaConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl TryFrom<SinkParam> for KafkaSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KafkaConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

impl Sink for KafkaSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

    const SINK_NAME: &'static str = KAFKA_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let formatter = SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;
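        // Cap the number of in-flight delivery futures at `queue.buffering.max.messages`
        // (falling back to librdkafka's default of 100000) scaled by
        // `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`.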
        let max_delivery_buffer_size = (self
            .config
            .rdkafka_properties_producer
            .queue_buffering_max_messages
            .as_ref()
            .cloned()
            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

        Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
            .await?
            .into_log_sinker(max_delivery_buffer_size))
    }

    async fn validate(&self) -> Result<()> {
        // For an upsert Kafka sink, the primary key must be defined.
        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
                self.format_desc.format
            )));
        }
        // Construct the formatter here so that any constructor error surfaces during validation.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

        // Try the Kafka connection. The Kafka producer offers no interface for validating a
        // connection, so use the enumerator to check broker reachability and topic existence.
        let check = KafkaSplitEnumerator::new(
            KafkaProperties::from(self.config.clone()),
            Arc::new(SourceEnumeratorContext::dummy()),
        )
        .await?;
        if !check.check_reachability().await {
            return Err(SinkError::Config(anyhow!(
                "cannot connect to kafka broker ({})",
                self.config.connection.brokers
            )));
        }
        Ok(())
    }
}

/// When the number of `DeliveryFuture`s buffered in the current `future_delivery_buffer`
/// exceeds `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
/// a commit is enforced.
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
/// The default queue size used to enforce a commit in the Kafka producer if `queue.buffering.max.messages` is not specified.
/// This default value is determined based on the librdkafka default. See the following doc for more details:
/// <https://github.com/confluentinc/librdkafka/blob/1cb80090dfc75f5a36eae3f4f8844b14885c045e/CONFIGURATION.md>
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

struct KafkaPayloadWriter<'a> {
    inner: &'a FutureProducer<RwProducerContext>,
    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
    config: &'a KafkaConfig,
}

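// `KafkaSinkDeliveryFuture` is an opaque `impl TryFuture` type alias. The alias and its
// defining use (`map_delivery_future`) are kept together in this private module, and only
// the alias itself is re-exported below.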
mod opaque_type {
    use super::*;
    pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
        future.map(KafkaPayloadWriter::<'static>::map_future_result)
    }
}
pub use opaque_type::KafkaSinkDeliveryFuture;
use opaque_type::map_delivery_future;

pub struct KafkaSinkWriter {
    formatter: SinkFormatterImpl,
    inner: FutureProducer<RwProducerContext>,
    config: KafkaConfig,
}

impl KafkaSinkWriter {
    async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
        let inner: FutureProducer<RwProducerContext> = {
            let mut c = ClientConfig::new();

            // KafkaConfig configuration
            config.connection.set_security_properties(&mut c);
            config.set_client(&mut c);

            // ClientConfig configuration
            c.set("bootstrap.servers", &config.connection.brokers);

            // Create the producer context, which will be used to create the producer
            let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
            let ctx_common = KafkaContextCommon::new(
                broker_rewrite_map,
                None,
                None,
                config.aws_auth_props.clone(),
                config.connection.is_aws_msk_iam(),
            )
            .await?;
            let producer_ctx = RwProducerContext::new(ctx_common);
            // Generate the producer
            c.create_with_context(producer_ctx).await?
        };

        Ok(KafkaSinkWriter {
            formatter,
            inner,
            config: config.clone(),
        })
    }
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
    type DeliveryFuture = KafkaSinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = KafkaPayloadWriter {
            inner: &mut self.inner,
            add_future,
            config: &self.config,
        };
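        // Dispatch on the concrete formatter variant held by `self.formatter` and write the
        // chunk through the payload writer with it.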
        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
            payload_writer.write_chunk(chunk, formatter).await
        })
    }
}

impl KafkaPayloadWriter<'_> {
    /// The actual `send_result` function, which is called whenever the `KafkaSinkWriter` needs
    /// to sink messages.
    async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let mut success_flag = false;

        let mut ret = Ok(());

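        // Try to enqueue the record up to `max_retry_num` times. The only retryable error is
        // `QueueFull`, which is handled by awaiting one outstanding delivery to free queue space.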
        for i in 0..self.config.max_retry_num {
            match self.inner.send_result(record) {
                Ok(delivery_future) => {
                    if self
                        .add_future
                        .add_future_may_await(map_delivery_future(delivery_future))
                        .await?
                    {
                        tracing::warn!(
                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
                            This indicates the default value of queue.buffering.max.messages has changed.",
                            self.add_future.future_count(),
                            self.add_future.max_future_count()
                        );
                    }
                    success_flag = true;
                    break;
                }
                // If the enqueue buffer is full, `send_result` returns immediately.
                // We can retry another round after awaiting one outstanding delivery.
                Err((e, rec)) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "producing message (key {:?}) to topic {} failed",
                        rec.key.map(|k| k.to_bytes()),
                        rec.topic,
                    );
                    record = rec;
                    match e {
                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                            tracing::warn!(
                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
                                self.add_future.future_count(),
                                i
                            );
                            self.add_future.await_one_delivery().await?;
                            continue;
                        }
                        _ => return Err(e.into()),
                    }
                }
            }
        }

        if !success_flag {
            // After `max_retry_num` attempts, the enqueue buffer is still full.
            ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
        }

        ret
    }

    async fn write_inner(
        &mut self,
        event_key_object: Option<Vec<u8>>,
        event_object: Option<Vec<u8>>,
    ) -> Result<()> {
        let topic = self.config.common.topic.clone();
        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
        if let Some(key_str) = &event_key_object {
            record = record.key(key_str);
        }
        if let Some(payload) = &event_object {
            record = record.payload(payload);
        }
        // Send the data without waiting for it to finish sinking;
        // all `DeliveryFuture`s will be joined during commit.
        self.send_result(record).await?;
        Ok(())
    }

    fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
        match delivery_future_result {
            // Successfully sent the record; the partition and offset of the message
            // (i32, i64) are returned.
            // Note that `Vec<()>` won't cause memory allocation.
            Ok(Ok(_)) => Ok(()),
            // If the message failed to be delivered (e.g., on flush), the error and a copy of
            // the original message are returned, i.e., (KafkaError, OwnedMessage).
            // We simply return the error; the sink executor will roll back to the latest
            // checkpoint.
            Ok(Err((k_err, _msg))) => Err(k_err.into()),
            // The producer was dropped before the delivery status was received.
            // Return `KafkaError::Canceled`.
            Err(_) => Err(KafkaError::Canceled.into()),
        }
    }
}

impl FormattedSink for KafkaPayloadWriter<'_> {
    type K = Vec<u8>;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.write_inner(k, v).await
    }
}

#[cfg(test)]
mod test {
    use maplit::btreemap;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;

    use super::*;
    use crate::sink::encoder::{
        DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
        TimestamptzHandlingMode,
    };
    use crate::sink::formatter::AppendOnlyFormatter;

    #[test]
    fn parse_rdkafka_props() {
        let props: BTreeMap<String, String> = btreemap! {
            // basic
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "append-only".to_string(),
            // RdKafkaPropertiesCommon
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            // RdKafkaPropertiesProducer
            "properties.queue.buffering.max.messages".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.ms".to_owned() => "114.514".to_owned(),
            "properties.enable.idempotence".to_owned() => "false".to_owned(),
            "properties.message.send.max.retries".to_owned() => "114514".to_owned(),
            "properties.retry.backoff.ms".to_owned() => "114514".to_owned(),
            "properties.batch.num.messages".to_owned() => "114514".to_owned(),
            "properties.batch.size".to_owned() => "114514".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
            "properties.message.timeout.ms".to_owned() => "114514".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned() => "114514".to_owned(),
            "properties.request.required.acks".to_owned() => "-1".to_owned(),
        };
        let c = KafkaConfig::from_btreemap(props).unwrap();
        assert_eq!(
            c.rdkafka_properties_producer.queue_buffering_max_ms,
            Some(114.514f64)
        );
        assert_eq!(
            c.rdkafka_properties_producer.compression_codec,
            Some(CompressionCodec::Zstd)
        );
        assert_eq!(
            c.rdkafka_properties_producer.message_timeout_ms,
            Some(114514)
        );
        assert_eq!(
            c.rdkafka_properties_producer
                .max_in_flight_requests_per_connection,
            114514
        );
        assert_eq!(
            c.rdkafka_properties_producer.request_required_acks,
            Some(-1)
        );

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),

            "properties.enable.idempotence".to_owned() => "True".to_owned(), // can only be 'true' or 'false'
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "-114514".to_owned(), // usize cannot be negative
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.compression.codec".to_owned() => "notvalid".to_owned(), // has to be a valid CompressionCodec
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());
    }
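
    /// A minimal additional sketch showing that `CompressionCodec` round-trips through its
    /// snake_case string form via the `EnumString`/`Display` derives used above.
    #[test]
    fn parse_compression_codec() {
        assert_eq!(
            "zstd".parse::<CompressionCodec>().unwrap(),
            CompressionCodec::Zstd
        );
        assert_eq!(CompressionCodec::Lz4.to_string(), "lz4");
        assert!("not_a_codec".parse::<CompressionCodec>().is_err());
    }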

    #[test]
    fn parse_kafka_config() {
        let properties: BTreeMap<String, String> = btreemap! {
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "append-only".to_string(),
            // "force_append_only".to_string() => "true".to_string(),
            "properties.security.protocol".to_owned() => "SASL".to_owned(),
            "properties.sasl.mechanism".to_owned() => "SASL".to_owned(),
            "properties.sasl.username".to_owned() => "test".to_owned(),
            "properties.sasl.password".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "20".to_owned(),
            "properties.retry.interval".to_owned() => "500ms".to_owned(),
            // PrivateLink
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.connection.brokers, "localhost:9092");
        assert_eq!(config.common.topic, "test");
        assert_eq!(config.max_retry_num, 20);
        assert_eq!(config.retry_interval, Duration::from_millis(500));

        // PrivateLink fields
        let btreemap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));

        // Optional fields omitted.
        let properties: BTreeMap<String, String> = btreemap! {
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "upsert".to_string(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.max_retry_num, 3);
        assert_eq!(config.retry_interval, Duration::from_millis(100));

        // Invalid u32 input.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.max".to_owned() => "-20".to_owned(),  // error!
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());

        // Invalid duration input.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.interval".to_owned() => "500miiinutes".to_owned(),  // invalid duration
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());
    }

    /// Note: Please enable Kafka by running `./risedev configure` before commenting out
    /// `#[ignore]` to run this test; also remember to modify `risedev.yml` accordingly.
    #[ignore]
    #[tokio::test]
    async fn test_kafka_producer() -> Result<()> {
        // Create dummy Kafka properties
        let properties = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:29092".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "topic".to_owned() => "test_topic".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
        };

        // Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here
        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v2".into(),
            },
        ]);

        let kafka_config = KafkaConfig::from_btreemap(properties)?;

        // Create the actual sink writer to Kafka
        let sink = KafkaSinkWriter::new(
            kafka_config.clone(),
            SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
                // We do not specify a primary key for this schema
                None,
                JsonEncoder::new(
                    schema,
                    None,
                    DateHandlingMode::FromCe,
                    TimestampHandlingMode::Milli,
                    TimestamptzHandlingMode::UtcString,
                    TimeHandlingMode::Milli,
                    JsonbHandlingMode::String,
                ),
            )),
        )
        .await
        .unwrap();

        use crate::sink::log_store::DeliveryFutureManager;

        let mut future_manager = DeliveryFutureManager::new(usize::MAX);

        for i in 0..10 {
            println!("epoch: {}", i);
            for j in 0..100 {
                let mut writer = KafkaPayloadWriter {
                    inner: &sink.inner,
                    add_future: future_manager.start_write_chunk(i, j),
                    config: &sink.config,
                };
                match writer
                    .send_result(
                        FutureRecord::to(kafka_config.common.topic.as_str())
                            .payload(format!("value-{}", j).as_bytes())
                            .key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
                    )
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        println!("{:?}", e);
                        break;
                    }
                };
            }
        }

        Ok(())
    }
}