risedev/task/
schema_registry_service.rs1use super::docker_service::{DockerService, DockerServiceConfig};
16use crate::SchemaRegistryConfig;
17
18const SCHEMA_REGISTRY_LISTENER_PORT: &str = "8081";
20
21impl DockerServiceConfig for SchemaRegistryConfig {
22 fn id(&self) -> String {
23 self.id.clone()
24 }
25
26 fn is_user_managed(&self) -> bool {
27 self.user_managed
28 }
29
30 fn image(&self) -> String {
31 self.image.clone()
32 }
33
34 fn envs(&self) -> Vec<(String, String)> {
35 let kafka = self
38 .provide_kafka
39 .as_ref()
40 .expect("Kafka is required for Schema Registry");
41 if kafka.len() != 1 {
42 panic!("More than one Kafka is not supported yet");
43 }
44 let kafka = &kafka[0];
45 if kafka.user_managed {
46 panic!(
47 "user-managed Kafka with docker Schema Registry is not supported yet. Please make them both or neither user-managed."
48 );
49 }
50 vec![
51 ("SCHEMA_REGISTRY_HOST_NAME".to_owned(), self.address.clone()),
52 (
53 "SCHEMA_REGISTRY_LISTENERS".to_owned(),
54 format!("http://{}:{}", "0.0.0.0", SCHEMA_REGISTRY_LISTENER_PORT),
55 ),
56 (
57 "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS".to_owned(),
58 format!("host.docker.internal:{}", kafka.docker_port),
59 ),
60 ]
61 }
62
63 fn ports(&self) -> Vec<(String, String)> {
64 vec![(
65 self.port.to_string(),
66 SCHEMA_REGISTRY_LISTENER_PORT.to_owned(),
67 )]
68 }
69
70 fn data_path(&self) -> Option<String> {
71 None
72 }
73}
74
75pub type SchemaRegistryService = DockerService<SchemaRegistryConfig>;