use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
    AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
    RdKafkaPropertiesCommon,
};
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
    KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
    deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};

pub const KAFKA_SINK: &str = "kafka";

const fn _default_max_retries() -> u32 {
    3
}

const fn _default_retry_backoff() -> Duration {
    Duration::from_millis(100)
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
    5
}

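/// Compression codec for producer message sets. Via strum's snake_case
/// serialization the accepted values are `none`, `gzip`, `snappy`, `lz4`, and
/// `zstd`, matching librdkafka's `compression.codec`.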
#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

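/// Producer-level librdkafka properties, exposed to users as `properties.*`
/// options on the sink. Options left unset are simply not passed to the client
/// and fall back to librdkafka's defaults.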
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
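    /// Allow automatic topic creation on the broker for non-existent topics.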
    #[serde(rename = "properties.allow.auto.create.topics")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub allow_auto_create_topics: Option<bool>,

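    /// Maximum number of messages allowed on the producer queue, which is
    /// shared by all topics and partitions.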
    #[serde(rename = "properties.queue.buffering.max.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queue_buffering_max_messages: Option<usize>,

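    /// Maximum total message size sum allowed on the producer queue, in
    /// kilobytes.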
    #[serde(rename = "properties.queue.buffering.max.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_kbytes: Option<usize>,

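    /// Delay in milliseconds to wait for messages in the producer queue to
    /// accumulate before constructing message batches.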
    #[serde(rename = "properties.queue.buffering.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    queue_buffering_max_ms: Option<f64>,

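    /// When set to true, the producer ensures that messages are produced
    /// exactly once and in the original produce order, at some throughput cost.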
    #[serde(rename = "properties.enable.idempotence")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    enable_idempotence: Option<bool>,

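    /// How many times to retry sending a failing message.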
    #[serde(rename = "properties.message.send.max.retries")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_send_max_retries: Option<usize>,

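    /// The backoff time in milliseconds before retrying a protocol request.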
    #[serde(rename = "properties.retry.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    retry_backoff_ms: Option<usize>,

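    /// Maximum number of messages batched in one message set.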
    #[serde(rename = "properties.batch.num.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_num_messages: Option<usize>,

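    /// Maximum size (in bytes) of all messages batched in one message set,
    /// including protocol framing overhead.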
    #[serde(rename = "properties.batch.size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    batch_size: Option<usize>,

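    /// Compression codec to use for compressing message sets.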
    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,

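    /// Local message timeout: limits the time a produced message waits for
    /// successful delivery. A value of 0 means wait indefinitely.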
    #[serde(rename = "properties.message.timeout.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    message_timeout_ms: Option<usize>,

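    /// Maximum number of in-flight requests per broker connection. Unlike the
    /// other options, this one is always set on the client, defaulting to 5.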
    #[serde(
        rename = "properties.max.in.flight.requests.per.connection",
        default = "_default_max_in_flight_requests_per_connection"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_in_flight_requests_per_connection: usize,

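    /// Number of acknowledgements the leader broker must receive before
    /// responding: 0 = no acks, 1 = leader only, -1 (all) = the full in-sync
    /// replica set.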
    #[serde(rename = "properties.request.required.acks")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    request_required_acks: Option<i32>,
}

impl RdKafkaPropertiesProducer {
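    /// Copy each option that is set into the rdkafka `ClientConfig`; the
    /// in-flight request limit is always applied.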
    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = self.allow_auto_create_topics {
            c.set("allow.auto.create.topics", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_messages {
            c.set("queue.buffering.max.messages", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_kbytes {
            c.set("queue.buffering.max.kbytes", v.to_string());
        }
        if let Some(v) = self.queue_buffering_max_ms {
            c.set("queue.buffering.max.ms", v.to_string());
        }
        if let Some(v) = self.enable_idempotence {
            c.set("enable.idempotence", v.to_string());
        }
        if let Some(v) = self.message_send_max_retries {
            c.set("message.send.max.retries", v.to_string());
        }
        if let Some(v) = self.retry_backoff_ms {
            c.set("retry.backoff.ms", v.to_string());
        }
        if let Some(v) = self.batch_num_messages {
            c.set("batch.num.messages", v.to_string());
        }
        if let Some(v) = self.batch_size {
            c.set("batch.size", v.to_string());
        }
        if let Some(v) = &self.compression_codec {
            c.set("compression.codec", v.to_string());
        }
        if let Some(v) = self.request_required_acks {
            c.set("request.required.acks", v.to_string());
        }
        if let Some(v) = self.message_timeout_ms {
            c.set("message.timeout.ms", v.to_string());
        }
        c.set(
            "max.in.flight.requests.per.connection",
            self.max_in_flight_requests_per_connection.to_string(),
        );
    }
}

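/// Configuration of a Kafka sink, deserialized from the `WITH` options of
/// `CREATE SINK`.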
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(
        rename = "properties.retry.max",
        default = "_default_max_retries",
        deserialize_with = "deserialize_u32_from_string"
    )]
    pub max_retry_num: u32,

    #[serde(
        rename = "properties.retry.interval",
        default = "_default_retry_backoff",
        deserialize_with = "deserialize_duration_from_string"
    )]
    pub retry_interval: Duration,

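    /// The user-specified `primary_key` option, kept here as a raw string.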
    pub primary_key: Option<String>,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
}

impl KafkaConfig {
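    /// Deserialize the config from the raw `WITH` options by round-tripping
    /// them through `serde_json`.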
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;

        Ok(config)
    }

    pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_producer.set_client(c);
    }
}

impl From<KafkaConfig> for KafkaProperties {
    fn from(val: KafkaConfig) -> Self {
        KafkaProperties {
            bytes_per_second: None,
            max_num_messages: None,
            scan_startup_mode: None,
            time_offset: None,
            upsert: None,
            common: val.common,
            connection: val.connection,
            rdkafka_properties_common: val.rdkafka_properties_common,
            rdkafka_properties_consumer: Default::default(),
            privatelink_common: val.privatelink_common,
            aws_auth_props: val.aws_auth_props,
            group_id_prefix: None,
            unknown_fields: Default::default(),
        }
    }
}

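/// The Kafka sink, writing each chunk of the stream to the configured topic
/// via rdkafka's `FutureProducer`.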
#[derive(Debug)]
pub struct KafkaSink {
    pub config: KafkaConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    format_desc: SinkFormatDesc,
    db_name: String,
    sink_from_name: String,
}

impl TryFrom<SinkParam> for KafkaSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = KafkaConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            format_desc: param
                .format_desc
                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
            db_name: param.db_name,
            sink_from_name: param.sink_from_name,
        })
    }
}

impl Sink for KafkaSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;

    const SINK_NAME: &'static str = KAFKA_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let formatter = SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;
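        // Size the delivery-future buffer from the producer queue length, with
        // some headroom (see `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`).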
        let max_delivery_buffer_size = (self
            .config
            .rdkafka_properties_producer
            .queue_buffering_max_messages
            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

        Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
            .await?
            .into_log_sinker(max_delivery_buffer_size))
    }

    async fn validate(&self) -> Result<()> {
        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
                self.format_desc.format
            )));
        }
        SinkFormatterImpl::new(
            &self.format_desc,
            self.schema.clone(),
            self.pk_indices.clone(),
            self.db_name.clone(),
            self.sink_from_name.clone(),
            &self.config.common.topic,
        )
        .await?;

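        // rdkafka offers no direct way to validate a producer connection, so
        // reuse the source's split enumerator to probe broker reachability.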
        let check = KafkaSplitEnumerator::new(
            KafkaProperties::from(self.config.clone()),
            Arc::new(SourceEnumeratorContext::dummy()),
        )
        .await?;
        if !check.check_reachability().await {
            return Err(SinkError::Config(anyhow!(
                "cannot connect to kafka broker ({})",
                self.config.connection.brokers
            )));
        }
        Ok(())
    }
}

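/// Headroom ratio applied to `queue.buffering.max.messages` when sizing the
/// delivery-future buffer in `new_log_sinker`.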
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
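/// Fallback used when `queue.buffering.max.messages` is unset; 100000 is
/// librdkafka's default for that property.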
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

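/// Per-chunk helper that turns formatted key/value pairs into Kafka records,
/// borrowing the shared producer and the delivery-future manager.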
struct KafkaPayloadWriter<'a> {
    inner: &'a FutureProducer<RwProducerContext>,
    add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
    config: &'a KafkaConfig,
}

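// `KafkaSinkDeliveryFuture` is an opaque `impl Trait` alias, so it lives in a
// small module that keeps the alias's defining scope minimal.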
mod opaque_type {
    use super::*;
    pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
        future.map(KafkaPayloadWriter::<'static>::map_future_result)
    }
}
pub use opaque_type::KafkaSinkDeliveryFuture;
use opaque_type::map_delivery_future;

pub struct KafkaSinkWriter {
    formatter: SinkFormatterImpl,
    inner: FutureProducer<RwProducerContext>,
    config: KafkaConfig,
}

impl KafkaSinkWriter {
    async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
        let inner: FutureProducer<RwProducerContext> = {
            let mut c = ClientConfig::new();

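            // Apply security, common, and producer properties from the sink config.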
            config.connection.set_security_properties(&mut c);
            config.set_client(&mut c);

            c.set("bootstrap.servers", &config.connection.brokers);

            let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
            let ctx_common = KafkaContextCommon::new(
                broker_rewrite_map,
                None,
                None,
                config.aws_auth_props.clone(),
                config.connection.is_aws_msk_iam(),
            )
            .await?;
            let producer_ctx = RwProducerContext::new(ctx_common);
            // Build the producer with the RisingWave-specific context.
            c.create_with_context(producer_ctx).await?
        };

        Ok(KafkaSinkWriter {
            formatter,
            inner,
            config: config.clone(),
        })
    }
}

impl AsyncTruncateSinkWriter for KafkaSinkWriter {
    type DeliveryFuture = KafkaSinkDeliveryFuture;

    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        let mut payload_writer = KafkaPayloadWriter {
            inner: &self.inner,
            add_future,
            config: &self.config,
        };
        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
            payload_writer.write_chunk(chunk, formatter).await
        })
    }
}

impl KafkaPayloadWriter<'_> {
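    /// Try to enqueue `record` on the producer, retrying up to `max_retry_num`
    /// times while the producer's local queue is full.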
    async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        let mut success_flag = false;

        let mut ret = Ok(());

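        // Each attempt either enqueues the record and registers its delivery
        // future, or hands the record back together with the error.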
        for i in 0..self.config.max_retry_num {
            match self.inner.send_result(record) {
                Ok(delivery_future) => {
                    if self
                        .add_future
                        .add_future_may_await(map_delivery_future(delivery_future))
                        .await?
                    {
                        tracing::warn!(
                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}). \
                             This indicates the default value of queue.buffering.max.messages has changed.",
                            self.add_future.future_count(),
                            self.add_future.max_future_count()
                        );
                    }
                    success_flag = true;
                    break;
                }
                Err((e, rec)) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "producing message (key {:?}) to topic {} failed",
                        rec.key.map(|k| k.to_bytes()),
                        rec.topic,
                    );
                    record = rec;
                    match e {
                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                            tracing::warn!(
                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
                                self.add_future.future_count(),
                                i
                            );
                            self.add_future.await_one_delivery().await?;
                            continue;
                        }
                        _ => return Err(e.into()),
                    }
                }
            }
        }

        if !success_flag {
            // After `max_retry_num` attempts the producer queue is still full.
            ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
        }

        ret
    }

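    /// Build one Kafka record from an optional key and payload and enqueue it.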
    async fn write_inner(
        &mut self,
        event_key_object: Option<Vec<u8>>,
        event_object: Option<Vec<u8>>,
    ) -> Result<()> {
        let topic = self.config.common.topic.clone();
        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
        if let Some(key_str) = &event_key_object {
            record = record.key(key_str);
        }
        if let Some(payload) = &event_object {
            record = record.payload(payload);
        }
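        // Enqueue without awaiting delivery; the delivery future is joined
        // later when the log store truncates.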
        self.send_result(record).await?;
        Ok(())
    }

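    /// Translate the producer's delivery result into the sink's `Result`.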
    fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
        match delivery_future_result {
            // Successfully delivered: the Ok payload is the (partition, offset)
            // pair, which we do not need.
            Ok(Ok(_)) => Ok(()),
            // Delivery failed: the error is returned together with a copy of
            // the original message; propagate the error.
            Ok(Err((k_err, _msg))) => Err(k_err.into()),
            // The producer was dropped before the delivery status was received.
            Err(_) => Err(KafkaError::Canceled.into()),
        }
    }
}

impl FormattedSink for KafkaPayloadWriter<'_> {
    type K = Vec<u8>;
    type V = Vec<u8>;

    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
        self.write_inner(k, v).await
    }
}

#[cfg(test)]
mod test {
    use maplit::btreemap;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;

    use super::*;
    use crate::sink::encoder::{
        DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
        TimestamptzHandlingMode,
    };
    use crate::sink::formatter::AppendOnlyFormatter;

    #[test]
    fn parse_rdkafka_props() {
        let props: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            "properties.queue.buffering.max.messages".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "114514".to_owned(),
            "properties.queue.buffering.max.ms".to_owned() => "114.514".to_owned(),
            "properties.enable.idempotence".to_owned() => "false".to_owned(),
            "properties.message.send.max.retries".to_owned() => "114514".to_owned(),
            "properties.retry.backoff.ms".to_owned() => "114514".to_owned(),
            "properties.batch.num.messages".to_owned() => "114514".to_owned(),
            "properties.batch.size".to_owned() => "114514".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
            "properties.message.timeout.ms".to_owned() => "114514".to_owned(),
            "properties.max.in.flight.requests.per.connection".to_owned() => "114514".to_owned(),
            "properties.request.required.acks".to_owned() => "-1".to_owned(),
        };
        let c = KafkaConfig::from_btreemap(props).unwrap();
        assert_eq!(
            c.rdkafka_properties_producer.queue_buffering_max_ms,
            Some(114.514f64)
        );
        assert_eq!(
            c.rdkafka_properties_producer.compression_codec,
            Some(CompressionCodec::Zstd)
        );
        assert_eq!(
            c.rdkafka_properties_producer.message_timeout_ms,
            Some(114514)
        );
        assert_eq!(
            c.rdkafka_properties_producer
                .max_in_flight_requests_per_connection,
            114514
        );
        assert_eq!(
            c.rdkafka_properties_producer.request_required_acks,
            Some(-1)
        );

        // Capitalized "True" is rejected: booleans are parsed with Rust's
        // `FromStr` via `DisplayFromStr`, which only accepts "true"/"false".
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.enable.idempotence".to_owned() => "True".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        // Negative values must fail to parse into the unsigned `usize` fields.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.queue.buffering.max.kbytes".to_owned() => "-114514".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());

        // An unrecognized compression codec must be rejected.
        let props: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "properties.compression.codec".to_owned() => "notvalid".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(props).is_err());
    }

    #[test]
    fn parse_kafka_config() {
        let properties: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.security.protocol".to_owned() => "SASL".to_owned(),
            "properties.sasl.mechanism".to_owned() => "SASL".to_owned(),
            "properties.sasl.username".to_owned() => "test".to_owned(),
            "properties.sasl.password".to_owned() => "test".to_owned(),
            "properties.retry.max".to_owned() => "20".to_owned(),
            "properties.retry.interval".to_owned() => "500ms".to_owned(),
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.connection.brokers, "localhost:9092");
        assert_eq!(config.common.topic, "test");
        assert_eq!(config.max_retry_num, 20);
        assert_eq!(config.retry_interval, Duration::from_millis(500));

        let btreemap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));

        // Defaults take effect when the retry options are omitted.
        let properties: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
        };
        let config = KafkaConfig::from_btreemap(properties).unwrap();
        assert_eq!(config.max_retry_num, 3);
        assert_eq!(config.retry_interval, Duration::from_millis(100));

        // A negative retry count must fail to parse into `u32`.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.max".to_owned() => "-20".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());

        // A malformed duration string must be rejected.
        let properties: BTreeMap<String, String> = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "type".to_owned() => "upsert".to_owned(),
            "properties.retry.interval".to_owned() => "500miiinutes".to_owned(),
        };
        assert!(KafkaConfig::from_btreemap(properties).is_err());
    }

    /// Manual integration test: requires a Kafka broker reachable at
    /// localhost:29092, hence `#[ignore]`d by default.
    #[ignore]
    #[tokio::test]
    async fn test_kafka_producer() -> Result<()> {
        let properties = btreemap! {
            "connector".to_owned() => "kafka".to_owned(),
            "properties.bootstrap.server".to_owned() => "localhost:29092".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "topic".to_owned() => "test_topic".to_owned(),
            "properties.compression.codec".to_owned() => "zstd".to_owned(),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v2".into(),
            },
        ]);

        let kafka_config = KafkaConfig::from_btreemap(properties)?;

        let sink = KafkaSinkWriter::new(
            kafka_config.clone(),
            SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
                None,
                JsonEncoder::new(
                    schema,
                    None,
                    DateHandlingMode::FromCe,
                    TimestampHandlingMode::Milli,
                    TimestamptzHandlingMode::UtcString,
                    TimeHandlingMode::Milli,
                    JsonbHandlingMode::String,
                ),
            )),
        )
        .await
        .unwrap();

        use crate::sink::log_store::DeliveryFutureManager;

        let mut future_manager = DeliveryFutureManager::new(usize::MAX);

        for i in 0..10 {
            println!("epoch: {}", i);
            for j in 0..100 {
                let mut writer = KafkaPayloadWriter {
                    inner: &sink.inner,
                    add_future: future_manager.start_write_chunk(i, j),
                    config: &sink.config,
                };
                match writer
                    .send_result(
                        FutureRecord::to(kafka_config.common.topic.as_str())
                            .payload(format!("value-{}", j).as_bytes())
                            .key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
                    )
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        println!("{:?}", e);
                        break;
                    }
                };
            }
        }

        Ok(())
    }
}