risedev/task/
docker_service.rs1use std::path::Path;
16use std::process::{Command, Stdio};
17use std::{env, thread};
18
19use anyhow::{Context, Result, anyhow};
20
21use super::{ExecuteContext, Task};
22use crate::DummyService;
23
/// Describes a service that `DockerService` launches through the `docker`
/// command-line client.
///
/// Only `id`, `is_user_managed` and `image` must be provided; every other
/// setting defaults to "nothing extra".
pub trait DockerServiceConfig: Send + 'static {
    /// Identifier of the service.
    ///
    /// The container is named `risedev-{id}`, and the same id names the
    /// service's directory under `PREFIX_DATA` when a data path is set.
    fn id(&self) -> String;

    /// Whether the user runs this service themselves.
    ///
    /// When `true`, no container is started and a `DummyService` is executed
    /// in its place.
    fn is_user_managed(&self) -> bool;

    /// Docker image reference that is inspected, pulled if absent, and run.
    fn image(&self) -> String;

    /// Extra arguments appended after the image name on the `docker run`
    /// command line. Defaults to none.
    fn args(&self) -> Vec<String> {
        Vec::new()
    }

    /// Environment variables for the container; each `(key, value)` pair is
    /// passed as `-e key=value`. Defaults to none.
    fn envs(&self) -> Vec<(String, String)> {
        Vec::new()
    }

    /// Port mappings; each `(first, second)` pair is passed verbatim as
    /// `-p first:second`, which Docker reads as `host_port:container_port`.
    /// Defaults to none.
    fn ports(&self) -> Vec<(String, String)> {
        Vec::new()
    }

    /// Path inside the container onto which the service's host data directory
    /// is mounted with `-v`. `None` (the default) mounts nothing.
    fn data_path(&self) -> Option<String> {
        None
    }

    /// Artificial network delay, in milliseconds, applied inside the container
    /// with `tc ... netem delay`. `0` (the default) disables it.
    fn latency_ms(&self) -> u32 {
        0
    }
}
73
/// A task that runs a service in a Docker container, parameterised by the
/// configuration type `B` that describes the container.
pub struct DockerService<B> {
    /// Settings consulted when building the `docker` command lines.
    config: B,
}
78
79impl<B> DockerService<B>
80where
81 B: DockerServiceConfig,
82{
83 pub fn new(config: B) -> Self {
84 Self { config }
85 }
86
87 fn check_image_exists(&self) -> bool {
93 Command::new("docker")
94 .arg("image")
95 .arg("inspect")
96 .arg(self.config.image())
97 .stdout(Stdio::null())
98 .stderr(Stdio::null())
99 .status()
100 .map(|status| status.success())
101 .unwrap_or(false)
102 }
103
104 fn docker_pull(&self) -> Command {
105 let mut cmd = Command::new("docker");
106 cmd.arg("pull").arg(self.config.image());
107 cmd
108 }
109
110 fn docker_run(&self) -> Result<Command> {
111 let mut cmd = Command::new("docker");
112 cmd.arg("run")
113 .arg("--rm")
114 .arg("--name")
115 .arg(format!("risedev-{}", self.id()))
116 .arg("--add-host")
117 .arg("host.docker.internal:host-gateway");
118
119 if self.config.latency_ms() > 0 {
121 cmd.arg("--cap-add=NET_ADMIN").arg("--cap-add=NET_RAW");
122 }
123
124 for (k, v) in self.config.envs() {
125 cmd.arg("-e").arg(format!("{k}={v}"));
126 }
127 for (container, host) in self.config.ports() {
128 cmd.arg("-p").arg(format!("{container}:{host}"));
129 }
130
131 if let Some(data_path) = self.config.data_path() {
132 let path = Path::new(&env::var("PREFIX_DATA")?).join(self.id());
133 fs_err::create_dir_all(&path)?;
134 cmd.arg("-v")
135 .arg(format!("{}:{}", path.to_string_lossy(), data_path));
136 }
137
138 cmd.arg(self.config.image());
139
140 cmd.args(self.config.args());
141
142 Ok(cmd)
143 }
144}
145
146impl<B> Task for DockerService<B>
147where
148 B: DockerServiceConfig,
149{
150 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
151 if self.config.is_user_managed() {
152 return DummyService::new(&self.id()).execute(ctx);
153 }
154
155 ctx.service(self);
156
157 check_docker_installed()?;
158
159 if !self.check_image_exists() {
160 ctx.pb
161 .set_message(format!("pulling image `{}`...", self.config.image()));
162 ctx.run_command(self.docker_pull())?;
163 }
164
165 ctx.pb.set_message("starting...");
166 ctx.run_command(ctx.tmux_run(self.docker_run()?)?)?;
167
168 if self.config.latency_ms() > 0 {
170 ctx.pb.set_message("configuring network latency...");
171
172 thread::sleep(std::time::Duration::from_secs(2));
174
175 let mut tc_cmd = Command::new("docker");
177 tc_cmd
178 .arg("exec")
179 .arg(format!("risedev-{}", self.id()))
180 .arg("sh")
181 .arg("-c")
182 .arg(format!(
184 "apk add --no-cache iproute2 && tc qdisc add dev eth0 root netem delay {}ms",
185 self.config.latency_ms()
186 ));
187
188 match tc_cmd.output() {
189 Ok(output) => {
190 if !output.status.success() {
191 let stderr = String::from_utf8_lossy(&output.stderr);
192 return Err(
193 anyhow!("{}", stderr).context("failed to configure network latency")
194 );
195 }
196 }
197 Err(e) => {
198 return Err(anyhow!(e).context("failed to configure network latency"));
199 }
200 }
201 }
202
203 ctx.pb.set_message("started");
204
205 Ok(())
206 }
207
208 fn id(&self) -> String {
209 self.config.id()
210 }
211}
212
213fn check_docker_installed() -> Result<()> {
214 Command::new("docker")
215 .arg("--version")
216 .stdout(Stdio::null())
217 .stderr(Stdio::null())
218 .status()
219 .map_err(anyhow::Error::from)
220 .and_then(|status| status.exit_ok().map_err(anyhow::Error::from))
221 .context("service requires docker to be installed")
222}