// risingwave_connector/source/kafka/mod.rs

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;

mod client_context;
pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;
pub mod stats;

pub use client_context::*;
pub use enumerator::*;
use risingwave_common::id::FragmentId;
pub use source::*;
pub use split::*;
use with_options::WithOptions;

use crate::connector_common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::source::SourceProperties;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";

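/// Consumer-level librdkafka properties, accepted in `WITH` options under the
/// `properties.` prefix. Field docs below paraphrase librdkafka's
/// CONFIGURATION.md; `set_client` forwards any set values to the underlying
/// `rdkafka::ClientConfig`.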
#[serde_as]
#[derive(Clone, Debug, Deserialize, Default, WithOptions)]
pub struct RdKafkaPropertiesConsumer {
    /// Minimum number of messages per topic+partition that librdkafka tries
    /// to maintain in the local consumer queue.
    #[serde(rename = "properties.queued.min.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queued_min_messages: Option<usize>,

    /// Maximum number of kilobytes of queued pre-fetched messages allowed in
    /// the local consumer queue.
    #[serde(rename = "properties.queued.max.messages.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queued_max_messages_kbytes: Option<usize>,

    /// Maximum time the broker may wait to fill the fetch response with
    /// `fetch.min.bytes` of messages.
    #[serde(rename = "properties.fetch.wait.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_wait_max_ms: Option<usize>,

    /// Minimum number of bytes the broker responds with for a fetch request.
    #[serde(rename = "properties.fetch.min.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_min_bytes: Option<usize>,

    /// Initial maximum number of bytes per topic+partition to request when
    /// fetching messages from the broker.
    #[serde(rename = "properties.fetch.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_message_max_bytes: Option<usize>,

    /// How long to postpone the next fetch request for a topic+partition when
    /// the current fetch queue thresholds have been exceeded.
    #[serde(rename = "properties.fetch.queue.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_queue_backoff_ms: Option<usize>,

    /// Maximum amount of data the broker returns for a single fetch request.
    #[serde(rename = "properties.fetch.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_max_bytes: Option<usize>,

    /// Whether to automatically and periodically commit offsets in the
    /// background.
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub enable_auto_commit: Option<bool>,
}

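/// Properties of the Kafka source connector, deserialized from `WITH`
/// options. Connector-specific options are declared here; common Kafka,
/// connection, PrivateLink, AWS auth, and raw rdkafka options are merged in
/// from shared structs via `#[serde(flatten)]`.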
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KafkaProperties {
    /// Rate limit on the number of bytes consumed per second.
    #[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
    pub bytes_per_second: Option<String>,

    /// Upper bound on the number of messages to consume.
    #[serde(rename = "max.num.messages", alias = "kafka.max.num.messages")]
    pub max_num_messages: Option<String>,

    /// Where to begin consuming, e.g. `earliest` or `latest`.
    #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    /// Timestamp (in milliseconds since the epoch) to start consuming from.
    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "kafka.time.offset",
        alias = "scan.startup.timestamp_millis"
    )]
    pub time_offset: Option<String>,

    /// Prefix of the consumer group id; defaults to `rw-consumer`. See
    /// [`KafkaProperties::group_id`] for how the full group id is derived.
    #[serde(rename = "group.id.prefix")]
    #[with_option(allow_alter_on_fly)]
    pub group_id_prefix: Option<String>,

    /// Whether the source runs in upsert mode, where each message carries
    /// both a key and a value.
    #[serde(rename = "upsert")]
    pub upsert: Option<String>,

    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for KafkaProperties {
    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConnectionProps::enforce_one(prop)?;
            AwsAuthProps::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl SourceProperties for KafkaProperties {
    type Split = KafkaSplit;
    type SplitEnumerator = KafkaSplitEnumerator;
    type SplitReader = KafkaSplitReader;

    const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
}

impl crate::source::UnknownFields for KafkaProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl KafkaProperties {
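    /// Applies every rdkafka property that has been set (common and
    /// consumer-level) onto the given `rdkafka::ClientConfig`.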
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_consumer.set_client(c);
    }

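    /// Derives the consumer group id for a fragment as
    /// `"{group_id_prefix}-{fragment_id}"`, defaulting the prefix to
    /// `"rw-consumer"`.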
    pub fn group_id(&self, fragment_id: FragmentId) -> String {
        format!(
            "{}-{}",
            self.group_id_prefix.as_deref().unwrap_or("rw-consumer"),
            fragment_id
        )
    }
}

/// Kafka isolation level: `read_committed` makes consumers read only messages
/// from committed transactions (plus all non-transactional messages).
const KAFKA_ISOLATION_LEVEL: &str = "read_committed";

impl RdKafkaPropertiesConsumer {
    /// Forwards each property that has been set to the `rdkafka` client
    /// config, with the `properties.` prefix used in `WITH` options stripped.
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = &self.queued_min_messages {
            c.set("queued.min.messages", v.to_string());
        }
        if let Some(v) = &self.queued_max_messages_kbytes {
            c.set("queued.max.messages.kbytes", v.to_string());
        }
        if let Some(v) = &self.fetch_wait_max_ms {
            c.set("fetch.wait.max.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_min_bytes {
            c.set("fetch.min.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_message_max_bytes {
            c.set("fetch.message.max.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_queue_backoff_ms {
            c.set("fetch.queue.backoff.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_max_bytes {
            c.set("fetch.max.bytes", v.to_string());
        }
        if let Some(v) = &self.enable_auto_commit {
            c.set("enable.auto.commit", v.to_string());
        }
    }
}

254
255#[cfg(test)]
256mod test {
257 use std::collections::BTreeMap;
258
259 use maplit::btreemap;
260
261 use super::*;
262
263 #[test]
264 fn test_parse_config_consumer_common() {
265 let config: BTreeMap<String, String> = btreemap! {
266 "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
268 "topic".to_owned() => "test".to_owned(),
269 "scan.startup.mode".to_owned() => "earliest".to_owned(),
271 "properties.message.max.bytes".to_owned() => "12345".to_owned(),
273 "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
274 "properties.queued.min.messages".to_owned() => "114514".to_owned(),
276 "properties.queued.max.messages.kbytes".to_owned() => "114514".to_owned(),
277 "properties.fetch.wait.max.ms".to_owned() => "114514".to_owned(),
278 "properties.fetch.max.bytes".to_owned() => "114514".to_owned(),
279 "properties.enable.auto.commit".to_owned() => "true".to_owned(),
280 "properties.fetch.queue.backoff.ms".to_owned() => "114514".to_owned(),
281 "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
283 };
284
285 let props: KafkaProperties =
286 serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
287
288 assert_eq!(props.scan_startup_mode, Some("earliest".to_owned()));
289 assert_eq!(
290 props.rdkafka_properties_common.receive_message_max_bytes,
291 Some(54321)
292 );
293 assert_eq!(
294 props.rdkafka_properties_common.message_max_bytes,
295 Some(12345)
296 );
297 assert_eq!(
298 props.rdkafka_properties_consumer.queued_min_messages,
299 Some(114514)
300 );
301 assert_eq!(
302 props.rdkafka_properties_consumer.queued_max_messages_kbytes,
303 Some(114514)
304 );
305 assert_eq!(
306 props.rdkafka_properties_consumer.fetch_wait_max_ms,
307 Some(114514)
308 );
309 assert_eq!(
310 props.rdkafka_properties_consumer.fetch_max_bytes,
311 Some(114514)
312 );
313 assert_eq!(
314 props.rdkafka_properties_consumer.enable_auto_commit,
315 Some(true)
316 );
317 assert_eq!(
318 props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
319 Some(114514)
320 );
321 let hashmap: BTreeMap<String, String> = btreemap! {
322 "broker1".to_owned() => "10.0.0.1:8001".to_owned()
323 };
324 assert_eq!(props.privatelink_common.broker_rewrite_map, Some(hashmap));
325 }
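
    // A minimal sketch (not in the original file): checks that
    // `RdKafkaPropertiesConsumer::set_client` forwards set values to the
    // rdkafka client config with the `properties.` prefix stripped.
    #[test]
    fn test_set_client_forwards_consumer_props() {
        let consumer = RdKafkaPropertiesConsumer {
            queued_min_messages: Some(100_000),
            enable_auto_commit: Some(false),
            ..Default::default()
        };
        let mut config = rdkafka::ClientConfig::new();
        consumer.set_client(&mut config);
        assert_eq!(config.get("queued.min.messages"), Some("100000"));
        assert_eq!(config.get("enable.auto.commit"), Some("false"));
    }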
}