1mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod lakekeeper_service;
25mod meta_node_service;
26mod minio_service;
27mod moat_service;
28mod mqtt_service;
29mod mysql_service;
30mod nats_service;
31mod postgres_service;
32mod prometheus_service;
33mod pubsub_service;
34mod pulsar_service;
35mod redis_service;
36mod schema_registry_service;
37mod sql_server_service;
38mod task_configure_minio;
39mod task_db_ready_check;
40mod task_kafka_ready_check;
41mod task_log_ready_check;
42mod task_pubsub_emu_ready_check;
43mod task_redis_ready_check;
44mod task_tcp_ready_check;
45mod tempo_service;
46mod utils;
47
48use std::collections::HashMap;
49use std::env;
50use std::io::Write;
51use std::net::{TcpStream, ToSocketAddrs};
52use std::path::{Path, PathBuf};
53use std::process::{Command, Output};
54use std::sync::Arc;
55use std::time::Duration;
56
57use anyhow::{Context, Result, anyhow};
58use indicatif::ProgressBar;
59use reqwest::blocking::{Client, Response};
60use tempfile::TempDir;
61pub use utils::*;
62
63pub use self::compactor_service::*;
64pub use self::compute_node_service::*;
65pub use self::configure_tmux_service::*;
66pub use self::dummy_service::DummyService;
67pub use self::ensure_stop_service::*;
68pub use self::frontend_service::*;
69pub use self::grafana_service::*;
70pub use self::kafka_service::*;
71pub use self::lakekeeper_service::*;
72pub use self::meta_node_service::*;
73pub use self::minio_service::*;
74pub use self::moat_service::*;
75pub use self::mqtt_service::*;
76pub use self::mysql_service::*;
77pub use self::nats_service::*;
78pub use self::postgres_service::*;
79pub use self::prometheus_service::*;
80pub use self::pubsub_service::*;
81pub use self::pulsar_service::*;
82pub use self::redis_service::*;
83pub use self::schema_registry_service::SchemaRegistryService;
84pub use self::sql_server_service::*;
85pub use self::task_configure_minio::*;
86pub use self::task_db_ready_check::*;
87pub use self::task_kafka_ready_check::*;
88pub use self::task_log_ready_check::*;
89pub use self::task_pubsub_emu_ready_check::*;
90pub use self::task_redis_ready_check::*;
91pub use self::task_tcp_ready_check::*;
92pub use self::tempo_service::*;
93use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
94use crate::wait::{wait, wait_tcp_available};
95
96pub trait Task: 'static + Send {
97 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
99
100 fn id(&self) -> String {
102 "<task>".into()
103 }
104}
105
106pub struct ExecuteContext<W>
108where
109 W: std::io::Write,
110{
111 pub log: W,
113
114 pub pb: ProgressBar,
116
117 pub status_dir: Arc<TempDir>,
122
123 pub id: Option<String>,
125
126 pub status_file: Option<PathBuf>,
128
129 pub log_file: Option<PathBuf>,
131}
132
133impl<W> ExecuteContext<W>
134where
135 W: std::io::Write,
136{
137 pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
138 Self {
139 log,
140 pb,
141 status_dir,
142 status_file: None,
143 log_file: None,
144 id: None,
145 }
146 }
147
148 pub fn service(&mut self, task: &impl Task) {
149 let id = task.id();
150 if !id.is_empty() {
151 begin_spin(&self.pb);
152 self.pb.set_prefix(id.clone());
153 self.id = Some(id.clone());
154 self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
155
156 let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
158 .join(format!("{}.log", self.id.as_ref().unwrap()));
159 fs_err::remove_file(&log_file).ok();
160 self.log_file = Some(log_file);
161 }
162 }
163
164 pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
165 let program_name = get_program_name(&cmd);
166
167 writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
168
169 let node_id = self.id.as_ref();
171 if program_name == "tmux"
172 && let Some(node_id) = node_id
173 && node_id != "tmux-configure"
174 {
175 let cmd: HashMap<String, String> = HashMap::from_iter([(
176 node_id.clone(),
177 format!("{} {}", program_name, get_program_args(&cmd)),
178 )]);
179 let prefix_config = env::var("PREFIX_CONFIG").unwrap();
180 let path = Path::new(&prefix_config).join("risedev_commands.yaml");
181 let content = serde_yaml::to_string(&cmd)?;
182 fs_err::OpenOptions::new()
183 .create(true)
184 .append(true)
185 .open(path)?
186 .write_all(content.as_bytes())?;
187 }
188
189 let output = cmd.output()?;
190
191 let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
192 full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
193
194 write!(self.log, "{}", full_output)?;
195
196 writeln!(
197 self.log,
198 "({} exited with {:?})",
199 program_name,
200 output.status.code()
201 )?;
202
203 writeln!(self.log, "---")?;
204
205 output.status.exit_ok().context(full_output)?;
206
207 Ok(output)
208 }
209
210 pub fn complete_spin(&mut self) {
211 complete_spin(&self.pb);
212 }
213
214 pub fn status_path(&self) -> PathBuf {
215 self.status_file.clone().unwrap()
216 }
217
218 pub fn log_path(&self) -> &Path {
219 self.log_file.as_ref().unwrap().as_path()
220 }
221
222 pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
223 let addr = server
224 .as_ref()
225 .to_socket_addrs()?
226 .next()
227 .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
228 wait(
229 || {
230 TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
231 format!("failed to establish tcp connection to {}", server.as_ref())
232 })?;
233 Ok(())
234 },
235 &mut self.log,
236 self.status_file.as_ref().unwrap(),
237 self.id.as_ref().unwrap(),
238 Some(Duration::from_secs(30)),
239 true,
240 )?;
241 Ok(())
242 }
243
244 fn wait_http_with_response_cb(
245 &mut self,
246 server: impl AsRef<str>,
247 cb: impl Fn(Response) -> anyhow::Result<()>,
248 ) -> anyhow::Result<()> {
249 let server = server.as_ref();
250 wait(
251 || {
252 let resp = Client::new()
253 .get(server)
254 .timeout(Duration::from_secs(1))
255 .body("")
256 .send()?
257 .error_for_status()
258 .with_context(|| {
259 format!("failed to establish http connection to {}", server)
260 })?;
261
262 cb(resp)
263 },
264 &mut self.log,
265 self.status_file.as_ref().unwrap(),
266 self.id.as_ref().unwrap(),
267 Some(Duration::from_secs(30)),
268 true,
269 )
270 }
271
272 pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
273 self.wait_http_with_response_cb(server, |_| Ok(()))
274 }
275
276 pub fn wait_http_with_text_cb(
277 &mut self,
278 server: impl AsRef<str>,
279 cb: impl Fn(&str) -> bool,
280 ) -> anyhow::Result<()> {
281 self.wait_http_with_response_cb(server, |resp| {
282 let data = resp.text()?;
283 if cb(&data) {
284 Ok(())
285 } else {
286 Err(anyhow!(
287 "http health check callback failed with body: {:?}",
288 data
289 ))
290 }
291 })
292 }
293
294 pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
295 wait(
296 wait_func,
297 &mut self.log,
298 self.status_file.as_ref().unwrap(),
299 self.id.as_ref().unwrap(),
300 Some(Duration::from_secs(30)),
301 true,
302 )
303 }
304
305 pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
307 wait_tcp_available(server, Some(Duration::from_secs(30)))?;
308 Ok(())
309 }
310
311 pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
313 let addr = server
314 .as_ref()
315 .to_socket_addrs()?
316 .next()
317 .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
318 wait(
319 || {
320 TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
321 Ok(())
322 },
323 &mut self.log,
324 self.status_file.as_ref().unwrap(),
325 self.id.as_ref().unwrap(),
326 None,
327 false,
328 )?;
329 Ok(())
330 }
331
332 pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
333 let prefix_path = env::var("PREFIX_BIN")?;
334 let mut cmd = new_tmux_command();
335 cmd.arg("new-window")
336 .arg("-t")
338 .arg(RISEDEV_NAME)
339 .arg("-d")
341 .arg("-n")
343 .arg(self.id.as_ref().unwrap());
344
345 if let Some(dir) = user_cmd.get_current_dir() {
346 cmd.arg("-c").arg(dir);
347 }
348 for (k, v) in user_cmd.get_envs() {
349 cmd.arg("-e");
350 if let Some(v) = v {
351 cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
352 } else {
353 cmd.arg(k);
354 }
355 }
356 cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
357 cmd.arg(self.log_path());
358 cmd.arg(self.status_path());
359 cmd.arg(user_cmd.get_program());
360 for arg in user_cmd.get_args() {
361 cmd.arg(arg);
362 }
363
364 Ok(cmd)
365 }
366}