risedev/task/
kafka_service.rs1use super::docker_service::{DockerService, DockerServiceConfig};
16use crate::KafkaConfig;
17
18impl DockerServiceConfig for KafkaConfig {
19 fn id(&self) -> String {
20 self.id.clone()
21 }
22
23 fn is_user_managed(&self) -> bool {
24 self.user_managed
25 }
26
27 fn image(&self) -> String {
28 self.image.clone()
29 }
30
31 fn envs(&self) -> Vec<(String, String)> {
32 vec![
33 ("KAFKA_NODE_ID".to_owned(), self.node_id.to_string()),
34 (
35 "KAFKA_PROCESS_ROLES".to_owned(),
36 "controller,broker".to_owned(),
37 ),
38 (
39 "KAFKA_LISTENERS".to_owned(),
40 "HOST://:9092,CONTROLLER://:9093,DOCKER://:9094".to_owned(),
41 ),
42 (
43 "KAFKA_ADVERTISED_LISTENERS".to_owned(),
44 format!(
45 "HOST://{}:{},DOCKER://host.docker.internal:{}",
46 self.address, self.port, self.docker_port
47 ),
48 ),
49 (
50 "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
51 "HOST:PLAINTEXT,CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT".to_owned(),
52 ),
53 (
54 "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
55 format!("{}@localhost:9093", self.node_id),
56 ),
57 (
58 "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
59 "CONTROLLER".to_owned(),
60 ),
61 (
62 "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
63 "HOST".to_owned(),
64 ),
65 (
67 "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
68 "1".to_owned(),
69 ),
70 ("CLUSTER_ID".to_owned(), "RiseDevRiseDevRiseDev1".to_owned()),
71 ]
72 }
73
74 fn ports(&self) -> Vec<(String, String)> {
75 vec![
76 (self.port.to_string(), "9092".to_owned()),
77 (self.docker_port.to_string(), "9094".to_owned()),
78 ]
79 }
80
81 fn data_path(&self) -> Option<String> {
82 self.persist_data.then(|| "/var/lib/kafka/data".to_owned())
83 }
84}
85
86pub type KafkaService = DockerService<KafkaConfig>;