risedev/
task.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod meta_node_service;
25mod minio_service;
26mod mysql_service;
27mod postgres_service;
28mod prometheus_service;
29mod pubsub_service;
30mod redis_service;
31mod schema_registry_service;
32mod sql_server_service;
33mod task_configure_minio;
34mod task_db_ready_check;
35mod task_kafka_ready_check;
36mod task_log_ready_check;
37mod task_pubsub_emu_ready_check;
38mod task_redis_ready_check;
39mod task_tcp_ready_check;
40mod tempo_service;
41mod utils;
42
43use std::env;
44use std::net::{TcpStream, ToSocketAddrs};
45use std::path::{Path, PathBuf};
46use std::process::{Command, Output};
47use std::sync::Arc;
48use std::time::Duration;
49
50use anyhow::{Context, Result, anyhow};
51use indicatif::ProgressBar;
52use reqwest::blocking::{Client, Response};
53use tempfile::TempDir;
54pub use utils::*;
55
56pub use self::compactor_service::*;
57pub use self::compute_node_service::*;
58pub use self::configure_tmux_service::*;
59pub use self::dummy_service::DummyService;
60pub use self::ensure_stop_service::*;
61pub use self::frontend_service::*;
62pub use self::grafana_service::*;
63pub use self::kafka_service::*;
64pub use self::meta_node_service::*;
65pub use self::minio_service::*;
66pub use self::mysql_service::*;
67pub use self::postgres_service::*;
68pub use self::prometheus_service::*;
69pub use self::pubsub_service::*;
70pub use self::redis_service::*;
71pub use self::schema_registry_service::SchemaRegistryService;
72pub use self::sql_server_service::*;
73pub use self::task_configure_minio::*;
74pub use self::task_db_ready_check::*;
75pub use self::task_kafka_ready_check::*;
76pub use self::task_log_ready_check::*;
77pub use self::task_pubsub_emu_ready_check::*;
78pub use self::task_redis_ready_check::*;
79pub use self::task_tcp_ready_check::*;
80pub use self::tempo_service::*;
81use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
82use crate::wait::{wait, wait_tcp_available};
83
84pub trait Task: 'static + Send {
85    /// Execute the task
86    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
87
88    /// Get task id used in progress bar
89    fn id(&self) -> String {
90        "<task>".into()
91    }
92}
93
94/// A context used in task execution
95pub struct ExecuteContext<W>
96where
97    W: std::io::Write,
98{
99    /// Global log file object. (aka. risedev.log)
100    pub log: W,
101
102    /// Progress bar on screen.
103    pub pb: ProgressBar,
104
105    /// The directory for checking status.
106    ///
107    /// `RiseDev` will instruct every task to output their status to a file in temporary folder. By
108    /// checking this file, we can know whether a task has early exited.
109    pub status_dir: Arc<TempDir>,
110
111    /// The current service id running in this context.
112    pub id: Option<String>,
113
114    /// The status file corresponding to the current context.
115    pub status_file: Option<PathBuf>,
116
117    /// The log file corresponding to the current context. (e.g. frontend-4566.log)
118    pub log_file: Option<PathBuf>,
119}
120
121impl<W> ExecuteContext<W>
122where
123    W: std::io::Write,
124{
125    pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
126        Self {
127            log,
128            pb,
129            status_dir,
130            status_file: None,
131            log_file: None,
132            id: None,
133        }
134    }
135
136    pub fn service(&mut self, task: &impl Task) {
137        let id = task.id();
138        if !id.is_empty() {
139            begin_spin(&self.pb);
140            self.pb.set_prefix(id.clone());
141            self.id = Some(id.clone());
142            self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
143
144            // Remove the old log file if exists to avoid confusion.
145            let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
146                .join(format!("{}.log", self.id.as_ref().unwrap()));
147            fs_err::remove_file(&log_file).ok();
148            self.log_file = Some(log_file);
149        }
150    }
151
152    pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
153        let program_name = get_program_name(&cmd);
154
155        writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
156
157        let output = cmd.output()?;
158
159        let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
160        full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
161
162        write!(self.log, "{}", full_output)?;
163
164        writeln!(
165            self.log,
166            "({} exited with {:?})",
167            program_name,
168            output.status.code()
169        )?;
170
171        writeln!(self.log, "---")?;
172
173        output.status.exit_ok().context(full_output)?;
174
175        Ok(output)
176    }
177
178    pub fn complete_spin(&mut self) {
179        complete_spin(&self.pb);
180    }
181
182    pub fn status_path(&self) -> PathBuf {
183        self.status_file.clone().unwrap()
184    }
185
186    pub fn log_path(&self) -> &Path {
187        self.log_file.as_ref().unwrap().as_path()
188    }
189
190    pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
191        let addr = server
192            .as_ref()
193            .to_socket_addrs()?
194            .next()
195            .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
196        wait(
197            || {
198                TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
199                    format!("failed to establish tcp connection to {}", server.as_ref())
200                })?;
201                Ok(())
202            },
203            &mut self.log,
204            self.status_file.as_ref().unwrap(),
205            self.id.as_ref().unwrap(),
206            Some(Duration::from_secs(30)),
207            true,
208        )?;
209        Ok(())
210    }
211
212    fn wait_http_with_response_cb(
213        &mut self,
214        server: impl AsRef<str>,
215        cb: impl Fn(Response) -> anyhow::Result<()>,
216    ) -> anyhow::Result<()> {
217        let server = server.as_ref();
218        wait(
219            || {
220                let resp = Client::new()
221                    .get(server)
222                    .timeout(Duration::from_secs(1))
223                    .body("")
224                    .send()?
225                    .error_for_status()
226                    .with_context(|| {
227                        format!("failed to establish http connection to {}", server)
228                    })?;
229
230                cb(resp)
231            },
232            &mut self.log,
233            self.status_file.as_ref().unwrap(),
234            self.id.as_ref().unwrap(),
235            Some(Duration::from_secs(30)),
236            true,
237        )
238    }
239
240    pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
241        self.wait_http_with_response_cb(server, |_| Ok(()))
242    }
243
244    pub fn wait_http_with_text_cb(
245        &mut self,
246        server: impl AsRef<str>,
247        cb: impl Fn(&str) -> bool,
248    ) -> anyhow::Result<()> {
249        self.wait_http_with_response_cb(server, |resp| {
250            let data = resp.text()?;
251            if cb(&data) {
252                Ok(())
253            } else {
254                Err(anyhow!(
255                    "http health check callback failed with body: {:?}",
256                    data
257                ))
258            }
259        })
260    }
261
262    pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
263        wait(
264            wait_func,
265            &mut self.log,
266            self.status_file.as_ref().unwrap(),
267            self.id.as_ref().unwrap(),
268            Some(Duration::from_secs(30)),
269            true,
270        )
271    }
272
273    /// Wait for a TCP port to close
274    pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
275        wait_tcp_available(server, Some(Duration::from_secs(30)))?;
276        Ok(())
277    }
278
279    /// Wait for a user-managed service to be available
280    pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
281        let addr = server
282            .as_ref()
283            .to_socket_addrs()?
284            .next()
285            .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
286        wait(
287            || {
288                TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
289                Ok(())
290            },
291            &mut self.log,
292            self.status_file.as_ref().unwrap(),
293            self.id.as_ref().unwrap(),
294            None,
295            false,
296        )?;
297        Ok(())
298    }
299
300    pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
301        let prefix_path = env::var("PREFIX_BIN")?;
302        let mut cmd = new_tmux_command();
303        cmd.arg("new-window")
304            // Set target name
305            .arg("-t")
306            .arg(RISEDEV_NAME)
307            // Switch to background window
308            .arg("-d")
309            // Set session name for this window
310            .arg("-n")
311            .arg(self.id.as_ref().unwrap());
312        if let Some(dir) = user_cmd.get_current_dir() {
313            cmd.arg("-c").arg(dir);
314        }
315        for (k, v) in user_cmd.get_envs() {
316            cmd.arg("-e");
317            if let Some(v) = v {
318                cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
319            } else {
320                cmd.arg(k);
321            }
322        }
323        cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
324        cmd.arg(self.log_path());
325        cmd.arg(self.status_path());
326        cmd.arg(user_cmd.get_program());
327        for arg in user_cmd.get_args() {
328            cmd.arg(arg);
329        }
330        Ok(cmd)
331    }
332}