use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use futures::{Future, FutureExt, TryFuture};
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
use thiserror_ext::AsReport;
use with_options::WithOptions;
use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
AwsAuthProps, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon,
};
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};
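/// Connector name used in `connector = 'kafka'` to select this sink.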
pub const KAFKA_SINK: &str = "kafka";
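// serde `default` providers for the retry and in-flight-request options below.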
const fn _default_max_retries() -> u32 {
3
}
const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}
const fn _default_max_in_flight_requests_per_connection() -> usize {
5
}
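/// Compression codecs accepted for `properties.compression.codec`; serialized in
/// snake_case so the values line up with librdkafka's `compression.codec` options.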
#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum CompressionCodec {
None,
Gzip,
Snappy,
Lz4,
Zstd,
}
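/// Producer-level librdkafka properties exposed through the sink's `WITH` options.
/// Every field set here is forwarded verbatim to the underlying `ClientConfig`
/// in [`RdKafkaPropertiesProducer::set_client`].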
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
#[serde(rename = "properties.allow.auto.create.topics")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub allow_auto_create_topics: Option<bool>,
#[serde(rename = "properties.queue.buffering.max.messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub queue_buffering_max_messages: Option<usize>,
#[serde(rename = "properties.queue.buffering.max.kbytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
queue_buffering_max_kbytes: Option<usize>,
#[serde(rename = "properties.queue.buffering.max.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
queue_buffering_max_ms: Option<f64>,
#[serde(rename = "properties.enable.idempotence")]
#[serde_as(as = "Option<DisplayFromStr>")]
enable_idempotence: Option<bool>,
#[serde(rename = "properties.message.send.max.retries")]
#[serde_as(as = "Option<DisplayFromStr>")]
message_send_max_retries: Option<usize>,
#[serde(rename = "properties.retry.backoff.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
retry_backoff_ms: Option<usize>,
#[serde(rename = "properties.batch.num.messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
batch_num_messages: Option<usize>,
#[serde(rename = "properties.batch.size")]
#[serde_as(as = "Option<DisplayFromStr>")]
batch_size: Option<usize>,
#[serde(rename = "properties.compression.codec")]
#[serde_as(as = "Option<DisplayFromStr>")]
compression_codec: Option<CompressionCodec>,
#[serde(rename = "properties.message.timeout.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
message_timeout_ms: Option<usize>,
#[serde(
rename = "properties.max.in.flight.requests.per.connection",
default = "_default_max_in_flight_requests_per_connection"
)]
#[serde_as(as = "DisplayFromStr")]
max_in_flight_requests_per_connection: usize,
#[serde(rename = "properties.request.required.acks")]
#[serde_as(as = "Option<DisplayFromStr>")]
request_required_acks: Option<i32>,
}
impl RdKafkaPropertiesProducer {
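/// Apply each configured property to the given rdkafka [`ClientConfig`].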
pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
if let Some(v) = self.allow_auto_create_topics {
c.set("allow.auto.create.topics", v.to_string());
}
if let Some(v) = self.queue_buffering_max_messages {
c.set("queue.buffering.max.messages", v.to_string());
}
if let Some(v) = self.queue_buffering_max_kbytes {
c.set("queue.buffering.max.kbytes", v.to_string());
}
if let Some(v) = self.queue_buffering_max_ms {
c.set("queue.buffering.max.ms", v.to_string());
}
if let Some(v) = self.enable_idempotence {
c.set("enable.idempotence", v.to_string());
}
if let Some(v) = self.message_send_max_retries {
c.set("message.send.max.retries", v.to_string());
}
if let Some(v) = self.retry_backoff_ms {
c.set("retry.backoff.ms", v.to_string());
}
if let Some(v) = self.batch_num_messages {
c.set("batch.num.messages", v.to_string());
}
if let Some(v) = self.batch_size {
c.set("batch.size", v.to_string());
}
if let Some(v) = &self.compression_codec {
c.set("compression.codec", v.to_string());
}
if let Some(v) = self.request_required_acks {
c.set("request.required.acks", v.to_string());
}
if let Some(v) = self.message_timeout_ms {
c.set("message.timeout.ms", v.to_string());
}
c.set(
"max.in.flight.requests.per.connection",
self.max_in_flight_requests_per_connection.to_string(),
);
}
}
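/// The complete set of `WITH` options accepted by the Kafka sink: the shared
/// common/connection/PrivateLink/AWS sections, retry settings, and producer-only
/// rdkafka properties.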
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaConfig {
#[serde(flatten)]
pub common: KafkaCommon,
#[serde(flatten)]
pub connection: KafkaConnection,
#[serde(
rename = "properties.retry.max",
default = "_default_max_retries",
deserialize_with = "deserialize_u32_from_string"
)]
pub max_retry_num: u32,
#[serde(
rename = "properties.retry.interval",
default = "_default_retry_backoff",
deserialize_with = "deserialize_duration_from_string"
)]
pub retry_interval: Duration,
pub primary_key: Option<String>,
#[serde(flatten)]
pub rdkafka_properties_common: RdKafkaPropertiesCommon,
#[serde(flatten)]
pub rdkafka_properties_producer: RdKafkaPropertiesProducer,
#[serde(flatten)]
pub privatelink_common: KafkaPrivateLinkCommon,
#[serde(flatten)]
pub aws_auth_props: AwsAuthProps,
}
impl KafkaConfig {
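/// Deserialize a [`KafkaConfig`] from the raw `WITH` options by round-tripping
/// through `serde_json`.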
pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
Ok(config)
}
pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
self.rdkafka_properties_common.set_client(c);
self.rdkafka_properties_producer.set_client(c);
}
}
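// The sink reuses the source-side `KafkaProperties` (and hence `KafkaSplitEnumerator`)
// to probe broker reachability in `validate`, so only the shared fields are carried over.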
impl From<KafkaConfig> for KafkaProperties {
fn from(val: KafkaConfig) -> Self {
KafkaProperties {
bytes_per_second: None,
max_num_messages: None,
scan_startup_mode: None,
time_offset: None,
upsert: None,
common: val.common,
connection: val.connection,
rdkafka_properties_common: val.rdkafka_properties_common,
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
aws_auth_props: val.aws_auth_props,
group_id_prefix: None,
unknown_fields: Default::default(),
}
}
}
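/// The Kafka sink: streams formatted changes of a RisingWave relation into a Kafka topic.
/// Typically created from SQL along the lines of `CREATE SINK ... WITH (connector = 'kafka',
/// properties.bootstrap.server = '...', topic = '...') FORMAT ... ENCODE ...`
/// (illustrative; see [`KafkaConfig`] for the accepted `WITH` options).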
#[derive(Debug)]
pub struct KafkaSink {
pub config: KafkaConfig,
schema: Schema,
pk_indices: Vec<usize>,
format_desc: SinkFormatDesc,
db_name: String,
sink_from_name: String,
}
impl TryFrom<SinkParam> for KafkaSink {
type Error = SinkError;
fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = KafkaConfig::from_btreemap(param.properties)?;
Ok(Self {
config,
schema,
pk_indices: param.downstream_pk,
format_desc: param
.format_desc
.ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
db_name: param.db_name,
sink_from_name: param.sink_from_name,
})
}
}
impl Sink for KafkaSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = AsyncTruncateLogSinkerOf<KafkaSinkWriter>;
const SINK_NAME: &'static str = KAFKA_SINK;
async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let formatter = SinkFormatterImpl::new(
&self.format_desc,
self.schema.clone(),
self.pk_indices.clone(),
self.db_name.clone(),
self.sink_from_name.clone(),
&self.config.common.topic,
)
.await?;
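// Size the delivery-future buffer from the configured producer queue size
// (falling back to KAFKA_WRITER_MAX_QUEUE_SIZE), scaled by KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO.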
let max_delivery_buffer_size = (self
.config
.rdkafka_properties_producer
.queue_buffering_max_messages
.as_ref()
.cloned()
.unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
* KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;
Ok(KafkaSinkWriter::new(self.config.clone(), formatter)
.await?
.into_log_sinker(max_delivery_buffer_size))
}
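// Non-append-only sinks require a primary key; additionally check that the formatter
// can be constructed and that the configured brokers are reachable.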
async fn validate(&self) -> Result<()> {
if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
"primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
self.format_desc.format
)));
}
SinkFormatterImpl::new(
&self.format_desc,
self.schema.clone(),
self.pk_indices.clone(),
self.db_name.clone(),
self.sink_from_name.clone(),
&self.config.common.topic,
)
.await?;
let check = KafkaSplitEnumerator::new(
KafkaProperties::from(self.config.clone()),
Arc::new(SourceEnumeratorContext::dummy()),
)
.await?;
if !check.check_reachability().await {
return Err(SinkError::Config(anyhow!(
"cannot connect to kafka broker ({})",
self.config.connection.brokers
)));
}
Ok(())
}
}
/// Scaling factor applied to the producer queue size when sizing the delivery-future buffer.
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
/// Fallback producer queue size used when `queue.buffering.max.messages` is not set
/// (librdkafka's default is 100000).
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
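/// Borrowed writer that turns formatted key/value pairs into `FutureRecord`s, sends them
/// through the shared producer, and registers the resulting delivery futures.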
struct KafkaPayloadWriter<'a> {
inner: &'a FutureProducer<RwProducerContext>,
add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
config: &'a KafkaConfig,
}
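// `KafkaSinkDeliveryFuture` is an opaque `impl Trait` alias; it lives in its own module so
// that `map_delivery_future` is its only defining use.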
mod opaque_type {
use super::*;
pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture {
future.map(KafkaPayloadWriter::<'static>::map_future_result)
}
}
use opaque_type::map_delivery_future;
pub use opaque_type::KafkaSinkDeliveryFuture;
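/// The sink writer: owns the rdkafka `FutureProducer` plus the formatter used to encode
/// stream chunks into Kafka records.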
pub struct KafkaSinkWriter {
formatter: SinkFormatterImpl,
inner: FutureProducer<RwProducerContext>,
config: KafkaConfig,
}
impl KafkaSinkWriter {
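/// Build the underlying producer (including security properties, PrivateLink broker
/// rewrites, and AWS MSK IAM support) from the sink configuration.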
async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
let inner: FutureProducer<RwProducerContext> = {
let mut c = ClientConfig::new();
config.connection.set_security_properties(&mut c);
config.set_client(&mut c);
c.set("bootstrap.servers", &config.connection.brokers);
let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
let ctx_common = KafkaContextCommon::new(
broker_rewrite_map,
None,
None,
config.aws_auth_props.clone(),
config.connection.is_aws_msk_iam(),
)
.await?;
let producer_ctx = RwProducerContext::new(ctx_common);
c.create_with_context(producer_ctx).await?
};
Ok(KafkaSinkWriter {
formatter,
inner,
config: config.clone(),
})
}
}
impl AsyncTruncateSinkWriter for KafkaSinkWriter {
type DeliveryFuture = KafkaSinkDeliveryFuture;
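// Encode the chunk with the configured formatter and enqueue the records; delivery is
// tracked through the futures registered in `add_future`.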
async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
let mut payload_writer = KafkaPayloadWriter {
inner: &mut self.inner,
add_future,
config: &self.config,
};
dispatch_sink_formatter_impl!(&self.formatter, formatter, {
payload_writer.write_chunk(chunk, formatter).await
})
}
}
impl<'w> KafkaPayloadWriter<'w> {
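/// Try to enqueue a record, retrying up to `max_retry_num` times when the local producer
/// queue is full; each retry first awaits one outstanding delivery to free up space.
/// Any other producer error is returned immediately.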
async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> Result<()>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
let mut success_flag = false;
let mut ret = Ok(());
for i in 0..self.config.max_retry_num {
match self.inner.send_result(record) {
Ok(delivery_future) => {
if self
.add_future
.add_future_may_await(map_delivery_future(delivery_future))
.await?
{
tracing::warn!(
"Number of records being delivered ({}) >= expected kafka producer queue size ({}).
This indicates the default value of queue.buffering.max.messages has changed.",
self.add_future.future_count(),
self.add_future.max_future_count()
);
}
success_flag = true;
break;
}
Err((e, rec)) => {
tracing::warn!(
error = %e.as_report(),
"producing message (key {:?}) to topic {} failed",
rec.key.map(|k| k.to_bytes()),
rec.topic,
);
record = rec;
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tracing::warn!(
"Producer queue full. Delivery future buffer size={}. Await and retry #{}",
self.add_future.future_count(),
i
);
self.add_future.await_one_delivery().await?;
continue;
}
_ => return Err(e.into()),
}
}
}
}
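// All attempts hit a full queue; surface `QueueFull` as the final error.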
if !success_flag {
ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull).into());
}
ret
}
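/// Build a `FutureRecord` for the configured topic from optional key/payload bytes
/// and hand it to `send_result`.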
async fn write_inner(
&mut self,
event_key_object: Option<Vec<u8>>,
event_object: Option<Vec<u8>>,
) -> Result<()> {
let topic = self.config.common.topic.clone();
let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str());
if let Some(key_str) = &event_key_object {
record = record.key(key_str);
}
if let Some(payload) = &event_object {
record = record.payload(payload);
}
self.send_result(record).await?;
Ok(())
}
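/// Map a raw rdkafka delivery outcome into the sink's `Result`: a cancelled delivery
/// channel is reported as `KafkaError::Canceled`.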
fn map_future_result(delivery_future_result: <DeliveryFuture as Future>::Output) -> Result<()> {
match delivery_future_result {
Ok(Ok(_)) => Ok(()),
Ok(Err((k_err, _msg))) => Err(k_err.into()),
Err(_) => Err(KafkaError::Canceled.into()),
}
}
}
impl<'a> FormattedSink for KafkaPayloadWriter<'a> {
type K = Vec<u8>;
type V = Vec<u8>;
async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
self.write_inner(k, v).await
}
}
#[cfg(test)]
mod test {
use maplit::btreemap;
use risingwave_common::catalog::Field;
use risingwave_common::types::DataType;
use super::*;
use crate::sink::encoder::{
DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use crate::sink::formatter::AppendOnlyFormatter;
#[test]
fn parse_rdkafka_props() {
let props: BTreeMap<String, String> = btreemap! {
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"properties.message.max.bytes".to_string() => "12345".to_string(),
"properties.receive.message.max.bytes".to_string() => "54321".to_string(),
"properties.queue.buffering.max.messages".to_string() => "114514".to_string(),
"properties.queue.buffering.max.kbytes".to_string() => "114514".to_string(),
"properties.queue.buffering.max.ms".to_string() => "114.514".to_string(),
"properties.enable.idempotence".to_string() => "false".to_string(),
"properties.message.send.max.retries".to_string() => "114514".to_string(),
"properties.retry.backoff.ms".to_string() => "114514".to_string(),
"properties.batch.num.messages".to_string() => "114514".to_string(),
"properties.batch.size".to_string() => "114514".to_string(),
"properties.compression.codec".to_string() => "zstd".to_string(),
"properties.message.timeout.ms".to_string() => "114514".to_string(),
"properties.max.in.flight.requests.per.connection".to_string() => "114514".to_string(),
"properties.request.required.acks".to_string() => "-1".to_string(),
};
let c = KafkaConfig::from_btreemap(props).unwrap();
assert_eq!(
c.rdkafka_properties_producer.queue_buffering_max_ms,
Some(114.514f64)
);
assert_eq!(
c.rdkafka_properties_producer.compression_codec,
Some(CompressionCodec::Zstd)
);
assert_eq!(
c.rdkafka_properties_producer.message_timeout_ms,
Some(114514)
);
assert_eq!(
c.rdkafka_properties_producer
.max_in_flight_requests_per_connection,
114514
);
assert_eq!(
c.rdkafka_properties_producer.request_required_acks,
Some(-1)
);
let props: BTreeMap<String, String> = btreemap! {
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),
"properties.enable.idempotence".to_string() => "True".to_string(), };
assert!(KafkaConfig::from_btreemap(props).is_err());
let props: BTreeMap<String, String> = btreemap! {
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),
"properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), };
assert!(KafkaConfig::from_btreemap(props).is_err());
let props: BTreeMap<String, String> = btreemap! {
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),
"properties.compression.codec".to_string() => "notvalid".to_string(), };
assert!(KafkaConfig::from_btreemap(props).is_err());
}
#[test]
fn parse_kafka_config() {
let properties: BTreeMap<String, String> = btreemap! {
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"properties.security.protocol".to_string() => "SASL".to_string(),
"properties.sasl.mechanism".to_string() => "SASL".to_string(),
"properties.sasl.username".to_string() => "test".to_string(),
"properties.sasl.password".to_string() => "test".to_string(),
"properties.retry.max".to_string() => "20".to_string(),
"properties.retry.interval".to_string() => "500ms".to_string(),
"broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};
let config = KafkaConfig::from_btreemap(properties).unwrap();
assert_eq!(config.connection.brokers, "localhost:9092");
assert_eq!(config.common.topic, "test");
assert_eq!(config.max_retry_num, 20);
assert_eq!(config.retry_interval, Duration::from_millis(500));
let btreemap: BTreeMap<String, String> = btreemap! {
"broker1".to_string() => "10.0.0.1:8001".to_string()
};
assert_eq!(config.privatelink_common.broker_rewrite_map, Some(btreemap));
let properties: BTreeMap<String, String> = btreemap! {
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
};
let config = KafkaConfig::from_btreemap(properties).unwrap();
assert_eq!(config.max_retry_num, 3);
assert_eq!(config.retry_interval, Duration::from_millis(100));
let properties: BTreeMap<String, String> = btreemap! {
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "upsert".to_string(),
"properties.retry.max".to_string() => "-20".to_string(), };
assert!(KafkaConfig::from_btreemap(properties).is_err());
let properties: BTreeMap<String, String> = btreemap! {
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "upsert".to_string(),
"properties.retry.interval".to_string() => "500minutes".to_string(), };
assert!(KafkaConfig::from_btreemap(properties).is_err());
}
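// Manual integration test: expects a Kafka broker reachable at localhost:29092,
// hence `#[ignore]`. Run with `cargo test -- --ignored` against a local broker.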
#[ignore]
#[tokio::test]
async fn test_kafka_producer() -> Result<()> {
let properties = btreemap! {
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:29092".to_string(),
"type".to_string() => "append-only".to_string(),
"topic".to_string() => "test_topic".to_string(),
"properties.compression.codec".to_string() => "zstd".to_string(),
};
let schema = Schema::new(vec![
Field {
data_type: DataType::Int32,
name: "id".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Varchar,
name: "v2".into(),
sub_fields: vec![],
type_name: "".into(),
},
]);
let kafka_config = KafkaConfig::from_btreemap(properties)?;
let sink = KafkaSinkWriter::new(
kafka_config.clone(),
SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(
None,
JsonEncoder::new(
schema,
None,
DateHandlingMode::FromCe,
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
JsonbHandlingMode::String,
),
)),
)
.await
.unwrap();
use crate::sink::log_store::DeliveryFutureManager;
let mut future_manager = DeliveryFutureManager::new(usize::MAX);
for i in 0..10 {
println!("epoch: {}", i);
for j in 0..100 {
let mut writer = KafkaPayloadWriter {
inner: &sink.inner,
add_future: future_manager.start_write_chunk(i, j),
config: &sink.config,
};
match writer
.send_result(
FutureRecord::to(kafka_config.common.topic.as_str())
.payload(format!("value-{}", j).as_bytes())
.key(format!("dummy_key_for_epoch-{}", i).as_bytes()),
)
.await
{
Ok(_) => {}
Err(e) => {
println!("{:?}", e);
break;
}
};
}
}
Ok(())
}
}