risingwave_connector/source/kafka/mod.rs

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;

mod client_context;
pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;
pub mod stats;

pub use client_context::*;
pub use enumerator::*;
pub use source::*;
pub use split::*;
use with_options::WithOptions;

use crate::connector_common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::source::SourceProperties;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";

/// Consumer-side librdkafka options that can be set through `WITH` options.
/// A field left as `None` falls back to the librdkafka default.
#[serde_as]
#[derive(Clone, Debug, Deserialize, Default, WithOptions)]
pub struct RdKafkaPropertiesConsumer {
    /// Minimum number of messages per topic+partition that librdkafka tries
    /// to maintain in the local consumer queue.
    #[serde(rename = "properties.queued.min.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queued_min_messages: Option<usize>,

    /// Maximum number of kilobytes of queued pre-fetched messages in the
    /// local consumer queue.
    #[serde(rename = "properties.queued.max.messages.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queued_max_messages_kbytes: Option<usize>,

    /// Maximum time the broker may wait to fill the fetch response with
    /// `fetch.min.bytes` of messages.
    #[serde(rename = "properties.fetch.wait.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_wait_max_ms: Option<usize>,

    /// Minimum number of bytes the broker responds with to a fetch request.
    #[serde(rename = "properties.fetch.min.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_min_bytes: Option<usize>,

    /// Initial maximum number of bytes per topic+partition to request when
    /// fetching messages from the broker.
    #[serde(rename = "properties.fetch.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_message_max_bytes: Option<usize>,

    /// How long to postpone the next fetch request for a topic+partition when
    /// the current fetch queue thresholds (`queued.min.messages` or
    /// `queued.max.messages.kbytes`) have been exceeded.
    #[serde(rename = "properties.fetch.queue.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_queue_backoff_ms: Option<usize>,

    /// Maximum amount of data the broker shall return for a fetch request.
    /// Automatically adjusted upwards by librdkafka to be at least
    /// `message.max.bytes` to avoid stalling the consumer.
    #[serde(rename = "properties.fetch.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_max_bytes: Option<usize>,

    /// Whether to automatically and periodically commit offsets in the
    /// background.
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub enable_auto_commit: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KafkaProperties {
    /// Maximum number of bytes to consume per second.
    #[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
    pub bytes_per_second: Option<String>,

    /// Maximum number of messages to consume.
    #[serde(rename = "max.num.messages", alias = "kafka.max.num.messages")]
    pub max_num_messages: Option<String>,

    /// Where to start consuming, e.g. `earliest` or `latest`.
    #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    /// Startup timestamp in milliseconds; start consuming from messages at or
    /// after this point in time rather than from a fixed offset.
    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "kafka.time.offset",
        alias = "scan.startup.timestamp_millis"
    )]
    pub time_offset: Option<String>,

    /// Custom prefix for the consumer group id. Defaults to `rw-consumer`.
    /// The full group id is `{group_id_prefix}-{fragment_id}`; see
    /// [`KafkaProperties::group_id`].
    #[serde(rename = "group.id.prefix")]
    #[with_option(allow_alter_on_fly)]
    pub group_id_prefix: Option<String>,

    /// Whether this is an upsert source, i.e. both the key and the value of
    /// each Kafka message are consumed.
    #[serde(rename = "upsert")]
    pub upsert: Option<String>,

    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    /// Options not recognized by any typed field above.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
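
// For reference, the renamed and flattened fields above are populated from the
// user-facing `WITH` clause of `CREATE SOURCE`. An illustrative sketch (schema
// and format clauses omitted; keys taken from this file, values arbitrary):
//
//     CREATE SOURCE my_kafka_source WITH (
//         connector = 'kafka',
//         topic = 'test',
//         properties.bootstrap.server = '127.0.0.1:9092',
//         scan.startup.mode = 'earliest',
//         properties.fetch.max.bytes = '1048576'
//     );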

impl EnforceSecret for KafkaProperties {
    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConnectionProps::enforce_one(prop)?;
            AwsAuthProps::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl SourceProperties for KafkaProperties {
    type Split = KafkaSplit;
    type SplitEnumerator = KafkaSplitEnumerator;
    type SplitReader = KafkaSplitReader;

    const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
}

impl crate::source::UnknownFields for KafkaProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl KafkaProperties {
    /// Apply all typed rdkafka options to the given client config.
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_consumer.set_client(c);
    }

    /// Consumer group id for the given fragment: `{group_id_prefix}-{fragment_id}`,
    /// with the prefix defaulting to `rw-consumer`.
    pub fn group_id(&self, fragment_id: u32) -> String {
        format!(
            "{}-{}",
            self.group_id_prefix.as_deref().unwrap_or("rw-consumer"),
            fragment_id
        )
    }
}

const KAFKA_ISOLATION_LEVEL: &str = "read_committed";

impl RdKafkaPropertiesConsumer {
    /// Copy every configured option into the given client config, leaving
    /// unset options at their librdkafka defaults.
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = &self.queued_min_messages {
            c.set("queued.min.messages", v.to_string());
        }
        if let Some(v) = &self.queued_max_messages_kbytes {
            c.set("queued.max.messages.kbytes", v.to_string());
        }
        if let Some(v) = &self.fetch_wait_max_ms {
            c.set("fetch.wait.max.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_min_bytes {
            c.set("fetch.min.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_message_max_bytes {
            c.set("fetch.message.max.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_queue_backoff_ms {
            c.set("fetch.queue.backoff.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_max_bytes {
            c.set("fetch.max.bytes", v.to_string());
        }
        if let Some(v) = &self.enable_auto_commit {
            c.set("enable.auto.commit", v.to_string());
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use maplit::btreemap;

    use super::*;
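
    // A minimal sketch showing how `set_client` forwards typed options to
    // librdkafka: each `Some` field is written into the `ClientConfig`, and
    // unset fields are left untouched. Sample values are arbitrary.
    #[test]
    fn test_set_client_forwards_consumer_props() {
        let consumer = RdKafkaPropertiesConsumer {
            queued_min_messages: Some(100_000),
            enable_auto_commit: Some(false),
            ..Default::default()
        };
        let mut config = rdkafka::ClientConfig::new();
        consumer.set_client(&mut config);
        assert_eq!(config.get("queued.min.messages"), Some("100000"));
        assert_eq!(config.get("enable.auto.commit"), Some("false"));
        // Options that were never set are absent from the config.
        assert_eq!(config.get("fetch.max.bytes"), None);
    }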

    #[test]
    fn test_parse_config_consumer_common() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "scan.startup.mode".to_owned() => "earliest".to_owned(),
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            "properties.queued.min.messages".to_owned() => "114514".to_owned(),
            "properties.queued.max.messages.kbytes".to_owned() => "114514".to_owned(),
            "properties.fetch.wait.max.ms".to_owned() => "114514".to_owned(),
            "properties.fetch.max.bytes".to_owned() => "114514".to_owned(),
            "properties.enable.auto.commit".to_owned() => "true".to_owned(),
            "properties.fetch.queue.backoff.ms".to_owned() => "114514".to_owned(),
            "broker.rewrite.endpoints".to_owned() => r#"{"broker1": "10.0.0.1:8001"}"#.to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        assert_eq!(props.scan_startup_mode, Some("earliest".to_owned()));
        assert_eq!(
            props.rdkafka_properties_common.receive_message_max_bytes,
            Some(54321)
        );
        assert_eq!(
            props.rdkafka_properties_common.message_max_bytes,
            Some(12345)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_min_messages,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_max_messages_kbytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_wait_max_ms,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_max_bytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.enable_auto_commit,
            Some(true)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
            Some(114514)
        );
        let expected_rewrite_map: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(
            props.privatelink_common.broker_rewrite_map,
            Some(expected_rewrite_map)
        );
    }
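
    // Sketch of the consumer group id format produced by `group_id`:
    // `{group_id_prefix}-{fragment_id}`, with the prefix defaulting to
    // "rw-consumer". Assumes the broker/topic keys used above are the only
    // required fields for deserialization.
    #[test]
    fn test_group_id_format() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "group.id.prefix".to_owned() => "my-group".to_owned(),
        };
        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
        assert_eq!(props.group_id(3), "my-group-3");

        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
        };
        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
        assert_eq!(props.group_id(7), "rw-consumer-7");
    }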
}