risingwave_connector/source/kafka/mod.rs

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};

mod client_context;
pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;
pub mod stats;

pub use client_context::*;
pub use enumerator::*;
pub use source::*;
pub use split::*;
use with_options::WithOptions;

use crate::connector_common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::source::SourceProperties;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";
/// Consumer-side options forwarded verbatim to the rdkafka client. A field
/// left as `None` falls back to librdkafka's default; see librdkafka's
/// `CONFIGURATION.md` for the authoritative semantics of each option.
#[serde_as]
#[derive(Clone, Debug, Deserialize, Default, WithOptions)]
pub struct RdKafkaPropertiesConsumer {
    /// Minimum number of messages per topic+partition that librdkafka tries
    /// to maintain in the local consumer queue.
    #[serde(rename = "properties.queued.min.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queued_min_messages: Option<usize>,

    /// Maximum number of kilobytes of queued pre-fetched messages in the
    /// local consumer queue.
    #[serde(rename = "properties.queued.max.messages.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queued_max_messages_kbytes: Option<usize>,

    /// Maximum time the broker may wait to fill the fetch response with
    /// `fetch.min.bytes` of messages.
    #[serde(rename = "properties.fetch.wait.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_wait_max_ms: Option<usize>,

    /// Minimum number of bytes the broker responds with.
    #[serde(rename = "properties.fetch.min.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_min_bytes: Option<usize>,

    /// Initial maximum number of bytes per topic+partition to request when
    /// fetching messages from the broker.
    #[serde(rename = "properties.fetch.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_message_max_bytes: Option<usize>,

    /// How long to postpone the next fetch request for a topic+partition
    /// when the current fetch queue thresholds have been exceeded.
    #[serde(rename = "properties.fetch.queue.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_queue_backoff_ms: Option<usize>,

    /// Maximum amount of data the broker may return for a fetch request.
    #[serde(rename = "properties.fetch.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_max_bytes: Option<usize>,

    /// Whether to automatically and periodically commit offsets in the
    /// background.
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub enable_auto_commit: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KafkaProperties {
    /// Not intended to be exposed to users: limits the bytes consumed per
    /// second for a single parallelism, so the total rate is this value
    /// multiplied by the source parallelism.
    #[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
    pub bytes_per_second: Option<String>,

    /// Not intended to be exposed to users: limits the number of messages
    /// consumed for a single parallelism.
    #[serde(rename = "max.num.messages", alias = "kafka.max.num.messages")]
    pub max_num_messages: Option<String>,

    /// Where to start consuming, e.g. `earliest` or `latest`.
    #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    /// The timestamp (in milliseconds) to start consuming from.
    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "kafka.time.offset",
        alias = "scan.startup.timestamp_millis"
    )]
    pub time_offset: Option<String>,

    /// Custom prefix for the consumer group id. The full group id is
    /// `{group_id_prefix}-{fragment_id}` (see `group_id` below); the prefix
    /// defaults to `rw-consumer`.
    #[serde(rename = "group.id.prefix")]
    pub group_id_prefix: Option<String>,

    /// Tells the split reader to produce upsert messages that carry both the
    /// key and the value of a Kafka record.
    #[serde(rename = "upsert")]
    pub upsert: Option<String>,

    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
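
// For reference: these properties arrive through the `WITH` clause of a
// `CREATE SOURCE` statement and are deserialized into `KafkaProperties` via
// the serde renames above. A hedged sketch (exact syntax may vary across
// RisingWave versions):
//
//     CREATE SOURCE kafka_source (v1 int, v2 varchar) WITH (
//         connector = 'kafka',
//         topic = 'test',
//         properties.bootstrap.server = '127.0.0.1:9092',
//         scan.startup.mode = 'earliest'
//     ) FORMAT PLAIN ENCODE JSON;
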
impl SourceProperties for KafkaProperties {
    type Split = KafkaSplit;
    type SplitEnumerator = KafkaSplitEnumerator;
    type SplitReader = KafkaSplitReader;

    const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
}

impl crate::source::UnknownFields for KafkaProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl KafkaProperties {
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_consumer.set_client(c);
    }

    /// Builds the consumer group id as `{prefix}-{fragment_id}`, where the
    /// prefix defaults to `rw-consumer`.
    pub fn group_id(&self, fragment_id: u32) -> String {
        format!(
            "{}-{}",
            self.group_id_prefix.as_deref().unwrap_or("rw-consumer"),
            fragment_id
        )
    }
}

const KAFKA_ISOLATION_LEVEL: &str = "read_committed";

impl RdKafkaPropertiesConsumer {
    /// Copies every option that is set into the rdkafka client config, keyed
    /// by its librdkafka name (i.e. without the `properties.` prefix).
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = &self.queued_min_messages {
            c.set("queued.min.messages", v.to_string());
        }
        if let Some(v) = &self.queued_max_messages_kbytes {
            c.set("queued.max.messages.kbytes", v.to_string());
        }
        if let Some(v) = &self.fetch_wait_max_ms {
            c.set("fetch.wait.max.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_min_bytes {
            c.set("fetch.min.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_message_max_bytes {
            c.set("fetch.message.max.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_queue_backoff_ms {
            c.set("fetch.queue.backoff.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_max_bytes {
            c.set("fetch.max.bytes", v.to_string());
        }
        if let Some(v) = &self.enable_auto_commit {
            c.set("enable.auto.commit", v.to_string());
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use maplit::btreemap;

    use super::*;

    #[test]
    fn test_parse_config_consumer_common() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "scan.startup.mode".to_owned() => "earliest".to_owned(),
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            "properties.queued.min.messages".to_owned() => "114514".to_owned(),
            "properties.queued.max.messages.kbytes".to_owned() => "114514".to_owned(),
            "properties.fetch.wait.max.ms".to_owned() => "114514".to_owned(),
            "properties.fetch.max.bytes".to_owned() => "114514".to_owned(),
            "properties.enable.auto.commit".to_owned() => "true".to_owned(),
            "properties.fetch.queue.backoff.ms".to_owned() => "114514".to_owned(),
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        assert_eq!(props.scan_startup_mode, Some("earliest".to_owned()));
        assert_eq!(
            props.rdkafka_properties_common.receive_message_max_bytes,
            Some(54321)
        );
        assert_eq!(
            props.rdkafka_properties_common.message_max_bytes,
            Some(12345)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_min_messages,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_max_messages_kbytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_wait_max_ms,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_max_bytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.enable_auto_commit,
            Some(true)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
            Some(114514)
        );
        let expected: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(props.privatelink_common.broker_rewrite_map, Some(expected));
    }
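
    // A supplementary sketch (not in the original suite): checks that
    // `KafkaProperties::group_id` falls back to the default `rw-consumer`
    // prefix, and that `RdKafkaPropertiesConsumer::set_client` forwards a
    // parsed option into an `rdkafka::ClientConfig` under its librdkafka
    // name. The option values here are arbitrary.
    #[test]
    fn test_group_id_and_set_client() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "properties.fetch.wait.max.ms".to_owned() => "500".to_owned(),
        };
        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        // No `group.id.prefix` is given, so the default prefix applies.
        assert_eq!(props.group_id(42), "rw-consumer-42");

        // The `properties.` prefix is already stripped by the serde rename,
        // so the key set on the client config matches librdkafka's own name.
        let mut client_config = rdkafka::ClientConfig::new();
        props.rdkafka_properties_consumer.set_client(&mut client_config);
        assert_eq!(client_config.get("fetch.wait.max.ms"), Some("500"));
    }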
}