1mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod lakekeeper_service;
25mod meta_node_service;
26mod minio_service;
27mod moat_service;
28mod mysql_service;
29mod postgres_service;
30mod prometheus_service;
31mod pubsub_service;
32mod pulsar_service;
33mod redis_service;
34mod schema_registry_service;
35mod sql_server_service;
36mod task_configure_minio;
37mod task_db_ready_check;
38mod task_kafka_ready_check;
39mod task_log_ready_check;
40mod task_pubsub_emu_ready_check;
41mod task_redis_ready_check;
42mod task_tcp_ready_check;
43mod tempo_service;
44mod utils;
45
46use std::collections::HashMap;
47use std::env;
48use std::io::Write;
49use std::net::{TcpStream, ToSocketAddrs};
50use std::path::{Path, PathBuf};
51use std::process::{Command, Output};
52use std::sync::Arc;
53use std::time::Duration;
54
55use anyhow::{Context, Result, anyhow};
56use indicatif::ProgressBar;
57use reqwest::blocking::{Client, Response};
58use tempfile::TempDir;
59pub use utils::*;
60
61pub use self::compactor_service::*;
62pub use self::compute_node_service::*;
63pub use self::configure_tmux_service::*;
64pub use self::dummy_service::DummyService;
65pub use self::ensure_stop_service::*;
66pub use self::frontend_service::*;
67pub use self::grafana_service::*;
68pub use self::kafka_service::*;
69pub use self::lakekeeper_service::*;
70pub use self::meta_node_service::*;
71pub use self::minio_service::*;
72pub use self::moat_service::*;
73pub use self::mysql_service::*;
74pub use self::postgres_service::*;
75pub use self::prometheus_service::*;
76pub use self::pubsub_service::*;
77pub use self::pulsar_service::*;
78pub use self::redis_service::*;
79pub use self::schema_registry_service::SchemaRegistryService;
80pub use self::sql_server_service::*;
81pub use self::task_configure_minio::*;
82pub use self::task_db_ready_check::*;
83pub use self::task_kafka_ready_check::*;
84pub use self::task_log_ready_check::*;
85pub use self::task_pubsub_emu_ready_check::*;
86pub use self::task_redis_ready_check::*;
87pub use self::task_tcp_ready_check::*;
88pub use self::tempo_service::*;
89use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
90use crate::wait::{wait, wait_tcp_available};
91
92pub trait Task: 'static + Send {
93 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
95
96 fn id(&self) -> String {
98 "<task>".into()
99 }
100}
101
102pub struct ExecuteContext<W>
104where
105 W: std::io::Write,
106{
107 pub log: W,
109
110 pub pb: ProgressBar,
112
113 pub status_dir: Arc<TempDir>,
118
119 pub id: Option<String>,
121
122 pub status_file: Option<PathBuf>,
124
125 pub log_file: Option<PathBuf>,
127}
128
129impl<W> ExecuteContext<W>
130where
131 W: std::io::Write,
132{
133 pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
134 Self {
135 log,
136 pb,
137 status_dir,
138 status_file: None,
139 log_file: None,
140 id: None,
141 }
142 }
143
144 pub fn service(&mut self, task: &impl Task) {
145 let id = task.id();
146 if !id.is_empty() {
147 begin_spin(&self.pb);
148 self.pb.set_prefix(id.clone());
149 self.id = Some(id.clone());
150 self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
151
152 let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
154 .join(format!("{}.log", self.id.as_ref().unwrap()));
155 fs_err::remove_file(&log_file).ok();
156 self.log_file = Some(log_file);
157 }
158 }
159
160 pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
161 let program_name = get_program_name(&cmd);
162
163 writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
164
165 let node_id = self.id.as_ref();
167 if program_name == "tmux"
168 && let Some(node_id) = node_id
169 && node_id != "tmux-configure"
170 {
171 let cmd: HashMap<String, String> = HashMap::from_iter([(
172 node_id.clone(),
173 format!("{} {}", program_name, get_program_args(&cmd)),
174 )]);
175 let prefix_config = env::var("PREFIX_CONFIG").unwrap();
176 let path = Path::new(&prefix_config).join("risedev_commands.yaml");
177 let content = serde_yaml::to_string(&cmd)?;
178 fs_err::OpenOptions::new()
179 .create(true)
180 .append(true)
181 .open(path)?
182 .write_all(content.as_bytes())?;
183 }
184
185 let output = cmd.output()?;
186
187 let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
188 full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
189
190 write!(self.log, "{}", full_output)?;
191
192 writeln!(
193 self.log,
194 "({} exited with {:?})",
195 program_name,
196 output.status.code()
197 )?;
198
199 writeln!(self.log, "---")?;
200
201 output.status.exit_ok().context(full_output)?;
202
203 Ok(output)
204 }
205
206 pub fn complete_spin(&mut self) {
207 complete_spin(&self.pb);
208 }
209
210 pub fn status_path(&self) -> PathBuf {
211 self.status_file.clone().unwrap()
212 }
213
214 pub fn log_path(&self) -> &Path {
215 self.log_file.as_ref().unwrap().as_path()
216 }
217
218 pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
219 let addr = server
220 .as_ref()
221 .to_socket_addrs()?
222 .next()
223 .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
224 wait(
225 || {
226 TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
227 format!("failed to establish tcp connection to {}", server.as_ref())
228 })?;
229 Ok(())
230 },
231 &mut self.log,
232 self.status_file.as_ref().unwrap(),
233 self.id.as_ref().unwrap(),
234 Some(Duration::from_secs(30)),
235 true,
236 )?;
237 Ok(())
238 }
239
240 fn wait_http_with_response_cb(
241 &mut self,
242 server: impl AsRef<str>,
243 cb: impl Fn(Response) -> anyhow::Result<()>,
244 ) -> anyhow::Result<()> {
245 let server = server.as_ref();
246 wait(
247 || {
248 let resp = Client::new()
249 .get(server)
250 .timeout(Duration::from_secs(1))
251 .body("")
252 .send()?
253 .error_for_status()
254 .with_context(|| {
255 format!("failed to establish http connection to {}", server)
256 })?;
257
258 cb(resp)
259 },
260 &mut self.log,
261 self.status_file.as_ref().unwrap(),
262 self.id.as_ref().unwrap(),
263 Some(Duration::from_secs(30)),
264 true,
265 )
266 }
267
268 pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
269 self.wait_http_with_response_cb(server, |_| Ok(()))
270 }
271
272 pub fn wait_http_with_text_cb(
273 &mut self,
274 server: impl AsRef<str>,
275 cb: impl Fn(&str) -> bool,
276 ) -> anyhow::Result<()> {
277 self.wait_http_with_response_cb(server, |resp| {
278 let data = resp.text()?;
279 if cb(&data) {
280 Ok(())
281 } else {
282 Err(anyhow!(
283 "http health check callback failed with body: {:?}",
284 data
285 ))
286 }
287 })
288 }
289
290 pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
291 wait(
292 wait_func,
293 &mut self.log,
294 self.status_file.as_ref().unwrap(),
295 self.id.as_ref().unwrap(),
296 Some(Duration::from_secs(30)),
297 true,
298 )
299 }
300
301 pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
303 wait_tcp_available(server, Some(Duration::from_secs(30)))?;
304 Ok(())
305 }
306
307 pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
309 let addr = server
310 .as_ref()
311 .to_socket_addrs()?
312 .next()
313 .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
314 wait(
315 || {
316 TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
317 Ok(())
318 },
319 &mut self.log,
320 self.status_file.as_ref().unwrap(),
321 self.id.as_ref().unwrap(),
322 None,
323 false,
324 )?;
325 Ok(())
326 }
327
328 pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
329 let prefix_path = env::var("PREFIX_BIN")?;
330 let mut cmd = new_tmux_command();
331 cmd.arg("new-window")
332 .arg("-t")
334 .arg(RISEDEV_NAME)
335 .arg("-d")
337 .arg("-n")
339 .arg(self.id.as_ref().unwrap());
340
341 if let Some(dir) = user_cmd.get_current_dir() {
342 cmd.arg("-c").arg(dir);
343 }
344 for (k, v) in user_cmd.get_envs() {
345 cmd.arg("-e");
346 if let Some(v) = v {
347 cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
348 } else {
349 cmd.arg(k);
350 }
351 }
352 cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
353 cmd.arg(self.log_path());
354 cmd.arg(self.status_path());
355 cmd.arg(user_cmd.get_program());
356 for arg in user_cmd.get_args() {
357 cmd.arg(arg);
358 }
359
360 Ok(cmd)
361 }
362}