use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
    AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
    RdKafkaPropertiesCommon,
};
use crate::enforce_secret::EnforceSecret;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
    KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
    deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};

pub const KAFKA_SINK: &str = "kafka";

const fn _default_max_retries() -> u32 {
    3
}

const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
    5
}

#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

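// Producer-level librdkafka options. Each `properties.*` key accepted in the
// sink's WITH clause is forwarded to rdkafka with the `properties.` prefix
// stripped (see `set_client` below). `properties.compression.codec` accepts
// the snake_case names of `CompressionCodec`: "none", "gzip", "snappy",
// "lz4" and "zstd".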
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
    #[serde(rename = "properties.allow.auto.create.topics")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub allow_auto_create_topics: Option<bool>,

    #[serde(rename = "properties.queue.buffering.max.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queue_buffering_max_messages: Option<usize>,

    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_kbytes: Option<usize>,

    #[serde(rename = "properties.queue.buffering.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_ms: Option<f64>,

    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    enable_idempotence: Option<bool>,

    #[serde(rename = "properties.message.send.max.retries")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_send_max_retries: Option<usize>,

    #[serde(rename = "properties.retry.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    retry_backoff_ms: Option<usize>,

    #[serde(rename = "properties.batch.num.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_num_messages: Option<usize>,

    #[serde(rename = "properties.batch.size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_size: Option<usize>,

    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,

    #[serde(rename = "properties.message.timeout.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_timeout_ms: Option<usize>,

    #[serde(
        rename = "properties.max.in.flight.requests.per.connection",
        default = "_default_max_in_flight_requests_per_connection"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_in_flight_requests_per_connection: usize,

    #[serde(rename = "properties.request.required.acks")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    request_required_acks: Option<i32>,
}

impl RdKafkaPropertiesProducer {
    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = self.allow_auto_create_topics {
            c.set("allow.auto.create.topics", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_messages {
            c.set("queue.buffering.max.messages", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_kbytes {
            c.set("queue.buffering.max.kbytes", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_ms {
            c.set("queue.buffering.max.ms", v.to_string());
        }
        if let Some(v) = self.enable_idempotence {
            c.set("enable.idempotence", v.to_string());
        }
        if let Some(v) = self.message_send_max_retries {
            c.set("message.send.max.retries", v.to_string());
        }
        if let Some(v) = self.retry_backoff_ms {
            c.set("retry.backoff.ms", v.to_string());
        }
        if let Some(v) = self.batch_num_messages {
            c.set("batch.num.messages", v.to_string());
        }
        if let Some(v) = self.batch_size {
            c.set("batch.size", v.to_string());
        }
        if let Some(v) = &self.compression_codec {
            c.set("compression.codec", v.to_string());
        }
        if let Some(v) = self.request_required_acks {
            c.set("request.required.acks", v.to_string());
        }
        if let Some(v) = self.message_timeout_ms {
            c.set("message.timeout.ms", v.to_string());
        }
        c.set(
            "max.in.flight.requests.per.connection",
            self.max_in_flight_requests_per_connection.to_string(),
        );
    }
}
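
// Illustrative example (not from the original source): with
// `properties.retry.backoff.ms = '500'` in the WITH options, deserialization
// yields `retry_backoff_ms: Some(500)` above, and `set_client` then passes
// `retry.backoff.ms=500` straight to the underlying librdkafka producer.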

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(
        rename = "properties.retry.max",
        default = "_default_max_retries",
        deserialize_with = "deserialize_u32_from_string"
    )]
    pub max_retry_num: u32,

    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

    pub primary_key: Option<String>,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}

impl EnforceSecret for KafkaConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        KafkaConnectionProps::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl KafkaConfig {
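    /// Builds a `KafkaConfig` from the raw string options of a sink definition.
    ///
    /// The map is round-tripped through `serde_json::Value` so that the serde
    /// attributes on the config structs (`rename`, `flatten`, `default`, ...)
    /// drive the parsing; any malformed value surfaces as `SinkError::Config`.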
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_producer.set_client(c);
    }
}

impl From<KafkaConfig> for KafkaProperties {
    fn from(val: KafkaConfig) -> Self {
        KafkaProperties {
            bytes_per_second: None,
            max_num_messages: None,
            scan_startup_mode: None,
            time_offset: None,
            upsert: None,
            common: val.common,
            connection: val.connection,
            rdkafka_properties_common: val.rdkafka_properties_common,
            rdkafka_properties_consumer: Default::default(),
            privatelink_common: val.privatelink_common,
            aws_auth_props: val.aws_auth_props,
            group_id_prefix: None,
            unknown_fields: Default::default(),
        }
    }
}

#[derive(Debug)]
pub struct KafkaSink {
    pub config: KafkaConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl EnforceSecret for KafkaSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for KafkaSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KafkaConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

impl Sink for KafkaSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

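    // Presumably the subset of producer properties that may be changed on an
    // existing sink (e.g. via `ALTER SINK`); `validate_alter_config` below
    // re-parses the altered option map to reject invalid values.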
    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[
        "properties.allow.auto.create.topics",
        "properties.batch.num.messages",
        "properties.batch.size",
        "properties.enable.idempotence",
        "properties.max.in.flight.requests.per.connection",
        "properties.message.max.bytes",
        "properties.message.send.max.retries",
        "properties.message.timeout.ms",
        "properties.queue.buffering.max.kbytes",
        "properties.queue.buffering.max.messages",
        "properties.queue.buffering.max.ms",
        "properties.request.required.acks",
        "properties.retry.backoff.ms",
        "properties.receive.message.max.bytes",
    ];
    const SINK_NAME: &'static str = KAFKA_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let formatter = SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;
        let max_delivery_buffer_size = (self
            .config
            .rdkafka_properties_producer
            .queue_buffering_max_messages
            .as_ref()
            .cloned()
            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

        Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
            .await?
            .into_log_sinker(max_delivery_buffer_size))
    }

    async fn validate(&self) -> Result<()> {
        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
                self.format_desc.format
            )));
        }
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

        let check = KafkaSplitEnumerator::new(
            KafkaProperties::from(self.config.clone()),
            Arc::new(SourceEnumeratorContext::dummy()),
        )
        .await?;
        if !check.check_reachability().await {
            return Err(SinkError::Config(anyhow!(
                "cannot connect to kafka broker ({})",
                self.config.connection.brokers
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        KafkaConfig::from_btreemap(config.clone())?;
        Ok(())
    }
}

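// Sizing of the delivery-future buffer handed to `into_log_sinker` (see
// `new_log_sinker` above): the effective `queue.buffering.max.messages`
// (falling back to `KAFKA_WRITER_MAX_QUEUE_SIZE` when unset) is multiplied by
// this ratio. The 100000 fallback appears to mirror librdkafka's default for
// `queue.buffering.max.messages`.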
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

struct KafkaPayloadWriter<'a> {
    inner: &'a FutureProducer<RwProducerContext>,
    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
    config: &'a KafkaConfig,
}

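// The delivery future type lives in its own module so that the
// `impl TryFuture` type alias (type_alias_impl_trait) has a minimal defining
// scope; `map_delivery_future` adapts rdkafka's `DeliveryFuture` output into
// this sink's `Result<(), SinkError>`.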
mod opaque_type {
    use super::*;
    pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
        future.map(KafkaPayloadWriter::<'static>::map_future_result)
    }
}
pub use opaque_type::KafkaSinkDeliveryFuture;
use opaque_type::map_delivery_future;

pub struct KafkaSinkWriter {
    formatter: SinkFormatterImpl,
    inner: FutureProducer<RwProducerContext>,
    config: KafkaConfig,
}

impl KafkaSinkWriter {
    async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
        let inner: FutureProducer<RwProducerContext> = {
            let mut c = ClientConfig::new();

            config.connection.set_security_properties(&mut c);
            config.set_client(&mut c);

            c.set("bootstrap.servers", &config.connection.brokers);

            let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
            let ctx_common = KafkaContextCommon::new(
                broker_rewrite_map,
                None,
                None,
                config.aws_auth_props.clone(),
                config.connection.is_aws_msk_iam(),
            )
            .await?;
            let producer_ctx = RwProducerContext::new(ctx_common);
            c.create_with_context(producer_ctx).await?
        };

        Ok(KafkaSinkWriter {
            formatter,
            inner,
            config: config.clone(),
        })
    }
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
    type DeliveryFuture = KafkaSinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = KafkaPayloadWriter {
            inner: &mut self.inner,
            add_future,
            config: &self.config,
        };
        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
            payload_writer.write_chunk(chunk, formatter).await
        })
    }
}

impl KafkaPayloadWriter<'_> {
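    /// Sends a single record and registers its delivery future with the
    /// delivery future manager. On `QueueFull`, one in-flight delivery is
    /// awaited to free a slot and the send is retried, up to
    /// `max_retry_num` (`properties.retry.max`) attempts; any other produce
    /// error is returned immediately.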
    async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let mut success_flag = false;

        let mut ret = Ok(());

        for i in 0..self.config.max_retry_num {
            match self.inner.send_result(record) {
                Ok(delivery_future) => {
                    if self
                        .add_future
                        .add_future_may_await(map_delivery_future(delivery_future))
                        .await?
                    {
                        tracing::warn!(
                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}). \
                             This indicates the default value of queue.buffering.max.messages has changed.",
                            self.add_future.future_count(),
                            self.add_future.max_future_count()
                        );
                    }
                    success_flag = true;
                    break;
                }
                Err((e, rec)) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "producing message (key {:?}) to topic {} failed",
                        rec.key.map(|k| k.to_bytes()),
                        rec.topic,
                    );
                    record = rec;
                    match e {
                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                            tracing::warn!(
                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
                                self.add_future.future_count(),
                                i
                            );
                            self.add_future.await_one_delivery().await?;
                            continue;
                        }
                        _ => return Err(e.into()),
                    }
                }
            }
        }

        if !success_flag {
            ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
        }

        ret
    }

    async fn write_inner(
        &mut self,
        event_key_object: Option<Vec<u8>>,
        event_object: Option<Vec<u8>>,
    ) -> Result<()> {
        let topic = self.config.common.topic.clone();
        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
        if let Some(key_str) = &event_key_object {
            record = record.key(key_str);
        }
        if let Some(payload) = &event_object {
            record = record.payload(payload);
        }
        self.send_result(record).await?;
        Ok(())
    }

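    /// Maps the raw rdkafka delivery outcome into this sink's `Result`:
    /// successful delivery becomes `Ok(())`, a broker-side error is
    /// propagated, and a canceled delivery future is reported as
    /// `KafkaError::Canceled`.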
    fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
        match delivery_future_result {
            Ok(Ok(_)) => Ok(()),
            Ok(Err((k_err, _msg))) => Err(k_err.into()),
            Err(_) => Err(KafkaError::Canceled.into()),
        }
    }
}

impl FormattedSink for KafkaPayloadWriter<'_> {
    type K = Vec<u8>;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.write_inner(k, v).await
    }
}

#[cfg(test)]
mod test {
    use maplit::btreemap;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;

    use super::*;
    use crate::sink::encoder::{
        DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
        TimestamptzHandlingMode,
    };
    use crate::sink::formatter::AppendOnlyFormatter;

    #[test]
    fn parse_rdkafka_props() {
        let props: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            "properties.queue.buffering.max.messages".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.ms".to_owned() => "114.514".to_owned(),
            "properties.enable.idempotence".to_owned() => "false".to_owned(),
            "properties.message.send.max.retries".to_owned() => "114514".to_owned(),
            "properties.retry.backoff.ms".to_owned() => "114514".to_owned(),
            "properties.batch.num.messages".to_owned() => "114514".to_owned(),
            "properties.batch.size".to_owned() => "114514".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
            "properties.message.timeout.ms".to_owned() => "114514".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned() => "114514".to_owned(),
            "properties.request.required.acks".to_owned() => "-1".to_owned(),
        };
        let c = KafkaConfig::from_btreemap(props).unwrap();
        assert_eq!(
            c.rdkafka_properties_producer.queue_buffering_max_ms,
            Some(114.514f64)
        );
        assert_eq!(
            c.rdkafka_properties_producer.compression_codec,
            Some(CompressionCodec::Zstd)
        );
        assert_eq!(
            c.rdkafka_properties_producer.message_timeout_ms,
            Some(114514)
        );
        assert_eq!(
            c.rdkafka_properties_producer
                .max_in_flight_requests_per_connection,
            114514
        );
        assert_eq!(
            c.rdkafka_properties_producer.request_required_acks,
            Some(-1)
        );

        // Boolean properties are parsed with `DisplayFromStr`, so the
        // capitalized "True" is rejected.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.enable.idempotence".to_owned() => "True".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        // Negative values are rejected for unsigned properties.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "-114514".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        // Unknown compression codec names are rejected.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.compression.codec".to_owned() => "notvalid".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());
    }

    #[test]
    fn parse_kafka_config() {
        let properties: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.security.protocol".to_owned() => "SASL".to_owned(),
            "properties.sasl.mechanism".to_owned() => "SASL".to_owned(),
            "properties.sasl.username".to_owned() => "test".to_owned(),
            "properties.sasl.password".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "20".to_owned(),
            "properties.retry.interval".to_owned() => "500ms".to_owned(),
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.connection.brokers, "localhost:9092");
        assert_eq!(config.common.topic, "test");
        assert_eq!(config.max_retry_num, 20);
        assert_eq!(config.retry_interval, Duration::from_millis(500));

        let btreemap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));

        // Defaults apply when `properties.retry.max` and
        // `properties.retry.interval` are not specified.
        let properties: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.max_retry_num, 3);
        assert_eq!(config.retry_interval, Duration::from_millis(100));

        // `properties.retry.max` must be a non-negative integer.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.max".to_owned() => "-20".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());

        // `properties.retry.interval` must be a valid duration string.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.interval".to_owned() => "500miiinutes".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());
    }

    // This test requires a running Kafka broker at `localhost:29092`, so it is
    // `#[ignore]`d by default.
    #[ignore]
    #[tokio::test]
    async fn test_kafka_producer() -> Result<()> {
        let properties = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:29092".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "topic".to_owned() => "test_topic".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v2".into(),
            },
        ]);

        let kafka_config = KafkaConfig::from_btreemap(properties)?;

        let sink = KafkaSinkWriter::new(
            kafka_config.clone(),
            SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
                None,
                JsonEncoder::new(
                    schema,
                    None,
                    DateHandlingMode::FromCe,
                    TimestampHandlingMode::Milli,
                    TimestamptzHandlingMode::UtcString,
                    TimeHandlingMode::Milli,
                    JsonbHandlingMode::String,
                ),
            )),
        )
        .await
        .unwrap();

        use crate::sink::log_store::DeliveryFutureManager;

        let mut future_manager = DeliveryFutureManager::new(usize::MAX);

        for i in 0..10 {
            println!("epoch: {}", i);
            for j in 0..100 {
                let mut writer = KafkaPayloadWriter {
                    inner: &sink.inner,
                    add_future: future_manager.start_write_chunk(i, j),
                    config: &sink.config,
                };
                match writer
                    .send_result(
                        FutureRecord::to(kafka_config.common.topic.as_str())
                            .payload(format!("value-{}", j).as_bytes())
                            .key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
                    )
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        println!("{:?}", e);
                        break;
                    }
                };
            }
        }

        Ok(())
    }
}