1mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod meta_node_service;
25mod minio_service;
26mod mysql_service;
27mod postgres_service;
28mod prometheus_service;
29mod pubsub_service;
30mod pulsar_service;
31mod redis_service;
32mod schema_registry_service;
33mod sql_server_service;
34mod task_configure_minio;
35mod task_db_ready_check;
36mod task_kafka_ready_check;
37mod task_log_ready_check;
38mod task_pubsub_emu_ready_check;
39mod task_redis_ready_check;
40mod task_tcp_ready_check;
41mod tempo_service;
42mod utils;
43
44use std::collections::HashMap;
45use std::env;
46use std::io::Write;
47use std::net::{TcpStream, ToSocketAddrs};
48use std::path::{Path, PathBuf};
49use std::process::{Command, Output};
50use std::sync::Arc;
51use std::time::Duration;
52
53use anyhow::{Context, Result, anyhow};
54use indicatif::ProgressBar;
55use reqwest::blocking::{Client, Response};
56use tempfile::TempDir;
57pub use utils::*;
58
59pub use self::compactor_service::*;
60pub use self::compute_node_service::*;
61pub use self::configure_tmux_service::*;
62pub use self::dummy_service::DummyService;
63pub use self::ensure_stop_service::*;
64pub use self::frontend_service::*;
65pub use self::grafana_service::*;
66pub use self::kafka_service::*;
67pub use self::meta_node_service::*;
68pub use self::minio_service::*;
69pub use self::mysql_service::*;
70pub use self::postgres_service::*;
71pub use self::prometheus_service::*;
72pub use self::pubsub_service::*;
73pub use self::pulsar_service::*;
74pub use self::redis_service::*;
75pub use self::schema_registry_service::SchemaRegistryService;
76pub use self::sql_server_service::*;
77pub use self::task_configure_minio::*;
78pub use self::task_db_ready_check::*;
79pub use self::task_kafka_ready_check::*;
80pub use self::task_log_ready_check::*;
81pub use self::task_pubsub_emu_ready_check::*;
82pub use self::task_redis_ready_check::*;
83pub use self::task_tcp_ready_check::*;
84pub use self::tempo_service::*;
85use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
86use crate::wait::{wait, wait_tcp_available};
87
88pub trait Task: 'static + Send {
89 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
91
92 fn id(&self) -> String {
94 "<task>".into()
95 }
96}
97
98pub struct ExecuteContext<W>
100where
101 W: std::io::Write,
102{
103 pub log: W,
105
106 pub pb: ProgressBar,
108
109 pub status_dir: Arc<TempDir>,
114
115 pub id: Option<String>,
117
118 pub status_file: Option<PathBuf>,
120
121 pub log_file: Option<PathBuf>,
123}
124
125impl<W> ExecuteContext<W>
126where
127 W: std::io::Write,
128{
129 pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
130 Self {
131 log,
132 pb,
133 status_dir,
134 status_file: None,
135 log_file: None,
136 id: None,
137 }
138 }
139
140 pub fn service(&mut self, task: &impl Task) {
141 let id = task.id();
142 if !id.is_empty() {
143 begin_spin(&self.pb);
144 self.pb.set_prefix(id.clone());
145 self.id = Some(id.clone());
146 self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
147
148 let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
150 .join(format!("{}.log", self.id.as_ref().unwrap()));
151 fs_err::remove_file(&log_file).ok();
152 self.log_file = Some(log_file);
153 }
154 }
155
156 pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
157 let program_name = get_program_name(&cmd);
158
159 writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
160
161 let node_id = self.id.as_ref();
163 if program_name == "tmux"
164 && let Some(node_id) = node_id
165 && node_id != "tmux-configure"
166 {
167 let cmd: HashMap<String, String> = HashMap::from_iter([(
168 node_id.clone(),
169 format!("{} {}", program_name, get_program_args(&cmd)),
170 )]);
171 let prefix_config = env::var("PREFIX_CONFIG").unwrap();
172 let path = Path::new(&prefix_config).join("risedev_commands.yaml");
173 let content = serde_yaml::to_string(&cmd)?;
174 fs_err::OpenOptions::new()
175 .create(true)
176 .append(true)
177 .open(path)?
178 .write_all(content.as_bytes())?;
179 }
180
181 let output = cmd.output()?;
182
183 let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
184 full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
185
186 write!(self.log, "{}", full_output)?;
187
188 writeln!(
189 self.log,
190 "({} exited with {:?})",
191 program_name,
192 output.status.code()
193 )?;
194
195 writeln!(self.log, "---")?;
196
197 output.status.exit_ok().context(full_output)?;
198
199 Ok(output)
200 }
201
202 pub fn complete_spin(&mut self) {
203 complete_spin(&self.pb);
204 }
205
206 pub fn status_path(&self) -> PathBuf {
207 self.status_file.clone().unwrap()
208 }
209
210 pub fn log_path(&self) -> &Path {
211 self.log_file.as_ref().unwrap().as_path()
212 }
213
214 pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
215 let addr = server
216 .as_ref()
217 .to_socket_addrs()?
218 .next()
219 .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
220 wait(
221 || {
222 TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
223 format!("failed to establish tcp connection to {}", server.as_ref())
224 })?;
225 Ok(())
226 },
227 &mut self.log,
228 self.status_file.as_ref().unwrap(),
229 self.id.as_ref().unwrap(),
230 Some(Duration::from_secs(30)),
231 true,
232 )?;
233 Ok(())
234 }
235
236 fn wait_http_with_response_cb(
237 &mut self,
238 server: impl AsRef<str>,
239 cb: impl Fn(Response) -> anyhow::Result<()>,
240 ) -> anyhow::Result<()> {
241 let server = server.as_ref();
242 wait(
243 || {
244 let resp = Client::new()
245 .get(server)
246 .timeout(Duration::from_secs(1))
247 .body("")
248 .send()?
249 .error_for_status()
250 .with_context(|| {
251 format!("failed to establish http connection to {}", server)
252 })?;
253
254 cb(resp)
255 },
256 &mut self.log,
257 self.status_file.as_ref().unwrap(),
258 self.id.as_ref().unwrap(),
259 Some(Duration::from_secs(30)),
260 true,
261 )
262 }
263
264 pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
265 self.wait_http_with_response_cb(server, |_| Ok(()))
266 }
267
268 pub fn wait_http_with_text_cb(
269 &mut self,
270 server: impl AsRef<str>,
271 cb: impl Fn(&str) -> bool,
272 ) -> anyhow::Result<()> {
273 self.wait_http_with_response_cb(server, |resp| {
274 let data = resp.text()?;
275 if cb(&data) {
276 Ok(())
277 } else {
278 Err(anyhow!(
279 "http health check callback failed with body: {:?}",
280 data
281 ))
282 }
283 })
284 }
285
286 pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
287 wait(
288 wait_func,
289 &mut self.log,
290 self.status_file.as_ref().unwrap(),
291 self.id.as_ref().unwrap(),
292 Some(Duration::from_secs(30)),
293 true,
294 )
295 }
296
297 pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
299 wait_tcp_available(server, Some(Duration::from_secs(30)))?;
300 Ok(())
301 }
302
303 pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
305 let addr = server
306 .as_ref()
307 .to_socket_addrs()?
308 .next()
309 .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
310 wait(
311 || {
312 TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
313 Ok(())
314 },
315 &mut self.log,
316 self.status_file.as_ref().unwrap(),
317 self.id.as_ref().unwrap(),
318 None,
319 false,
320 )?;
321 Ok(())
322 }
323
324 pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
325 let prefix_path = env::var("PREFIX_BIN")?;
326 let mut cmd = new_tmux_command();
327 cmd.arg("new-window")
328 .arg("-t")
330 .arg(RISEDEV_NAME)
331 .arg("-d")
333 .arg("-n")
335 .arg(self.id.as_ref().unwrap());
336
337 if let Some(dir) = user_cmd.get_current_dir() {
338 cmd.arg("-c").arg(dir);
339 }
340 for (k, v) in user_cmd.get_envs() {
341 cmd.arg("-e");
342 if let Some(v) = v {
343 cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
344 } else {
345 cmd.arg(k);
346 }
347 }
348 cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
349 cmd.arg(self.log_path());
350 cmd.arg(self.status_path());
351 cmd.arg(user_cmd.get_program());
352 for arg in user_cmd.get_args() {
353 cmd.arg(arg);
354 }
355
356 Ok(cmd)
357 }
358}