risedev/task/
kafka_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::docker_service::{DockerService, DockerServiceConfig};
16use crate::KafkaConfig;
17
18impl DockerServiceConfig for KafkaConfig {
19    fn id(&self) -> String {
20        self.id.clone()
21    }
22
23    fn is_user_managed(&self) -> bool {
24        self.user_managed
25    }
26
27    fn image(&self) -> String {
28        self.image.clone()
29    }
30
31    fn envs(&self) -> Vec<(String, String)> {
32        vec![
33            ("KAFKA_NODE_ID".to_owned(), self.node_id.to_string()),
34            (
35                "KAFKA_PROCESS_ROLES".to_owned(),
36                "controller,broker".to_owned(),
37            ),
38            (
39                "KAFKA_LISTENERS".to_owned(),
40                "HOST://:9092,CONTROLLER://:9093,DOCKER://:9094".to_owned(),
41            ),
42            (
43                "KAFKA_ADVERTISED_LISTENERS".to_owned(),
44                format!(
45                    "HOST://{}:{},DOCKER://host.docker.internal:{}",
46                    self.address, self.port, self.docker_port
47                ),
48            ),
49            (
50                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
51                "HOST:PLAINTEXT,CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT".to_owned(),
52            ),
53            (
54                "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
55                format!("{}@localhost:9093", self.node_id),
56            ),
57            (
58                "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
59                "CONTROLLER".to_owned(),
60            ),
61            (
62                "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
63                "HOST".to_owned(),
64            ),
65            // https://docs.confluent.io/platform/current/installation/docker/config-reference.html#example-configurations
66            (
67                "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
68                "1".to_owned(),
69            ),
70            ("CLUSTER_ID".to_owned(), "RiseDevRiseDevRiseDev1".to_owned()),
71        ]
72    }
73
74    fn ports(&self) -> Vec<(String, String)> {
75        vec![
76            (self.port.to_string(), "9092".to_owned()),
77            (self.docker_port.to_string(), "9094".to_owned()),
78        ]
79    }
80
81    fn data_path(&self) -> Option<String> {
82        self.persist_data.then(|| "/var/lib/kafka/data".to_owned())
83    }
84}
85
86/// Docker-backed Kafka service.
87pub type KafkaService = DockerService<KafkaConfig>;