risingwave_simulation/
kafka.rs1use std::collections::HashMap;
16use std::time::SystemTime;
17
18use itertools::Either;
19use rdkafka::ClientConfig;
20use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
21use rdkafka::error::{KafkaError, RDKafkaErrorCode};
22use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
23
24pub async fn create_topics(broker_addr: &str, topics: HashMap<String, i32>) {
26 let admin = ClientConfig::new()
27 .set("bootstrap.servers", broker_addr)
28 .create::<AdminClient<_>>()
29 .await
30 .expect("failed to create kafka admin client");
31
32 for (topic, partition) in topics {
33 println!("creating topic {}", topic);
34 admin
35 .create_topics(
36 &[NewTopic::new(
37 topic.as_str(),
38 partition,
39 TopicReplication::Fixed(1),
40 )],
41 &AdminOptions::default(),
42 )
43 .await
44 .expect("failed to create topic");
45 }
46}
47
48pub async fn producer(broker_addr: &str, datadir: String) {
50 const KEY_DELIMITER: u8 = b'^';
53
54 let admin = ClientConfig::new()
55 .set("bootstrap.servers", broker_addr)
56 .create::<AdminClient<_>>()
57 .await
58 .expect("failed to create kafka admin client");
59
60 let producer = ClientConfig::new()
61 .set("bootstrap.servers", broker_addr)
62 .create::<BaseProducer>()
63 .await
64 .expect("failed to create kafka producer");
65
66 for file in std::fs::read_dir(datadir).unwrap() {
67 let file = file.unwrap();
68 let name = file.file_name().into_string().unwrap();
69 let Some((topic, partitions)) = name.split_once('.') else {
70 tracing::warn!("ignore file: {name:?}. expected format \"topic.partitions\"");
71 continue;
72 };
73 admin
74 .create_topics(
75 &[NewTopic::new(
76 topic,
77 partitions.parse().unwrap(),
78 TopicReplication::Fixed(1),
79 )],
80 &AdminOptions::default(),
81 )
82 .await
83 .expect("failed to create topic");
84
85 let content = std::fs::read(file.path()).unwrap();
86 let msgs = if topic.ends_with("bin") {
87 Either::Left(std::iter::once((None, content.as_slice())))
89 } else {
90 Either::Right(
92 content
93 .split(|&b| b == b'\n')
94 .filter(|line| !line.is_empty())
95 .map(|line| match line.iter().position(|&b| b == KEY_DELIMITER) {
96 Some(pos) => (Some(&line[..pos]), &line[pos + 1..]),
97 None => (None, line),
98 }),
99 )
100 };
101 for (key, payload) in msgs {
102 loop {
103 let ts = SystemTime::now()
104 .duration_since(SystemTime::UNIX_EPOCH)
105 .unwrap()
106 .as_millis() as i64;
107 let mut record = BaseRecord::to(topic).payload(payload).timestamp(ts);
108 if let Some(key) = key {
109 record = record.key(key);
110 }
111 match producer.send(record) {
112 Ok(_) => break,
113 Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
114 producer.flush(None).await.expect("failed to flush");
115 }
116 Err((e, _)) => panic!("failed to send message: {}", e),
117 }
118 }
119 }
120 producer.flush(None).await.expect("failed to flush");
121 }
122}