risingwave_connector/source/kafka/mod.rs

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;

mod client_context;
pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;
pub mod stats;

pub use client_context::*;
pub use enumerator::*;
pub use source::*;
pub use split::*;
use with_options::WithOptions;

use crate::connector_common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::source::SourceProperties;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";

#[serde_as]
#[derive(Clone, Debug, Deserialize, Default, WithOptions)]
pub struct RdKafkaPropertiesConsumer {
    /// Minimum number of messages per topic+partition that librdkafka tries
    /// to maintain in the local consumer queue.
    #[serde(rename = "properties.queued.min.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queued_min_messages: Option<usize>,

    /// Maximum number of kilobytes of queued pre-fetched messages in the
    /// local consumer queue.
    #[serde(rename = "properties.queued.max.messages.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queued_max_messages_kbytes: Option<usize>,

    /// Maximum time the broker may wait to fill the fetch response with
    /// `fetch.min.bytes` of messages.
    #[serde(rename = "properties.fetch.wait.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_wait_max_ms: Option<usize>,

    /// Minimum number of bytes the broker responds with to a fetch request.
    #[serde(rename = "properties.fetch.min.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_min_bytes: Option<usize>,

    /// Initial maximum number of bytes per topic+partition to request when
    /// fetching messages from the broker.
    #[serde(rename = "properties.fetch.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_message_max_bytes: Option<usize>,

    /// How long to postpone the next fetch request for a topic+partition when
    /// the current fetch queue thresholds have been exceeded.
    #[serde(rename = "properties.fetch.queue.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_queue_backoff_ms: Option<usize>,

    /// Maximum amount of data the broker shall return for a fetch request.
    #[serde(rename = "properties.fetch.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_max_bytes: Option<usize>,

    /// Whether to automatically and periodically commit offsets in the
    /// background.
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub enable_auto_commit: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KafkaProperties {
    /// Rate limit on the number of bytes consumed per second, applied per
    /// parallelism; not intended to be user-facing.
    #[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
    pub bytes_per_second: Option<String>,

    /// Upper bound on the number of messages consumed, applied per
    /// parallelism; not intended to be user-facing.
    #[serde(rename = "max.num.messages", alias = "kafka.max.num.messages")]
    pub max_num_messages: Option<String>,

    /// Where to start consuming from, e.g. `earliest` or `latest`.
    #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    /// Start consuming from the first message whose timestamp is no earlier
    /// than this Unix timestamp in milliseconds.
    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "kafka.time.offset",
        alias = "scan.startup.timestamp_millis"
    )]
    pub time_offset: Option<String>,

    /// Prefix of the consumer group id. See [`KafkaProperties::group_id`].
    #[serde(rename = "group.id.prefix")]
    pub group_id_prefix: Option<String>,

    /// Marks this as an upsert source, where the reader keeps both the key
    /// and the payload of each Kafka message.
    #[serde(rename = "upsert")]
    pub upsert: Option<String>,

    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for KafkaProperties {
    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConnectionProps::enforce_one(prop)?;
            AwsAuthProps::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl SourceProperties for KafkaProperties {
    type Split = KafkaSplit;
    type SplitEnumerator = KafkaSplitEnumerator;
    type SplitReader = KafkaSplitReader;

    const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
}

impl crate::source::UnknownFields for KafkaProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl KafkaProperties {
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_consumer.set_client(c);
    }

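    /// Builds the consumer group id as `{prefix}-{fragment_id}`, falling back
    /// to the `rw-consumer` prefix. A minimal sketch of the expected output:
    ///
    /// ```ignore
    /// // With no `group.id.prefix` configured:
    /// assert_eq!(props.group_id(1), "rw-consumer-1");
    /// ```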
    pub fn group_id(&self, fragment_id: u32) -> String {
        format!(
            "{}-{}",
            self.group_id_prefix.as_deref().unwrap_or("rw-consumer"),
            fragment_id
        )
    }
}

const KAFKA_ISOLATION_LEVEL: &str = "read_committed";

impl RdKafkaPropertiesConsumer {
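    /// Applies each populated option to the `rdkafka::ClientConfig` under its
    /// librdkafka key. A minimal sketch of the mapping:
    ///
    /// ```ignore
    /// let props = RdKafkaPropertiesConsumer {
    ///     fetch_min_bytes: Some(1024),
    ///     ..Default::default()
    /// };
    /// let mut config = rdkafka::ClientConfig::new();
    /// props.set_client(&mut config);
    /// assert_eq!(config.get("fetch.min.bytes"), Some("1024"));
    /// ```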
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = &self.queued_min_messages {
            c.set("queued.min.messages", v.to_string());
        }
        if let Some(v) = &self.queued_max_messages_kbytes {
            c.set("queued.max.messages.kbytes", v.to_string());
        }
        if let Some(v) = &self.fetch_wait_max_ms {
            c.set("fetch.wait.max.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_min_bytes {
            c.set("fetch.min.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_message_max_bytes {
            c.set("fetch.message.max.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_queue_backoff_ms {
            c.set("fetch.queue.backoff.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_max_bytes {
            c.set("fetch.max.bytes", v.to_string());
        }
        if let Some(v) = &self.enable_auto_commit {
            c.set("enable.auto.commit", v.to_string());
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use maplit::btreemap;

    use super::*;

    #[test]
    fn test_parse_config_consumer_common() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "scan.startup.mode".to_owned() => "earliest".to_owned(),
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            "properties.queued.min.messages".to_owned() => "114514".to_owned(),
            "properties.queued.max.messages.kbytes".to_owned() => "114514".to_owned(),
            "properties.fetch.wait.max.ms".to_owned() => "114514".to_owned(),
            "properties.fetch.max.bytes".to_owned() => "114514".to_owned(),
            "properties.enable.auto.commit".to_owned() => "true".to_owned(),
            "properties.fetch.queue.backoff.ms".to_owned() => "114514".to_owned(),
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        assert_eq!(props.scan_startup_mode, Some("earliest".to_owned()));
        assert_eq!(
            props.rdkafka_properties_common.receive_message_max_bytes,
            Some(54321)
        );
        assert_eq!(
            props.rdkafka_properties_common.message_max_bytes,
            Some(12345)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_min_messages,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_max_messages_kbytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_wait_max_ms,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_max_bytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.enable_auto_commit,
            Some(true)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
            Some(114514)
        );
        let broker_rewrite_map: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(
            props.privatelink_common.broker_rewrite_map,
            Some(broker_rewrite_map)
        );
    }
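
    // The following tests are illustrative sketches added for documentation.
    // They assume, as in the test above, that `properties.bootstrap.server`
    // and `topic` are the only required fields for deserialization.
    #[test]
    fn test_group_id_prefix() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
        };
        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
        // Without `group.id.prefix`, the default `rw-consumer` prefix is used.
        assert_eq!(props.group_id(42), "rw-consumer-42");

        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "group.id.prefix".to_owned() => "my-group".to_owned(),
        };
        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
        assert_eq!(props.group_id(42), "my-group-42");
    }

    #[test]
    fn test_set_client_applies_consumer_options() {
        // Only populated options should be forwarded to the client config.
        let consumer = RdKafkaPropertiesConsumer {
            queued_min_messages: Some(100_000),
            enable_auto_commit: Some(false),
            ..Default::default()
        };
        let mut config = rdkafka::ClientConfig::new();
        consumer.set_client(&mut config);
        assert_eq!(config.get("queued.min.messages"), Some("100000"));
        assert_eq!(config.get("enable.auto.commit"), Some("false"));
        assert_eq!(config.get("fetch.max.bytes"), None);
    }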
}