1mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod meta_node_service;
25mod minio_service;
26mod mysql_service;
27mod postgres_service;
28mod prometheus_service;
29mod pubsub_service;
30mod redis_service;
31mod schema_registry_service;
32mod sql_server_service;
33mod task_configure_minio;
34mod task_db_ready_check;
35mod task_kafka_ready_check;
36mod task_log_ready_check;
37mod task_pubsub_emu_ready_check;
38mod task_redis_ready_check;
39mod task_tcp_ready_check;
40mod tempo_service;
41mod utils;
42
43use std::env;
44use std::net::{TcpStream, ToSocketAddrs};
45use std::path::{Path, PathBuf};
46use std::process::{Command, Output};
47use std::sync::Arc;
48use std::time::Duration;
49
50use anyhow::{Context, Result, anyhow};
51use indicatif::ProgressBar;
52use reqwest::blocking::{Client, Response};
53use tempfile::TempDir;
54pub use utils::*;
55
56pub use self::compactor_service::*;
57pub use self::compute_node_service::*;
58pub use self::configure_tmux_service::*;
59pub use self::dummy_service::DummyService;
60pub use self::ensure_stop_service::*;
61pub use self::frontend_service::*;
62pub use self::grafana_service::*;
63pub use self::kafka_service::*;
64pub use self::meta_node_service::*;
65pub use self::minio_service::*;
66pub use self::mysql_service::*;
67pub use self::postgres_service::*;
68pub use self::prometheus_service::*;
69pub use self::pubsub_service::*;
70pub use self::redis_service::*;
71pub use self::schema_registry_service::SchemaRegistryService;
72pub use self::sql_server_service::*;
73pub use self::task_configure_minio::*;
74pub use self::task_db_ready_check::*;
75pub use self::task_kafka_ready_check::*;
76pub use self::task_log_ready_check::*;
77pub use self::task_pubsub_emu_ready_check::*;
78pub use self::task_redis_ready_check::*;
79pub use self::task_tcp_ready_check::*;
80pub use self::tempo_service::*;
81use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
82use crate::wait::{wait, wait_tcp_available};
83
84pub trait Task: 'static + Send {
85 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
87
88 fn id(&self) -> String {
90 "<task>".into()
91 }
92}
93
94pub struct ExecuteContext<W>
96where
97 W: std::io::Write,
98{
99 pub log: W,
101
102 pub pb: ProgressBar,
104
105 pub status_dir: Arc<TempDir>,
110
111 pub id: Option<String>,
113
114 pub status_file: Option<PathBuf>,
116
117 pub log_file: Option<PathBuf>,
119}
120
121impl<W> ExecuteContext<W>
122where
123 W: std::io::Write,
124{
125 pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
126 Self {
127 log,
128 pb,
129 status_dir,
130 status_file: None,
131 log_file: None,
132 id: None,
133 }
134 }
135
136 pub fn service(&mut self, task: &impl Task) {
137 let id = task.id();
138 if !id.is_empty() {
139 begin_spin(&self.pb);
140 self.pb.set_prefix(id.clone());
141 self.id = Some(id.clone());
142 self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
143
144 let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
146 .join(format!("{}.log", self.id.as_ref().unwrap()));
147 fs_err::remove_file(&log_file).ok();
148 self.log_file = Some(log_file);
149 }
150 }
151
152 pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
153 let program_name = get_program_name(&cmd);
154
155 writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
156
157 let output = cmd.output()?;
158
159 let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
160 full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
161
162 write!(self.log, "{}", full_output)?;
163
164 writeln!(
165 self.log,
166 "({} exited with {:?})",
167 program_name,
168 output.status.code()
169 )?;
170
171 writeln!(self.log, "---")?;
172
173 output.status.exit_ok().context(full_output)?;
174
175 Ok(output)
176 }
177
178 pub fn complete_spin(&mut self) {
179 complete_spin(&self.pb);
180 }
181
182 pub fn status_path(&self) -> PathBuf {
183 self.status_file.clone().unwrap()
184 }
185
186 pub fn log_path(&self) -> &Path {
187 self.log_file.as_ref().unwrap().as_path()
188 }
189
190 pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
191 let addr = server
192 .as_ref()
193 .to_socket_addrs()?
194 .next()
195 .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
196 wait(
197 || {
198 TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
199 format!("failed to establish tcp connection to {}", server.as_ref())
200 })?;
201 Ok(())
202 },
203 &mut self.log,
204 self.status_file.as_ref().unwrap(),
205 self.id.as_ref().unwrap(),
206 Some(Duration::from_secs(30)),
207 true,
208 )?;
209 Ok(())
210 }
211
212 fn wait_http_with_response_cb(
213 &mut self,
214 server: impl AsRef<str>,
215 cb: impl Fn(Response) -> anyhow::Result<()>,
216 ) -> anyhow::Result<()> {
217 let server = server.as_ref();
218 wait(
219 || {
220 let resp = Client::new()
221 .get(server)
222 .timeout(Duration::from_secs(1))
223 .body("")
224 .send()?
225 .error_for_status()
226 .with_context(|| {
227 format!("failed to establish http connection to {}", server)
228 })?;
229
230 cb(resp)
231 },
232 &mut self.log,
233 self.status_file.as_ref().unwrap(),
234 self.id.as_ref().unwrap(),
235 Some(Duration::from_secs(30)),
236 true,
237 )
238 }
239
240 pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
241 self.wait_http_with_response_cb(server, |_| Ok(()))
242 }
243
244 pub fn wait_http_with_text_cb(
245 &mut self,
246 server: impl AsRef<str>,
247 cb: impl Fn(&str) -> bool,
248 ) -> anyhow::Result<()> {
249 self.wait_http_with_response_cb(server, |resp| {
250 let data = resp.text()?;
251 if cb(&data) {
252 Ok(())
253 } else {
254 Err(anyhow!(
255 "http health check callback failed with body: {:?}",
256 data
257 ))
258 }
259 })
260 }
261
262 pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
263 wait(
264 wait_func,
265 &mut self.log,
266 self.status_file.as_ref().unwrap(),
267 self.id.as_ref().unwrap(),
268 Some(Duration::from_secs(30)),
269 true,
270 )
271 }
272
273 pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
275 wait_tcp_available(server, Some(Duration::from_secs(30)))?;
276 Ok(())
277 }
278
279 pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
281 let addr = server
282 .as_ref()
283 .to_socket_addrs()?
284 .next()
285 .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
286 wait(
287 || {
288 TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
289 Ok(())
290 },
291 &mut self.log,
292 self.status_file.as_ref().unwrap(),
293 self.id.as_ref().unwrap(),
294 None,
295 false,
296 )?;
297 Ok(())
298 }
299
300 pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
301 let prefix_path = env::var("PREFIX_BIN")?;
302 let mut cmd = new_tmux_command();
303 cmd.arg("new-window")
304 .arg("-t")
306 .arg(RISEDEV_NAME)
307 .arg("-d")
309 .arg("-n")
311 .arg(self.id.as_ref().unwrap());
312 if let Some(dir) = user_cmd.get_current_dir() {
313 cmd.arg("-c").arg(dir);
314 }
315 for (k, v) in user_cmd.get_envs() {
316 cmd.arg("-e");
317 if let Some(v) = v {
318 cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
319 } else {
320 cmd.arg(k);
321 }
322 }
323 cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
324 cmd.arg(self.log_path());
325 cmd.arg(self.status_path());
326 cmd.arg(user_cmd.get_program());
327 for arg in user_cmd.get_args() {
328 cmd.arg(arg);
329 }
330 Ok(cmd)
331 }
332}