risingwave_simulation/
kafka.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::SystemTime;
17
18use itertools::Either;
19use rdkafka::ClientConfig;
20use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
21use rdkafka::error::{KafkaError, RDKafkaErrorCode};
22use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
23
24/// Create a kafka topic
25pub async fn create_topics(broker_addr: &str, topics: HashMap<String, i32>) {
26    let admin = ClientConfig::new()
27        .set("bootstrap.servers", broker_addr)
28        .create::<AdminClient<_>>()
29        .await
30        .expect("failed to create kafka admin client");
31
32    for (topic, partition) in topics {
33        println!("creating topic {}", topic);
34        admin
35            .create_topics(
36                &[NewTopic::new(
37                    topic.as_str(),
38                    partition,
39                    TopicReplication::Fixed(1),
40                )],
41                &AdminOptions::default(),
42            )
43            .await
44            .expect("failed to create topic");
45    }
46}
47
48/// Create a kafka producer for the topics and data in `datadir`.
49pub async fn producer(broker_addr: &str, datadir: String) {
50    /// Delimiter for key and value in a line.
51    /// equal to `kafkacat -K ^`
52    const KEY_DELIMITER: u8 = b'^';
53
54    let admin = ClientConfig::new()
55        .set("bootstrap.servers", broker_addr)
56        .create::<AdminClient<_>>()
57        .await
58        .expect("failed to create kafka admin client");
59
60    let producer = ClientConfig::new()
61        .set("bootstrap.servers", broker_addr)
62        .create::<BaseProducer>()
63        .await
64        .expect("failed to create kafka producer");
65
66    for file in std::fs::read_dir(datadir).unwrap() {
67        let file = file.unwrap();
68        let name = file.file_name().into_string().unwrap();
69        let Some((topic, partitions)) = name.split_once('.') else {
70            tracing::warn!("ignore file: {name:?}. expected format \"topic.partitions\"");
71            continue;
72        };
73        admin
74            .create_topics(
75                &[NewTopic::new(
76                    topic,
77                    partitions.parse().unwrap(),
78                    TopicReplication::Fixed(1),
79                )],
80                &AdminOptions::default(),
81            )
82            .await
83            .expect("failed to create topic");
84
85        let content = std::fs::read(file.path()).unwrap();
86        let msgs = if topic.ends_with("bin") {
87            // binary message data, a file is a message
88            Either::Left(std::iter::once((None, content.as_slice())))
89        } else {
90            // text message data, a line is a message
91            Either::Right(
92                content
93                    .split(|&b| b == b'\n')
94                    .filter(|line| !line.is_empty())
95                    .map(|line| match line.iter().position(|&b| b == KEY_DELIMITER) {
96                        Some(pos) => (Some(&line[..pos]), &line[pos + 1..]),
97                        None => (None, line),
98                    }),
99            )
100        };
101        for (key, payload) in msgs {
102            loop {
103                let ts = SystemTime::now()
104                    .duration_since(SystemTime::UNIX_EPOCH)
105                    .unwrap()
106                    .as_millis() as i64;
107                let mut record = BaseRecord::to(topic).payload(payload).timestamp(ts);
108                if let Some(key) = key {
109                    record = record.key(key);
110                }
111                match producer.send(record) {
112                    Ok(_) => break,
113                    Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
114                        producer.flush(None).await.expect("failed to flush");
115                    }
116                    Err((e, _)) => panic!("failed to send message: {}", e),
117                }
118            }
119        }
120        producer.flush(None).await.expect("failed to flush");
121    }
122}