use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
    AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
    RdKafkaPropertiesCommon, read_kafka_log_level,
};
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{Result, SinkWriterParam};
use crate::source::kafka::{
    KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
    deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};

pub const KAFKA_SINK: &str = "kafka";

const fn _default_max_retries() -> u32 {
    3
}

const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
    5
}

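/// Compression codecs accepted by the `properties.compression.codec` sink option,
/// forwarded as librdkafka's `compression.codec` producer property.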
#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

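/// Producer-level librdkafka properties that can be set on the sink via
/// `properties.*` options and are forwarded verbatim to the underlying client.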
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
    /// Allow automatic topic creation on the broker when producing to non-existent topics.
    #[serde(rename = "properties.allow.auto.create.topics")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub allow_auto_create_topics: Option<bool>,

    /// Maximum number of messages allowed on the producer queue.
    #[serde(rename = "properties.queue.buffering.max.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queue_buffering_max_messages: Option<usize>,

    /// Maximum total message size sum allowed on the producer queue, in kilobytes.
    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    queue_buffering_max_kbytes: Option<usize>,

    /// Delay in milliseconds to wait for messages in the producer queue to accumulate
    /// before constructing message batches to transmit to brokers.
    #[serde(rename = "properties.queue.buffering.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    queue_buffering_max_ms: Option<f64>,

    /// When set to `true`, the producer ensures that messages are successfully produced
    /// exactly once and in the original produce order.
    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    enable_idempotence: Option<bool>,

    /// How many times to retry sending a failing message.
    #[serde(rename = "properties.message.send.max.retries")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    message_send_max_retries: Option<usize>,

    /// The backoff time in milliseconds before retrying a protocol request.
    #[serde(rename = "properties.retry.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    retry_backoff_ms: Option<usize>,

    /// Maximum number of messages batched in one message set.
    #[serde(rename = "properties.batch.num.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    batch_num_messages: Option<usize>,

    /// Maximum size (in bytes) of all messages batched in one message set,
    /// including protocol framing overhead.
    #[serde(rename = "properties.batch.size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    batch_size: Option<usize>,

    /// Compression codec to use for compressing message sets.
    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,

    /// Local message timeout: limits the time a produced message waits for
    /// successful delivery, including retries.
    #[serde(rename = "properties.message.timeout.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    message_timeout_ms: Option<usize>,

    /// Maximum number of in-flight requests per broker connection.
    #[serde(
        rename = "properties.max.in.flight.requests.per.connection",
        default = "_default_max_in_flight_requests_per_connection"
    )]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    max_in_flight_requests_per_connection: usize,

    /// Number of acknowledgements the leader broker must receive before a request is
    /// considered complete (`-1` means all in-sync replicas).
    #[serde(rename = "properties.request.required.acks")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    request_required_acks: Option<i32>,
}

impl RdKafkaPropertiesProducer {
    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = self.allow_auto_create_topics {
            c.set("allow.auto.create.topics", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_messages {
            c.set("queue.buffering.max.messages", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_kbytes {
            c.set("queue.buffering.max.kbytes", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_ms {
            c.set("queue.buffering.max.ms", v.to_string());
        }
        if let Some(v) = self.enable_idempotence {
            c.set("enable.idempotence", v.to_string());
        }
        if let Some(v) = self.message_send_max_retries {
            c.set("message.send.max.retries", v.to_string());
        }
        if let Some(v) = self.retry_backoff_ms {
            c.set("retry.backoff.ms", v.to_string());
        }
        if let Some(v) = self.batch_num_messages {
            c.set("batch.num.messages", v.to_string());
        }
        if let Some(v) = self.batch_size {
            c.set("batch.size", v.to_string());
        }
        if let Some(v) = &self.compression_codec {
            c.set("compression.codec", v.to_string());
        }
        if let Some(v) = self.request_required_acks {
            c.set("request.required.acks", v.to_string());
        }
        if let Some(v) = self.message_timeout_ms {
            c.set("message.timeout.ms", v.to_string());
        }
        c.set(
            "max.in.flight.requests.per.connection",
            self.max_in_flight_requests_per_connection.to_string(),
        );
    }
}

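/// Complete configuration for the Kafka sink: common topic options, connection and
/// security settings, producer-level librdkafka properties, PrivateLink broker
/// rewrites, AWS auth, and the sink-side retry knobs.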
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    /// Maximum number of send attempts for a record (default: 3).
    #[serde(
        rename = "properties.retry.max",
        default = "_default_max_retries",
        deserialize_with = "deserialize_u32_from_string"
    )]
    pub max_retry_num: u32,

    /// Backoff between send retries (default: 100ms).
    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    pub primary_key: Option<String>,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}

impl EnforceSecret for KafkaConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KafkaConnectionProps::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl KafkaConfig {
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_producer.set_client(c);
    }
}

impl From<KafkaConfig> for KafkaProperties {
    fn from(val: KafkaConfig) -> Self {
        KafkaProperties {
            bytes_per_second: None,
            max_num_messages: None,
            scan_startup_mode: None,
            time_offset: None,
            upsert: None,
            common: val.common,
            connection: val.connection,
            rdkafka_properties_common: val.rdkafka_properties_common,
            rdkafka_properties_consumer: Default::default(),
            privatelink_common: val.privatelink_common,
            aws_auth_props: val.aws_auth_props,
            group_id_prefix: None,
            unknown_fields: Default::default(),
        }
    }
}

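/// The Kafka sink. `validate` checks the format and primary-key requirements and probes
/// broker reachability; `new_log_sinker` builds a [`KafkaSinkWriter`] wrapped in an
/// async-truncate log sinker.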
#[derive(Debug)]
pub struct KafkaSink {
    pub config: KafkaConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KafkaSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KafkaSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KafkaConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

impl Sink for KafkaSink {
    type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

    const SINK_NAME: &'static str = KAFKA_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let formatter = SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

        // Bound the number of in-flight delivery futures relative to the producer queue size.
        let max_delivery_buffer_size = (self
            .config
            .rdkafka_properties_producer
            .queue_buffering_max_messages
            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

        Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
            .await?
            .into_log_sinker(max_delivery_buffer_size))
    }

    async fn validate(&self) -> Result<()> {
        // Non-append-only sinks need a message key, so a primary key must be defined.
        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
                self.format_desc.format
            )));
        }
        // Check that a formatter can actually be built from the given format and schema.
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

        // Probe broker reachability with a split enumerator built from the same config.
        let check = KafkaSplitEnumerator::new(
            KafkaProperties::from(self.config.clone()),
            Arc::new(SourceEnumeratorContext::dummy()),
        )
        .await?;
        if let Err(e) = check.check_reachability().await {
            return Err(SinkError::Config(
                anyhow!(
                    "cannot connect to kafka broker ({})",
                    self.config.connection.brokers,
                )
                .context(e),
            ));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        KafkaConfig::from_btreemap(config.clone())?;
        Ok(())
    }
}

/// Scaling factor applied to the producer queue size when sizing the delivery-future buffer.
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
/// Assumed producer queue size when `queue.buffering.max.messages` is not set by the user.
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

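/// Per-chunk writer that borrows the shared producer and registers each record's
/// delivery future with the log store's future manager.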
struct KafkaPayloadWriter<'a> {
    inner: &'a FutureProducer<RwProducerContext>,
    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
    config: &'a KafkaConfig,
}

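// Name the delivery future as an opaque type so that the concrete mapped future type
// returned by rdkafka does not leak into the `AsyncTruncateSinkWriter` implementation.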
mod opaque_type {
    use super::*;
    pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    #[define_opaque(KafkaSinkDeliveryFuture)]
    pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
        future.map(KafkaPayloadWriter::<'static>::map_future_result)
    }
}
pub use opaque_type::KafkaSinkDeliveryFuture;
use opaque_type::map_delivery_future;

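/// Kafka sink writer: holds the formatter and the underlying rdkafka `FutureProducer`
/// shared by all payload writers.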
pub struct KafkaSinkWriter {
    formatter: SinkFormatterImpl,
    inner: FutureProducer<RwProducerContext>,
    config: KafkaConfig,
}

impl KafkaSinkWriter {
    async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
        let inner: FutureProducer<RwProducerContext> = {
            let mut c = ClientConfig::new();

            // Apply security settings and user-specified rdkafka properties.
            config.connection.set_security_properties(&mut c);
            config.set_client(&mut c);

            c.set("bootstrap.servers", &config.connection.brokers);

            let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
            let ctx_common = KafkaContextCommon::new(
                broker_rewrite_map,
                None,
                None,
                config.aws_auth_props.clone(),
                config.connection.is_aws_msk_iam(),
            )
            .await?;
            let producer_ctx = RwProducerContext::new(ctx_common);
            if let Some(log_level) = read_kafka_log_level() {
                c.set_log_level(log_level);
            }
            c.create_with_context(producer_ctx).await?
        };

        Ok(KafkaSinkWriter {
            formatter,
            inner,
            config: config.clone(),
        })
    }
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
    type DeliveryFuture = KafkaSinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = KafkaPayloadWriter {
            inner: &self.inner,
            add_future,
            config: &self.config,
        };
        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
            payload_writer.write_chunk(chunk, formatter).await
        })
    }
}

impl KafkaPayloadWriter<'_> {
    /// Sends a record, retrying when the producer queue is full by awaiting one
    /// outstanding delivery before trying again; any other error aborts immediately.
    async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let mut success_flag = false;

        let mut ret = Ok(());

        for i in 0..self.config.max_retry_num {
            match self.inner.send_result(record) {
                Ok(delivery_future) => {
                    if self
                        .add_future
                        .add_future_may_await(map_delivery_future(delivery_future))
                        .await?
                    {
                        tracing::warn!(
                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
                            This indicates the default value of queue.buffering.max.messages has changed.",
                            self.add_future.future_count(),
                            self.add_future.max_future_count()
                        );
                    }
                    success_flag = true;
                    break;
                }
                // The enqueue failed; rdkafka hands the record back so it can be retried.
                Err((e, rec)) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "producing message (key {:?}) to topic {} failed",
                        rec.key.map(|k| k.to_bytes()),
                        rec.topic,
                    );
                    record = rec;
                    match e {
                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                            tracing::warn!(
                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
                                self.add_future.future_count(),
                                i
                            );
                            self.add_future.await_one_delivery().await?;
                            continue;
                        }
                        _ => return Err(e.into()),
                    }
                }
            }
        }

        if !success_flag {
            // All retries were exhausted while the queue stayed full.
            ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
        }

        ret
    }

    async fn write_inner(
        &mut self,
        event_key_object: Option<Vec<u8>>,
        event_object: Option<Vec<u8>>,
    ) -> Result<()> {
        let topic = self.config.common.topic.clone();
        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
        if let Some(key_str) = &event_key_object {
            record = record.key(key_str);
        }
        if let Some(payload) = &event_object {
            record = record.payload(payload);
        }
        self.send_result(record).await?;
        Ok(())
    }

    /// Maps the nested result of rdkafka's `DeliveryFuture`: the outer error means the
    /// future was canceled (e.g. the producer was dropped), the inner error is a
    /// delivery failure reported by the producer.
    fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
        match delivery_future_result {
            Ok(Ok(_)) => Ok(()),
            Ok(Err((k_err, _msg))) => Err(k_err.into()),
            Err(_) => Err(KafkaError::Canceled.into()),
        }
    }
}

impl FormattedSink for KafkaPayloadWriter<'_> {
    type K = Vec<u8>;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.write_inner(k, v).await
    }
}

#[cfg(test)]
mod test {
    use maplit::btreemap;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;

    use super::*;
    use crate::sink::encoder::{
        DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
        TimestamptzHandlingMode,
    };
    use crate::sink::formatter::AppendOnlyFormatter;

    #[test]
    fn parse_rdkafka_props() {
        let props: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            "properties.queue.buffering.max.messages".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.ms".to_owned() => "114.514".to_owned(),
            "properties.enable.idempotence".to_owned() => "false".to_owned(),
            "properties.message.send.max.retries".to_owned() => "114514".to_owned(),
            "properties.retry.backoff.ms".to_owned() => "114514".to_owned(),
            "properties.batch.num.messages".to_owned() => "114514".to_owned(),
            "properties.batch.size".to_owned() => "114514".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
            "properties.message.timeout.ms".to_owned() => "114514".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned() => "114514".to_owned(),
            "properties.request.required.acks".to_owned() => "-1".to_owned(),
        };
        let c = KafkaConfig::from_btreemap(props).unwrap();
        assert_eq!(
            c.rdkafka_properties_producer.queue_buffering_max_ms,
            Some(114.514f64)
        );
        assert_eq!(
            c.rdkafka_properties_producer.compression_codec,
            Some(CompressionCodec::Zstd)
        );
        assert_eq!(
            c.rdkafka_properties_producer.message_timeout_ms,
            Some(114514)
        );
        assert_eq!(
            c.rdkafka_properties_producer
                .max_in_flight_requests_per_connection,
            114514
        );
        assert_eq!(
            c.rdkafka_properties_producer.request_required_acks,
            Some(-1)
        );

        // `True` is not a valid boolean for DisplayFromStr parsing.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.enable.idempotence".to_owned() => "True".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        // Negative value for an unsigned property is rejected.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "-114514".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        // Unknown compression codec is rejected.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.compression.codec".to_owned() => "notvalid".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());
    }

    #[test]
    fn parse_kafka_config() {
        let properties: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.security.protocol".to_owned() => "SASL".to_owned(),
            "properties.sasl.mechanism".to_owned() => "SASL".to_owned(),
            "properties.sasl.username".to_owned() => "test".to_owned(),
            "properties.sasl.password".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "20".to_owned(),
            "properties.retry.interval".to_owned() => "500ms".to_owned(),
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.connection.brokers, "localhost:9092");
        assert_eq!(config.common.topic, "test");
        assert_eq!(config.max_retry_num, 20);
        assert_eq!(config.retry_interval, Duration::from_millis(500));

        let btreemap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));

        // Defaults are applied when the retry options are omitted.
        let properties: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.max_retry_num, 3);
        assert_eq!(config.retry_interval, Duration::from_millis(100));

        // A negative retry count is rejected.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.max".to_owned() => "-20".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());

        // A malformed duration is rejected.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.interval".to_owned() => "500miiinutes".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());
    }

    /// Manual smoke test that produces to a locally running Kafka broker; ignored by default.
    #[ignore]
    #[tokio::test]
    async fn test_kafka_producer() -> Result<()> {
        let properties = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:29092".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "topic".to_owned() => "test_topic".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v2".into(),
            },
        ]);

        let kafka_config = KafkaConfig::from_btreemap(properties)?;

        let sink = KafkaSinkWriter::new(
            kafka_config.clone(),
            SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
                None,
                JsonEncoder::new(
                    schema,
                    None,
                    DateHandlingMode::FromCe,
                    TimestampHandlingMode::Milli,
                    TimestamptzHandlingMode::UtcString,
                    TimeHandlingMode::Milli,
                    JsonbHandlingMode::String,
                ),
            )),
        )
        .await
        .unwrap();

        use crate::sink::log_store::DeliveryFutureManager;

        let mut future_manager = DeliveryFutureManager::new(usize::MAX);

        for i in 0..10 {
            println!("epoch: {}", i);
            for j in 0..100 {
                let mut writer = KafkaPayloadWriter {
                    inner: &sink.inner,
                    add_future: future_manager.start_write_chunk(i, j),
                    config: &sink.config,
                };
                match writer
                    .send_result(
                        FutureRecord::to(kafka_config.common.topic.as_str())
                            .payload(format!("value-{}", j).as_bytes())
                            .key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
                    )
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        println!("{:?}", e);
                        break;
                    }
                };
            }
        }

        Ok(())
    }
}