risingwave_connector/sink/kafka.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
    AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
    RdKafkaPropertiesCommon, read_kafka_log_level,
};
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
    KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
    deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};

pub const KAFKA_SINK: &str = "kafka";

const fn _default_max_retries() -> u32 {
    3
}

const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
    5
}

#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
/// for the detailed meaning of these librdkafka producer properties
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
    /// Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
    #[serde(rename = "properties.allow.auto.create.topics")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub allow_auto_create_topics: Option<bool>,

    /// Maximum number of messages allowed on the producer queue. This queue is shared by all
    /// topics and partitions. A value of 0 disables this limit.
    #[serde(rename = "properties.queue.buffering.max.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queue_buffering_max_messages: Option<usize>,

    /// Maximum total message size sum allowed on the producer queue. This queue is shared by all
    /// topics and partitions. This property has higher priority than queue.buffering.max.messages.
    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_kbytes: Option<usize>,

    /// Delay in milliseconds to wait for messages in the producer queue to accumulate before
    /// constructing message batches (MessageSets) to transmit to brokers. A higher value allows
    /// larger and more effective (less overhead, improved compression) batches of messages to
    /// accumulate at the expense of increased message delivery latency.
    #[serde(rename = "properties.queue.buffering.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_ms: Option<f64>,

    /// When set to true, the producer will ensure that messages are successfully produced exactly
    /// once and in the original produce order. The following configuration properties are adjusted
    /// automatically (if not modified by the user) when idempotence is enabled:
    /// max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
    /// retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer
    /// will fail if user-supplied configuration is incompatible.
    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    enable_idempotence: Option<bool>,

    /// How many times to retry sending a failing Message.
    #[serde(rename = "properties.message.send.max.retries")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_send_max_retries: Option<usize>,

    /// The backoff time in milliseconds before retrying a protocol request.
    #[serde(rename = "properties.retry.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    retry_backoff_ms: Option<usize>,

    /// Maximum number of messages batched in one MessageSet
    #[serde(rename = "properties.batch.num.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_num_messages: Option<usize>,

    /// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
    /// framing overhead. This limit is applied after the first message has been added to the
    /// batch, regardless of the first message's size; this is to ensure that messages that exceed
    /// batch.size are produced.
    #[serde(rename = "properties.batch.size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_size: Option<usize>,

    /// Compression codec to use for compressing message sets.
    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,

    /// Produce message timeout.
    /// This value is used to limit the time a produced message waits for
    /// successful delivery (including retries).
    #[serde(rename = "properties.message.timeout.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_timeout_ms: Option<usize>,

    /// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
    #[serde(
        rename = "properties.max.in.flight.requests.per.connection",
        default = "_default_max_in_flight_requests_per_connection"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_in_flight_requests_per_connection: usize,

    #[serde(rename = "properties.request.required.acks")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    request_required_acks: Option<i32>,
}
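
// Note (added for clarity, not in the original file): these `properties.*` entries are supplied
// through the sink's WITH options and parsed by `KafkaConfig::from_btreemap`; see the tests at
// the bottom of this file for concrete examples.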

impl RdKafkaPropertiesProducer {
    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = self.allow_auto_create_topics {
            c.set("allow.auto.create.topics", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_messages {
            c.set("queue.buffering.max.messages", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_kbytes {
            c.set("queue.buffering.max.kbytes", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_ms {
            c.set("queue.buffering.max.ms", v.to_string());
        }
        if let Some(v) = self.enable_idempotence {
            c.set("enable.idempotence", v.to_string());
        }
        if let Some(v) = self.message_send_max_retries {
            c.set("message.send.max.retries", v.to_string());
        }
        if let Some(v) = self.retry_backoff_ms {
            c.set("retry.backoff.ms", v.to_string());
        }
        if let Some(v) = self.batch_num_messages {
            c.set("batch.num.messages", v.to_string());
        }
        if let Some(v) = self.batch_size {
            c.set("batch.size", v.to_string());
        }
        if let Some(v) = &self.compression_codec {
            c.set("compression.codec", v.to_string());
        }
        if let Some(v) = self.request_required_acks {
            c.set("request.required.acks", v.to_string());
        }
        if let Some(v) = self.message_timeout_ms {
            c.set("message.timeout.ms", v.to_string());
        }
        c.set(
            "max.in.flight.requests.per.connection",
            self.max_in_flight_requests_per_connection.to_string(),
        );
    }
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(
        rename = "properties.retry.max",
        default = "_default_max_retries",
        deserialize_with = "deserialize_u32_from_string"
    )]
    pub max_retry_num: u32,

    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    /// We have parsed the primary key for an upsert kafka sink into a `usize` vector representing
    /// the indices of the pk columns in the frontend, so we simply store the primary key here
    /// as a string.
    pub primary_key: Option<String>,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}

impl EnforceSecret for KafkaConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KafkaConnectionProps::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl KafkaConfig {
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_producer.set_client(c);
    }
}

impl From<KafkaConfig> for KafkaProperties {
    fn from(val: KafkaConfig) -> Self {
        KafkaProperties {
            bytes_per_second: None,
            max_num_messages: None,
            scan_startup_mode: None,
            time_offset: None,
            upsert: None,
            common: val.common,
            connection: val.connection,
            rdkafka_properties_common: val.rdkafka_properties_common,
            rdkafka_properties_consumer: Default::default(),
            privatelink_common: val.privatelink_common,
            aws_auth_props: val.aws_auth_props,
            group_id_prefix: None,
            unknown_fields: Default::default(),
        }
    }
}

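/// The Kafka sink: writes each stream chunk to the configured Kafka topic, encoded according to
/// the user-specified `FORMAT ... ENCODE ...` options carried in `format_desc`.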
#[derive(Debug)]
pub struct KafkaSink {
    pub config: KafkaConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KafkaSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KafkaSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KafkaConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

impl Sink for KafkaSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[
        "properties.allow.auto.create.topics",
        "properties.batch.num.messages",
        "properties.batch.size",
        "properties.enable.idempotence",
        "properties.max.in.flight.requests.per.connection",
        "properties.message.max.bytes",
        "properties.message.send.max.retries",
        "properties.message.timeout.ms",
        "properties.queue.buffering.max.kbytes",
        "properties.queue.buffering.max.messages",
        "properties.queue.buffering.max.ms",
        "properties.request.required.acks",
        "properties.retry.backoff.ms",
        "properties.receive.message.max.bytes",
    ];
    const SINK_NAME: &'static str = KAFKA_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let formatter = SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;
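        // Cap the number of in-flight `DeliveryFuture`s relative to the producer queue size
        // (see `KAFKA_WRITER_MAX_QUEUE_SIZE` below); once the cap is reached, the log sinker
        // awaits outstanding deliveries before enqueueing more records.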
        let max_delivery_buffer_size = (self
            .config
            .rdkafka_properties_producer
            .queue_buffering_max_messages
            .as_ref()
            .cloned()
            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

        Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
            .await?
            .into_log_sinker(max_delivery_buffer_size))
    }

    async fn validate(&self) -> Result<()> {
        // For upsert Kafka sink, the primary key must be defined.
        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
                self.format_desc.format
            )));
        }
        // Check for formatter constructor error, before it is too late for error reporting.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

        // Try the Kafka connection.
        // There is no interface on the Kafka producer to validate a connection,
        // so use the enumerator to validate broker reachability and the existence of the topic.
        let check = KafkaSplitEnumerator::new(
            KafkaProperties::from(self.config.clone()),
            Arc::new(SourceEnumeratorContext::dummy()),
        )
        .await?;
        if !check.check_reachability().await {
            return Err(SinkError::Config(anyhow!(
                "cannot connect to kafka broker ({})",
                self.config.connection.brokers
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        KafkaConfig::from_btreemap(config.clone())?;
        Ok(())
    }
}

/// When the number of `DeliveryFuture`s buffered in the current `future_delivery_buffer` is
/// greater than `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
/// a commit is enforced once.
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
/// The default queue size used to enforce a commit in the Kafka producer if `queue.buffering.max.messages` is not specified.
/// This default value is determined based on the librdkafka default. See the following doc for more details:
/// <https://github.com/confluentinc/librdkafka/blob/1cb80090dfc75f5a36eae3f4f8844b14885c045e/CONFIGURATION.md>
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
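// With the librdkafka default of 100000 queued messages, the delivery buffer limit computed in
// `new_log_sinker` works out to 100000 * 1.2 = 120000 in-flight delivery futures.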

struct KafkaPayloadWriter<'a> {
    inner: &'a FutureProducer<RwProducerContext>,
    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
    config: &'a KafkaConfig,
}

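// The delivery future is kept behind an opaque type alias so that callers only see a
// `TryFuture<Ok = (), Error = SinkError>` rather than the concrete mapped `DeliveryFuture` type.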
mod opaque_type {
    use super::*;
    pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
        future.map(KafkaPayloadWriter::<'static>::map_future_result)
    }
}
pub use opaque_type::KafkaSinkDeliveryFuture;
use opaque_type::map_delivery_future;

pub struct KafkaSinkWriter {
    formatter: SinkFormatterImpl,
    inner: FutureProducer<RwProducerContext>,
    config: KafkaConfig,
}

impl KafkaSinkWriter {
    async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
        let inner: FutureProducer<RwProducerContext> = {
            let mut c = ClientConfig::new();

            // KafkaConfig configuration
            config.connection.set_security_properties(&mut c);
            config.set_client(&mut c);

            // ClientConfig configuration
            c.set("bootstrap.servers", &config.connection.brokers);

            // Create the producer context, which will be used to create the producer
            let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
            let ctx_common = KafkaContextCommon::new(
                broker_rewrite_map,
                None,
                None,
                config.aws_auth_props.clone(),
                config.connection.is_aws_msk_iam(),
            )
            .await?;
            let producer_ctx = RwProducerContext::new(ctx_common);

            if let Some(log_level) = read_kafka_log_level() {
                c.set_log_level(log_level);
            }
            // Generate the producer
            c.create_with_context(producer_ctx).await?
        };

        Ok(KafkaSinkWriter {
            formatter,
            inner,
            config: config.clone(),
        })
    }
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
    type DeliveryFuture = KafkaSinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = KafkaPayloadWriter {
            inner: &mut self.inner,
            add_future,
            config: &self.config,
        };
        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
            payload_writer.write_chunk(chunk, formatter).await
        })
    }
}

impl KafkaPayloadWriter<'_> {
    /// The actual `send_result` function, which will be called when the `KafkaSinkWriter` needs
    /// to sink messages.
    async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let mut success_flag = false;

        let mut ret = Ok(());

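        // Try up to `max_retry_num` times. The only retryable failure is a full local producer
        // queue, in which case we await one outstanding delivery to free a slot before retrying.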
        for i in 0..self.config.max_retry_num {
            match self.inner.send_result(record) {
                Ok(delivery_future) => {
                    if self
                        .add_future
                        .add_future_may_await(map_delivery_future(delivery_future))
                        .await?
                    {
                        tracing::warn!(
                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
                            This indicates the default value of queue.buffering.max.messages has changed.",
                            self.add_future.future_count(),
                            self.add_future.max_future_count()
                        );
                    }
                    success_flag = true;
                    break;
                }
                // The enqueue buffer is full; `send_result` returns immediately.
                // We can retry for another round after awaiting one outstanding delivery.
                Err((e, rec)) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "producing message (key {:?}) to topic {} failed",
                        rec.key.map(|k| k.to_bytes()),
                        rec.topic,
                    );
                    record = rec;
                    match e {
                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                            tracing::warn!(
                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
                                self.add_future.future_count(),
                                i
                            );
                            self.add_future.await_one_delivery().await?;
                            continue;
                        }
                        _ => return Err(e.into()),
                    }
                }
            }
        }

        if !success_flag {
            // In this case, after `max_retry_num` attempts,
            // the enqueue buffer is still full.
            ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
        }

        ret
    }

    async fn write_inner(
        &mut self,
        event_key_object: Option<Vec<u8>>,
        event_object: Option<Vec<u8>>,
    ) -> Result<()> {
        let topic = self.config.common.topic.clone();
        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
        if let Some(key_str) = &event_key_object {
            record = record.key(key_str);
        }
        if let Some(payload) = &event_object {
            record = record.payload(payload);
        }
        // Send the data but do not wait for it to finish sinking.
        // All `DeliveryFuture`s will be joined during commit.
        self.send_result(record).await?;
        Ok(())
    }

    fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
        match delivery_future_result {
            // Successfully sent the record.
            // The partition and offset of the message (i32, i64) are returned.
            Ok(Ok(_)) => Ok(()),
            // The message failed to be delivered (e.g., detected on flush).
            // The error & a copy of the original message are returned,
            // i.e., (KafkaError, OwnedMessage).
            // We just stop the loop and return the error;
            // the sink executor will roll back to the latest checkpoint.
            Ok(Err((k_err, _msg))) => Err(k_err.into()),
            // This means the producer was dropped
            // before the delivery status was received.
            // Return `KafkaError::Canceled`.
            Err(_) => Err(KafkaError::Canceled.into()),
        }
    }
}

impl FormattedSink for KafkaPayloadWriter<'_> {
    type K = Vec<u8>;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.write_inner(k, v).await
    }
}

#[cfg(test)]
mod test {
    use maplit::btreemap;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;

    use super::*;
    use crate::sink::encoder::{
        DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
        TimestamptzHandlingMode,
    };
    use crate::sink::formatter::AppendOnlyFormatter;

    #[test]
    fn parse_rdkafka_props() {
        let props: BTreeMap<String, String> = btreemap! {
            // basic
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "append-only".to_string(),
            // RdKafkaPropertiesCommon
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            // RdKafkaPropertiesProducer
            "properties.queue.buffering.max.messages".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.ms".to_owned() => "114.514".to_owned(),
            "properties.enable.idempotence".to_owned() => "false".to_owned(),
            "properties.message.send.max.retries".to_owned() => "114514".to_owned(),
            "properties.retry.backoff.ms".to_owned() => "114514".to_owned(),
            "properties.batch.num.messages".to_owned() => "114514".to_owned(),
            "properties.batch.size".to_owned() => "114514".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
            "properties.message.timeout.ms".to_owned() => "114514".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned() => "114514".to_owned(),
            "properties.request.required.acks".to_owned() => "-1".to_owned(),
        };
        let c = KafkaConfig::from_btreemap(props).unwrap();
        assert_eq!(
            c.rdkafka_properties_producer.queue_buffering_max_ms,
            Some(114.514f64)
        );
        assert_eq!(
            c.rdkafka_properties_producer.compression_codec,
            Some(CompressionCodec::Zstd)
        );
        assert_eq!(
            c.rdkafka_properties_producer.message_timeout_ms,
            Some(114514)
        );
        assert_eq!(
            c.rdkafka_properties_producer
                .max_in_flight_requests_per_connection,
            114514
        );
        assert_eq!(
            c.rdkafka_properties_producer.request_required_acks,
            Some(-1)
        );

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),

            "properties.enable.idempotence".to_owned() => "True".to_owned(), // can only be 'true' or 'false'
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "-114514".to_owned(), // usize cannot be negative
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.compression.codec".to_owned() => "notvalid".to_owned(), // has to be a valid CompressionCodec
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());
    }

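    /// A minimal sketch (not part of the original test suite): it checks that
    /// `RdKafkaPropertiesProducer::set_client` forwards parsed `properties.*` options to the
    /// underlying `rdkafka::ClientConfig`, assuming `ClientConfig::get` is available in the
    /// rdkafka version in use.
    #[test]
    fn set_client_forwards_producer_props() {
        let props: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.batch.size".to_owned() => "1048576".to_owned(),
            "properties.compression.codec".to_owned() => "lz4".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(props).unwrap();

        let mut client = ClientConfig::new();
        config.set_client(&mut client);

        // The `properties.` prefix is stripped when the options are applied to librdkafka.
        assert_eq!(client.get("batch.size"), Some("1048576"));
        assert_eq!(client.get("compression.codec"), Some("lz4"));
        // `max.in.flight.requests.per.connection` falls back to its serde default of 5.
        assert_eq!(client.get("max.in.flight.requests.per.connection"), Some("5"));
    }
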
    #[test]
    fn parse_kafka_config() {
        let properties: BTreeMap<String, String> = btreemap! {
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "append-only".to_string(),
            // "force_append_only".to_string() => "true".to_string(),
            "properties.security.protocol".to_owned() => "SASL".to_owned(),
            "properties.sasl.mechanism".to_owned() => "SASL".to_owned(),
            "properties.sasl.username".to_owned() => "test".to_owned(),
            "properties.sasl.password".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "20".to_owned(),
            "properties.retry.interval".to_owned() => "500ms".to_owned(),
            // PrivateLink
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.connection.brokers, "localhost:9092");
        assert_eq!(config.common.topic, "test");
        assert_eq!(config.max_retry_num, 20);
        assert_eq!(config.retry_interval, Duration::from_millis(500));

        // PrivateLink fields
        let btreemap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));

        // Optional fields eliminated.
        let properties: BTreeMap<String, String> = btreemap! {
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "upsert".to_string(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.max_retry_num, 3);
        assert_eq!(config.retry_interval, Duration::from_millis(100));

        // Invalid u32 input.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.max".to_owned() => "-20".to_owned(),  // error!
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());

        // Invalid duration input.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.interval".to_owned() => "500miiinutes".to_owned(),  // invalid duration
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());
    }

    /// Note: Please enable Kafka by running `./risedev configure` before commenting out `#[ignore]`
    /// to run the test; also remember to modify `risedev.yml` accordingly.
    #[ignore]
    #[tokio::test]
    async fn test_kafka_producer() -> Result<()> {
        // Create dummy Kafka properties
        let properties = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:29092".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "topic".to_owned() => "test_topic".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
        };

        // Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here
        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v2".into(),
            },
        ]);

        let kafka_config = KafkaConfig::from_btreemap(properties)?;

        // Create the actual sink writer to Kafka
        let sink = KafkaSinkWriter::new(
            kafka_config.clone(),
            SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
                // We do not specify primary key for this schema
                None,
                JsonEncoder::new(
                    schema,
                    None,
                    DateHandlingMode::FromCe,
                    TimestampHandlingMode::Milli,
                    TimestamptzHandlingMode::UtcString,
                    TimeHandlingMode::Milli,
                    JsonbHandlingMode::String,
                ),
            )),
        )
        .await
        .unwrap();

        use crate::sink::log_store::DeliveryFutureManager;

        let mut future_manager = DeliveryFutureManager::new(usize::MAX);

        for i in 0..10 {
            println!("epoch: {}", i);
            for j in 0..100 {
                let mut writer = KafkaPayloadWriter {
                    inner: &sink.inner,
                    add_future: future_manager.start_write_chunk(i, j),
                    config: &sink.config,
                };
                match writer
                    .send_result(
                        FutureRecord::to(kafka_config.common.topic.as_str())
                            .payload(format!("value-{}", j).as_bytes())
                            .key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
                    )
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        println!("{:?}", e);
                        break;
                    }
                };
            }
        }

        Ok(())
    }
}