risedev/task/
schema_registry_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::docker_service::{DockerService, DockerServiceConfig};
16use crate::SchemaRegistryConfig;
17
18/// Schema Registry listener port in the container.
19const SCHEMA_REGISTRY_LISTENER_PORT: &str = "8081";
20
21impl DockerServiceConfig for SchemaRegistryConfig {
22    fn id(&self) -> String {
23        self.id.clone()
24    }
25
26    fn is_user_managed(&self) -> bool {
27        self.user_managed
28    }
29
30    fn image(&self) -> String {
31        self.image.clone()
32    }
33
34    fn envs(&self) -> Vec<(String, String)> {
35        // https://docs.confluent.io/platform/current/installation/docker/config-reference.html#sr-long-configuration
36        // https://docs.confluent.io/platform/current/schema-registry/installation/config.html
37        let kafka = self
38            .provide_kafka
39            .as_ref()
40            .expect("Kafka is required for Schema Registry");
41        if kafka.len() != 1 {
42            panic!("More than one Kafka is not supported yet");
43        }
44        let kafka = &kafka[0];
45        if kafka.user_managed {
46            panic!(
47                "user-managed Kafka with docker Schema Registry is not supported yet. Please make them both or neither user-managed."
48            );
49        }
50        vec![
51            ("SCHEMA_REGISTRY_HOST_NAME".to_owned(), self.address.clone()),
52            (
53                "SCHEMA_REGISTRY_LISTENERS".to_owned(),
54                format!("http://{}:{}", "0.0.0.0", SCHEMA_REGISTRY_LISTENER_PORT),
55            ),
56            (
57                "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS".to_owned(),
58                format!("host.docker.internal:{}", kafka.docker_port),
59            ),
60        ]
61    }
62
63    fn ports(&self) -> Vec<(String, String)> {
64        vec![(
65            self.port.to_string(),
66            SCHEMA_REGISTRY_LISTENER_PORT.to_owned(),
67        )]
68    }
69
70    fn data_path(&self) -> Option<String> {
71        None
72    }
73}
74
75/// Docker-backed Schema Registry service.
76pub type SchemaRegistryService = DockerService<SchemaRegistryConfig>;