risingwave_connector/sink/kafka.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
    AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
    RdKafkaPropertiesCommon, read_kafka_log_level,
};
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{Result, SinkWriterParam};
use crate::source::kafka::{
    KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
    deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};

pub const KAFKA_SINK: &str = "kafka";

const fn _default_max_retries() -> u32 {
    3
}

const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
    5
}

#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
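
// With `#[strum(serialize_all = "snake_case")]`, the codec round-trips as a lowercase
// string: `"zstd".parse::<CompressionCodec>()` yields `Zstd`, and `Zstd.to_string()` is
// `"zstd"`. This is what the `DisplayFromStr` adapter on `properties.compression.codec`
// below relies on.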

/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
/// for the detailed meaning of these librdkafka producer properties
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
    /// Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
    #[serde(rename = "properties.allow.auto.create.topics")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub allow_auto_create_topics: Option<bool>,

    /// Maximum number of messages allowed on the producer queue. This queue is shared by all
    /// topics and partitions. A value of 0 disables this limit.
    #[serde(rename = "properties.queue.buffering.max.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queue_buffering_max_messages: Option<usize>,

    /// Maximum total message size sum allowed on the producer queue. This queue is shared by all
    /// topics and partitions. This property has higher priority than queue.buffering.max.messages.
    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    queue_buffering_max_kbytes: Option<usize>,

    /// Delay in milliseconds to wait for messages in the producer queue to accumulate before
    /// constructing message batches (`MessageSets`) to transmit to brokers. A higher value allows
    /// larger and more effective (less overhead, improved compression) batches of messages to
    /// accumulate at the expense of increased message delivery latency.
    #[serde(rename = "properties.queue.buffering.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    queue_buffering_max_ms: Option<f64>,

    /// When set to true, the producer will ensure that messages are successfully produced exactly
    /// once and in the original produce order. The following configuration properties are adjusted
    /// automatically (if not modified by the user) when idempotence is enabled:
    /// max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
    /// `retries=INT32_MAX` (must be greater than 0), acks=all, queuing.strategy=fifo. Producer
    /// will fail if user-supplied configuration is incompatible.
    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    enable_idempotence: Option<bool>,

    /// How many times to retry sending a failing Message.
    #[serde(rename = "properties.message.send.max.retries")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    message_send_max_retries: Option<usize>,

    /// The backoff time in milliseconds before retrying a protocol request.
    #[serde(rename = "properties.retry.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    retry_backoff_ms: Option<usize>,

    /// Maximum number of messages batched in one `MessageSet`
    #[serde(rename = "properties.batch.num.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    batch_num_messages: Option<usize>,

    /// Maximum size (in bytes) of all messages batched in one `MessageSet`, including protocol
    /// framing overhead. This limit is applied after the first message has been added to the
    /// batch, regardless of the first message's size; this is to ensure that messages that exceed
    /// batch.size are produced.
    #[serde(rename = "properties.batch.size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    batch_size: Option<usize>,

    /// Compression codec to use for compressing message sets.
    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,

    /// Produce message timeout.
    /// This value is used to limit the time a produced message waits for
    /// successful delivery (including retries).
    #[serde(rename = "properties.message.timeout.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    message_timeout_ms: Option<usize>,

    /// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
    #[serde(
        rename = "properties.max.in.flight.requests.per.connection",
        default = "_default_max_in_flight_requests_per_connection"
    )]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    max_in_flight_requests_per_connection: usize,

    #[serde(rename = "properties.request.required.acks")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    request_required_acks: Option<i32>,
}

impl RdKafkaPropertiesProducer {
    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = self.allow_auto_create_topics {
            c.set("allow.auto.create.topics", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_messages {
            c.set("queue.buffering.max.messages", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_kbytes {
            c.set("queue.buffering.max.kbytes", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_ms {
            c.set("queue.buffering.max.ms", v.to_string());
        }
        if let Some(v) = self.enable_idempotence {
            c.set("enable.idempotence", v.to_string());
        }
        if let Some(v) = self.message_send_max_retries {
            c.set("message.send.max.retries", v.to_string());
        }
        if let Some(v) = self.retry_backoff_ms {
            c.set("retry.backoff.ms", v.to_string());
        }
        if let Some(v) = self.batch_num_messages {
            c.set("batch.num.messages", v.to_string());
        }
        if let Some(v) = self.batch_size {
            c.set("batch.size", v.to_string());
        }
        if let Some(v) = &self.compression_codec {
            c.set("compression.codec", v.to_string());
        }
        if let Some(v) = self.request_required_acks {
            c.set("request.required.acks", v.to_string());
        }
        if let Some(v) = self.message_timeout_ms {
            c.set("message.timeout.ms", v.to_string());
        }
        c.set(
            "max.in.flight.requests.per.connection",
            self.max_in_flight_requests_per_connection.to_string(),
        );
    }
}
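
// Note: each `properties.x.y` option above is forwarded to librdkafka under its
// un-prefixed key, e.g. `properties.batch.size = '1048576'` (typically supplied via the
// sink's `WITH` clause) becomes `batch.size=1048576` on the `ClientConfig`.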

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(
        rename = "properties.retry.max",
        default = "_default_max_retries",
        deserialize_with = "deserialize_u32_from_string"
    )]
    pub max_retry_num: u32,

    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    /// We have parsed the primary key for an upsert kafka sink into a `usize` vector representing
    /// the indices of the pk columns in the frontend, so we simply store the primary key here
    /// as a string.
    pub primary_key: Option<String>,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}

impl EnforceSecret for KafkaConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KafkaConnectionProps::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl KafkaConfig {
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_producer.set_client(c);
    }
}
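
// `from_btreemap` round-trips the string map through `serde_json::Value`, so every value
// arrives as a JSON string; the `DisplayFromStr` adapters above are what turn those
// strings back into numbers, booleans, and enums.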

impl From<KafkaConfig> for KafkaProperties {
    fn from(val: KafkaConfig) -> Self {
        KafkaProperties {
            bytes_per_second: None,
            max_num_messages: None,
            scan_startup_mode: None,
            time_offset: None,
            upsert: None,
            common: val.common,
            connection: val.connection,
            rdkafka_properties_common: val.rdkafka_properties_common,
            rdkafka_properties_consumer: Default::default(),
            privatelink_common: val.privatelink_common,
            aws_auth_props: val.aws_auth_props,
            group_id_prefix: None,
            unknown_fields: Default::default(),
        }
    }
}
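
// This conversion exists so that `KafkaSink::validate` can reuse the source-side
// `KafkaSplitEnumerator` to probe broker reachability and topic existence; the
// consumer-only fields are simply left unset.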

#[derive(Debug)]
pub struct KafkaSink {
    pub config: KafkaConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KafkaSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KafkaSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KafkaConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

impl Sink for KafkaSink {
    type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

    const SINK_NAME: &'static str = KAFKA_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let formatter = SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;
        let max_delivery_buffer_size = (self
            .config
            .rdkafka_properties_producer
            .queue_buffering_max_messages
            .as_ref()
            .cloned()
            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

        Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
            .await?
            .into_log_sinker(max_delivery_buffer_size))
    }

    async fn validate(&self) -> Result<()> {
        // For upsert Kafka sink, the primary key must be defined.
        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
                self.format_desc.format
            )));
        }
        // Construct the formatter now to surface any constructor error early, while it can still be reported to the user.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

        // Try the Kafka connection.
        // The Kafka producer exposes no interface for validating a connection,
        // so use the split enumerator to validate broker reachability and topic existence.
        let check = KafkaSplitEnumerator::new(
            KafkaProperties::from(self.config.clone()),
            Arc::new(SourceEnumeratorContext::dummy()),
        )
        .await?;
        if let Err(e) = check.check_reachability().await {
            return Err(SinkError::Config(
                anyhow!(
                    "cannot connect to kafka broker ({})",
                    self.config.connection.brokers,
                )
                .context(e),
            ));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        KafkaConfig::from_btreemap(config.clone())?;
        Ok(())
    }
}

/// When the number of `DeliveryFuture`s buffered in the current `future_delivery_buffer`
/// exceeds `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
/// a commit is enforced.
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
/// The default queue size used to enforce a commit in kafka producer if `queue.buffering.max.messages` is not specified.
/// This default value is determined based on the librdkafka default. See the following doc for more details:
/// <https://github.com/confluentinc/librdkafka/blob/1cb80090dfc75f5a36eae3f4f8844b14885c045e/CONFIGURATION.md>
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
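
// With librdkafka's default of 100_000 queued messages and the 1.2 ratio above,
// `new_log_sinker` caps the number of in-flight delivery futures at roughly
// 100_000 * 1.2 = 120_000 before the log sinker is forced to await deliveries.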

struct KafkaPayloadWriter<'a> {
    inner: &'a FutureProducer<RwProducerContext>,
    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
    config: &'a KafkaConfig,
}

mod opaque_type {
    use super::*;
    pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    #[define_opaque(KafkaSinkDeliveryFuture)]
    pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
        future.map(KafkaPayloadWriter::<'static>::map_future_result)
    }
}
pub use opaque_type::KafkaSinkDeliveryFuture;
use opaque_type::map_delivery_future;
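
// The opaque type above erases rdkafka's concrete `DeliveryFuture` output type, so the
// log store's `DeliveryFutureManager` only ever sees a `TryFuture<Ok = (), Error = SinkError>`
// and stays agnostic of Kafka-specific delivery results.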

pub struct KafkaSinkWriter {
    formatter: SinkFormatterImpl,
    inner: FutureProducer<RwProducerContext>,
    config: KafkaConfig,
}

impl KafkaSinkWriter {
    async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
        let inner: FutureProducer<RwProducerContext> = {
            let mut c = ClientConfig::new();

            // KafkaConfig configuration
            config.connection.set_security_properties(&mut c);
            config.set_client(&mut c);

            // ClientConfig configuration
            c.set("bootstrap.servers", &config.connection.brokers);

            // Create the producer context, which will be used to create the producer
            let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
            let ctx_common = KafkaContextCommon::new(
                broker_rewrite_map,
                None,
                None,
                config.aws_auth_props.clone(),
                config.connection.is_aws_msk_iam(),
            )
            .await?;
            let producer_ctx = RwProducerContext::new(ctx_common);

            if let Some(log_level) = read_kafka_log_level() {
                c.set_log_level(log_level);
            }
            // Generate the producer
            c.create_with_context(producer_ctx).await?
        };

        Ok(KafkaSinkWriter {
            formatter,
            inner,
            config: config.clone(),
        })
    }
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
    type DeliveryFuture = KafkaSinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = KafkaPayloadWriter {
            inner: &mut self.inner,
            add_future,
            config: &self.config,
        };
        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
            payload_writer.write_chunk(chunk, formatter).await
        })
    }
}
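
// `write_chunk` only enqueues records and registers their delivery futures via `add_future`;
// the surrounding log sinker awaits those futures later and truncates the log store once the
// corresponding chunks are confirmed as delivered.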

impl KafkaPayloadWriter<'_> {
    /// The actual `send_result` function, called when the `KafkaSinkWriter` needs to sink
    /// messages.
    async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let mut success_flag = false;

        let mut ret = Ok(());

        for i in 0..self.config.max_retry_num {
            match self.inner.send_result(record) {
                Ok(delivery_future) => {
                    if self
                        .add_future
                        .add_future_may_await(map_delivery_future(delivery_future))
                        .await?
                    {
                        tracing::warn!(
                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
                            This indicates the default value of queue.buffering.max.messages has changed.",
                            self.add_future.future_count(),
                            self.add_future.max_future_count()
                        );
                    }
                    success_flag = true;
                    break;
                }
                // `send_result` returns immediately when enqueueing fails (e.g., the buffer is full).
                // For a full queue we await one pending delivery and retry; other errors are returned.
                Err((e, rec)) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "producing message (key {:?}) to topic {} failed",
                        rec.key.map(|k| k.to_bytes()),
                        rec.topic,
                    );
                    record = rec;
                    match e {
                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                            tracing::warn!(
                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
                                self.add_future.future_count(),
                                i
                            );
                            self.add_future.await_one_delivery().await?;
                            continue;
                        }
                        _ => return Err(e.into()),
                    }
                }
            }
        }

        if !success_flag {
            // In this case, the enqueue buffer is still full after `max_retry_num` retries.
            ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
        }

        ret
    }

    async fn write_inner(
        &mut self,
        event_key_object: Option<Vec<u8>>,
        event_object: Option<Vec<u8>>,
    ) -> Result<()> {
        let topic = self.config.common.topic.clone();
        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
        if let Some(key_str) = &event_key_object {
            record = record.key(key_str);
        }
        if let Some(payload) = &event_object {
            record = record.payload(payload);
        }
        // Send the data but do not wait for it to finish sinking;
        // all `DeliveryFuture`s will be joined during commit.
        self.send_result(record).await?;
        Ok(())
    }

    fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
        match delivery_future_result {
            // Successfully sent the record.
            // The Ok value carries the partition and offset of the message, i.e., (i32, i64).
            Ok(Ok(_)) => Ok(()),
            // The message failed to be delivered (e.g., detected on flush).
            // The error and a copy of the original message are returned,
            // i.e., (KafkaError, OwnedMessage).
            // We just stop the loop and return the error;
            // the sink executor will roll back to the latest checkpoint.
            Ok(Err((k_err, _msg))) => Err(k_err.into()),
            // The producer was dropped before the delivery status was received;
            // return `KafkaError::Canceled`.
            Err(_) => Err(KafkaError::Canceled.into()),
        }
    }
}

impl FormattedSink for KafkaPayloadWriter<'_> {
    type K = Vec<u8>;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.write_inner(k, v).await
    }
}

#[cfg(test)]
mod test {
    use maplit::btreemap;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;

    use super::*;
    use crate::sink::encoder::{
        DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
        TimestamptzHandlingMode,
    };
    use crate::sink::formatter::AppendOnlyFormatter;

    #[test]
    fn parse_rdkafka_props() {
        let props: BTreeMap<String, String> = btreemap! {
            // basic
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "append-only".to_string(),
            // RdKafkaPropertiesCommon
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            // RdKafkaPropertiesProducer
            "properties.queue.buffering.max.messages".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.ms".to_owned() => "114.514".to_owned(),
            "properties.enable.idempotence".to_owned() => "false".to_owned(),
            "properties.message.send.max.retries".to_owned() => "114514".to_owned(),
            "properties.retry.backoff.ms".to_owned() => "114514".to_owned(),
            "properties.batch.num.messages".to_owned() => "114514".to_owned(),
            "properties.batch.size".to_owned() => "114514".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
            "properties.message.timeout.ms".to_owned() => "114514".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned() => "114514".to_owned(),
            "properties.request.required.acks".to_owned() => "-1".to_owned(),
        };
        let c = KafkaConfig::from_btreemap(props).unwrap();
        assert_eq!(
            c.rdkafka_properties_producer.queue_buffering_max_ms,
            Some(114.514f64)
        );
        assert_eq!(
            c.rdkafka_properties_producer.compression_codec,
            Some(CompressionCodec::Zstd)
        );
        assert_eq!(
            c.rdkafka_properties_producer.message_timeout_ms,
            Some(114514)
        );
        assert_eq!(
            c.rdkafka_properties_producer
                .max_in_flight_requests_per_connection,
            114514
        );
        assert_eq!(
            c.rdkafka_properties_producer.request_required_acks,
            Some(-1)
        );

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),

            "properties.enable.idempotence".to_owned() => "True".to_owned(), // can only be 'true' or 'false'
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "-114514".to_owned(), // usize cannot be negative
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        let props: BTreeMap<String, String> = btreemap! {
            // basic
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.compression.codec".to_owned() => "notvalid".to_owned(), // has to be a valid CompressionCodec
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());
    }
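
    /// A minimal sketch (assuming `rdkafka::ClientConfig::get` is available with this
    /// signature): verify that `set_client` forwards the parsed `properties.*` options to
    /// librdkafka under their un-prefixed keys. The property values here are arbitrary.
    #[test]
    fn forward_rdkafka_props_to_client_config() {
        let props: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.batch.size".to_owned() => "1048576".to_owned(),
            "properties.compression.codec".to_owned() => "lz4".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(props).unwrap();
        let mut c = ClientConfig::new();
        // `set_client` strips the `properties.` prefix and forwards the values as strings.
        config.set_client(&mut c);
        assert_eq!(c.get("batch.size"), Some("1048576"));
        assert_eq!(c.get("compression.codec"), Some("lz4"));
    }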

    #[test]
    fn parse_kafka_config() {
        let properties: BTreeMap<String, String> = btreemap! {
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "append-only".to_string(),
            // "force_append_only".to_string() => "true".to_string(),
            "properties.security.protocol".to_owned() => "SASL".to_owned(),
            "properties.sasl.mechanism".to_owned() => "SASL".to_owned(),
            "properties.sasl.username".to_owned() => "test".to_owned(),
            "properties.sasl.password".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "20".to_owned(),
            "properties.retry.interval".to_owned() => "500ms".to_owned(),
            // PrivateLink
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.connection.brokers, "localhost:9092");
        assert_eq!(config.common.topic, "test");
        assert_eq!(config.max_retry_num, 20);
        assert_eq!(config.retry_interval, Duration::from_millis(500));

        // PrivateLink fields
        let btreemap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));

        // Optional fields eliminated.
        let properties: BTreeMap<String, String> = btreemap! {
            // "connector".to_string() => "kafka".to_string(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // "type".to_string() => "upsert".to_string(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.max_retry_num, 3);
        assert_eq!(config.retry_interval, Duration::from_millis(100));

        // Invalid u32 input.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.max".to_owned() => "-20".to_owned(),  // error!
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());

        // Invalid duration input.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.interval".to_owned() => "500miiinutes".to_owned(),  // invalid duration
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());
    }
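
    /// A minimal sketch: `validate_alter_config` simply re-parses the property map, so a
    /// map that `from_btreemap` rejects must be rejected here as well (the broker and
    /// topic values are arbitrary).
    #[test]
    fn validate_alter_config_rejects_invalid_props() {
        let props: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "not-a-number".to_owned(), // not a valid u32
        };
        assert!(<KafkaSink as Sink>::validate_alter_config(&props).is_err());
    }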

    /// Note: Please enable Kafka by running `./risedev configure` before commenting out `#[ignore]`
    /// to run this test; also remember to modify `risedev.yml`.
    #[ignore]
    #[tokio::test]
    async fn test_kafka_producer() -> Result<()> {
        // Create dummy Kafka properties
        let properties = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:29092".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "topic".to_owned() => "test_topic".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
        };

        // Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here
        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v2".into(),
            },
        ]);

        let kafka_config = KafkaConfig::from_btreemap(properties)?;

        // Create the actual sink writer to Kafka
        let sink = KafkaSinkWriter::new(
            kafka_config.clone(),
            SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
                // We do not specify primary key for this schema
                None,
                JsonEncoder::new(
                    schema,
                    None,
                    DateHandlingMode::FromCe,
                    TimestampHandlingMode::Milli,
                    TimestamptzHandlingMode::UtcString,
                    TimeHandlingMode::Milli,
                    JsonbHandlingMode::String,
                ),
            )),
        )
        .await
        .unwrap();

        use crate::sink::log_store::DeliveryFutureManager;

        let mut future_manager = DeliveryFutureManager::new(usize::MAX);

        for i in 0..10 {
            println!("epoch: {}", i);
            for j in 0..100 {
                let mut writer = KafkaPayloadWriter {
                    inner: &sink.inner,
                    add_future: future_manager.start_write_chunk(i, j),
                    config: &sink.config,
                };
                match writer
                    .send_result(
                        FutureRecord::to(kafka_config.common.topic.as_str())
                            .payload(format!("value-{}", j).as_bytes())
                            .key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
                    )
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        println!("{:?}", e);
                        break;
                    }
                };
            }
        }

        Ok(())
    }
}