risedev/task/
docker_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::process::{Command, Stdio};
17use std::{env, thread};
18
19use anyhow::{Context, Result, anyhow};
20
21use super::{ExecuteContext, Task};
22use crate::DummyService;
23
/// Configuration for a docker-backed service.
///
/// The trait can be implemented for the configuration struct of a service.
/// After that, use `DockerService<Config>` as the service type.
pub trait DockerServiceConfig: Send + 'static {
    /// The unique identifier of the service.
    ///
    /// It is also used to name the container (`risedev-<id>`) and, when data is persisted,
    /// the host data directory (`$PREFIX_DATA/<id>`).
    fn id(&self) -> String;

    /// Whether the service is managed by the user.
    ///
    /// If true, no docker container will be started and the service will be forwarded to
    /// the [`DummyService`].
    fn is_user_managed(&self) -> bool;

    /// The docker image to use, e.g. `mysql:5.7`.
    fn image(&self) -> String;

    /// Additional arguments for the container, appended after the image name in
    /// `docker run`.
    fn args(&self) -> Vec<String> {
        vec![]
    }

    /// The environment variables to pass to the docker container, as `(key, value)` pairs.
    fn envs(&self) -> Vec<(String, String)> {
        vec![]
    }

    /// The ports to expose on the host, e.g. `("0.0.0.0:23306", "3306")`.
    ///
    /// The first element of the tuple is the host port (optionally prefixed with a bind
    /// address), the second is the container port. Each pair is passed to docker as
    /// `-p <host>:<container>`.
    fn ports(&self) -> Vec<(String, String)> {
        vec![]
    }

    /// The path in the container to persist data to, e.g. `/var/lib/mysql`.
    ///
    /// `Some` if the service is specified to persist data, `None` otherwise. When `Some`,
    /// the host directory `$PREFIX_DATA/<id>` is created and bind-mounted at this path.
    fn data_path(&self) -> Option<String> {
        None
    }

    /// Network latency in milliseconds to add to the container's `eth0` interface.
    ///
    /// `0` (the default) disables latency emulation. Any non-zero value grants the
    /// container the `NET_ADMIN` and `NET_RAW` capabilities and installs a `netem` delay of
    /// that many milliseconds once the container is running.
    fn latency_ms(&self) -> u32 {
        0
    }
}
73
/// Task that pulls (if needed) and runs a docker container described by a
/// [`DockerServiceConfig`] implementation.
///
/// `B` is the service-specific configuration consulted for the image, environment, ports,
/// persisted data path and optional network latency.
pub struct DockerService<B> {
    /// Service-specific configuration supplied at construction time.
    config: B,
}
78
79impl<B> DockerService<B>
80where
81    B: DockerServiceConfig,
82{
83    pub fn new(config: B) -> Self {
84        Self { config }
85    }
86
87    /// Run `docker image inspect <image>` to check if the image exists locally.
88    ///
89    /// `docker run --pull=missing` does the same thing, but as we split the pull and run
90    /// into two commands while `pull` does not provide such an option, we need to check
91    /// the image existence manually.
92    fn check_image_exists(&self) -> bool {
93        Command::new("docker")
94            .arg("image")
95            .arg("inspect")
96            .arg(self.config.image())
97            .stdout(Stdio::null())
98            .stderr(Stdio::null())
99            .status()
100            .map(|status| status.success())
101            .unwrap_or(false)
102    }
103
104    fn docker_pull(&self) -> Command {
105        let mut cmd = Command::new("docker");
106        cmd.arg("pull").arg(self.config.image());
107        cmd
108    }
109
110    fn docker_run(&self) -> Result<Command> {
111        let mut cmd = Command::new("docker");
112        cmd.arg("run")
113            .arg("--rm")
114            .arg("--name")
115            .arg(format!("risedev-{}", self.id()))
116            .arg("--add-host")
117            .arg("host.docker.internal:host-gateway");
118
119        // Add capabilities for traffic control if latency is configured
120        if self.config.latency_ms() > 0 {
121            cmd.arg("--cap-add=NET_ADMIN").arg("--cap-add=NET_RAW");
122        }
123
124        for (k, v) in self.config.envs() {
125            cmd.arg("-e").arg(format!("{k}={v}"));
126        }
127        for (container, host) in self.config.ports() {
128            cmd.arg("-p").arg(format!("{container}:{host}"));
129        }
130
131        if let Some(data_path) = self.config.data_path() {
132            let path = Path::new(&env::var("PREFIX_DATA")?).join(self.id());
133            fs_err::create_dir_all(&path)?;
134            cmd.arg("-v")
135                .arg(format!("{}:{}", path.to_string_lossy(), data_path));
136        }
137
138        cmd.arg(self.config.image());
139
140        cmd.args(self.config.args());
141
142        Ok(cmd)
143    }
144}
145
146impl<B> Task for DockerService<B>
147where
148    B: DockerServiceConfig,
149{
150    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
151        if self.config.is_user_managed() {
152            return DummyService::new(&self.id()).execute(ctx);
153        }
154
155        ctx.service(self);
156
157        check_docker_installed()?;
158
159        if !self.check_image_exists() {
160            ctx.pb
161                .set_message(format!("pulling image `{}`...", self.config.image()));
162            ctx.run_command(self.docker_pull())?;
163        }
164
165        ctx.pb.set_message("starting...");
166        ctx.run_command(ctx.tmux_run(self.docker_run()?)?)?;
167
168        // If latency is configured, add it after the container starts
169        if self.config.latency_ms() > 0 {
170            ctx.pb.set_message("configuring network latency...");
171
172            // Wait a moment for the container to be ready
173            thread::sleep(std::time::Duration::from_secs(2));
174
175            // Add latency using docker exec
176            let mut tc_cmd = Command::new("docker");
177            tc_cmd
178                .arg("exec")
179                .arg(format!("risedev-{}", self.id()))
180                .arg("sh")
181                .arg("-c")
182                // add the `netem` queueing discipline to the network interface `eth0`` to emulate latency
183                .arg(format!(
184                    "apk add --no-cache iproute2 && tc qdisc add dev eth0 root netem delay {}ms",
185                    self.config.latency_ms()
186                ));
187
188            match tc_cmd.output() {
189                Ok(output) => {
190                    if !output.status.success() {
191                        let stderr = String::from_utf8_lossy(&output.stderr);
192                        return Err(
193                            anyhow!("{}", stderr).context("failed to configure network latency")
194                        );
195                    }
196                }
197                Err(e) => {
198                    return Err(anyhow!(e).context("failed to configure network latency"));
199                }
200            }
201        }
202
203        ctx.pb.set_message("started");
204
205        Ok(())
206    }
207
208    fn id(&self) -> String {
209        self.config.id()
210    }
211}
212
213fn check_docker_installed() -> Result<()> {
214    Command::new("docker")
215        .arg("--version")
216        .stdout(Stdio::null())
217        .stderr(Stdio::null())
218        .status()
219        .map_err(anyhow::Error::from)
220        .and_then(|status| status.exit_ok().map_err(anyhow::Error::from))
221        .context("service requires docker to be installed")
222}