risedev/task/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod lakekeeper_service;
25mod meta_node_service;
26mod minio_service;
27mod moat_service;
28mod mqtt_service;
29mod mysql_service;
30mod nats_service;
31mod postgres_service;
32mod prometheus_service;
33mod pubsub_service;
34mod pulsar_service;
35mod redis_service;
36mod schema_registry_service;
37mod sql_server_service;
38mod task_configure_minio;
39mod task_db_ready_check;
40mod task_kafka_ready_check;
41mod task_log_ready_check;
42mod task_pubsub_emu_ready_check;
43mod task_redis_ready_check;
44mod task_tcp_ready_check;
45mod tempo_service;
46mod utils;
47
48use std::collections::HashMap;
49use std::env;
50use std::io::Write;
51use std::net::{TcpStream, ToSocketAddrs};
52use std::path::{Path, PathBuf};
53use std::process::{Command, Output};
54use std::sync::Arc;
55use std::time::Duration;
56
57use anyhow::{Context, Result, anyhow};
58use indicatif::ProgressBar;
59use reqwest::blocking::{Client, Response};
60use tempfile::TempDir;
61pub use utils::*;
62
63pub use self::compactor_service::*;
64pub use self::compute_node_service::*;
65pub use self::configure_tmux_service::*;
66pub use self::dummy_service::DummyService;
67pub use self::ensure_stop_service::*;
68pub use self::frontend_service::*;
69pub use self::grafana_service::*;
70pub use self::kafka_service::*;
71pub use self::lakekeeper_service::*;
72pub use self::meta_node_service::*;
73pub use self::minio_service::*;
74pub use self::moat_service::*;
75pub use self::mqtt_service::*;
76pub use self::mysql_service::*;
77pub use self::nats_service::*;
78pub use self::postgres_service::*;
79pub use self::prometheus_service::*;
80pub use self::pubsub_service::*;
81pub use self::pulsar_service::*;
82pub use self::redis_service::*;
83pub use self::schema_registry_service::SchemaRegistryService;
84pub use self::sql_server_service::*;
85pub use self::task_configure_minio::*;
86pub use self::task_db_ready_check::*;
87pub use self::task_kafka_ready_check::*;
88pub use self::task_log_ready_check::*;
89pub use self::task_pubsub_emu_ready_check::*;
90pub use self::task_redis_ready_check::*;
91pub use self::task_tcp_ready_check::*;
92pub use self::tempo_service::*;
93use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
94use crate::wait::{wait, wait_tcp_available};
95
96pub trait Task: 'static + Send {
97    /// Execute the task
98    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
99
100    /// Get task id used in progress bar
101    fn id(&self) -> String {
102        "<task>".into()
103    }
104}
105
106/// A context used in task execution
107pub struct ExecuteContext<W>
108where
109    W: std::io::Write,
110{
111    /// Global log file object. (aka. risedev.log)
112    pub log: W,
113
114    /// Progress bar on screen.
115    pub pb: ProgressBar,
116
117    /// The directory for checking status.
118    ///
119    /// `RiseDev` will instruct every task to output their status to a file in temporary folder. By
120    /// checking this file, we can know whether a task has early exited.
121    pub status_dir: Arc<TempDir>,
122
123    /// The current service id running in this context.
124    pub id: Option<String>,
125
126    /// The status file corresponding to the current context.
127    pub status_file: Option<PathBuf>,
128
129    /// The log file corresponding to the current context. (e.g. frontend-4566.log)
130    pub log_file: Option<PathBuf>,
131}
132
133impl<W> ExecuteContext<W>
134where
135    W: std::io::Write,
136{
137    pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
138        Self {
139            log,
140            pb,
141            status_dir,
142            status_file: None,
143            log_file: None,
144            id: None,
145        }
146    }
147
148    pub fn service(&mut self, task: &impl Task) {
149        let id = task.id();
150        if !id.is_empty() {
151            begin_spin(&self.pb);
152            self.pb.set_prefix(id.clone());
153            self.id = Some(id.clone());
154            self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
155
156            // Remove the old log file if exists to avoid confusion.
157            let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
158                .join(format!("{}.log", self.id.as_ref().unwrap()));
159            fs_err::remove_file(&log_file).ok();
160            self.log_file = Some(log_file);
161        }
162    }
163
164    pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
165        let program_name = get_program_name(&cmd);
166
167        writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
168
169        // Record a service command for `risedev restart`
170        let node_id = self.id.as_ref();
171        if program_name == "tmux"
172            && let Some(node_id) = node_id
173            && node_id != "tmux-configure"
174        {
175            let cmd: HashMap<String, String> = HashMap::from_iter([(
176                node_id.clone(),
177                format!("{} {}", program_name, get_program_args(&cmd)),
178            )]);
179            let prefix_config = env::var("PREFIX_CONFIG").unwrap();
180            let path = Path::new(&prefix_config).join("risedev_commands.yaml");
181            let content = serde_yaml::to_string(&cmd)?;
182            fs_err::OpenOptions::new()
183                .create(true)
184                .append(true)
185                .open(path)?
186                .write_all(content.as_bytes())?;
187        }
188
189        let output = cmd.output()?;
190
191        let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
192        full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
193
194        write!(self.log, "{}", full_output)?;
195
196        writeln!(
197            self.log,
198            "({} exited with {:?})",
199            program_name,
200            output.status.code()
201        )?;
202
203        writeln!(self.log, "---")?;
204
205        output.status.exit_ok().context(full_output)?;
206
207        Ok(output)
208    }
209
    /// Marks the progress bar of this context as complete.
    ///
    /// The call inside the body resolves to the free function `complete_spin` imported from
    /// `crate::util`, not to this method.
    pub fn complete_spin(&mut self) {
        complete_spin(&self.pb);
    }
213
214    pub fn status_path(&self) -> PathBuf {
215        self.status_file.clone().unwrap()
216    }
217
218    pub fn log_path(&self) -> &Path {
219        self.log_file.as_ref().unwrap().as_path()
220    }
221
222    pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
223        let addr = server
224            .as_ref()
225            .to_socket_addrs()?
226            .next()
227            .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
228        wait(
229            || {
230                TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
231                    format!("failed to establish tcp connection to {}", server.as_ref())
232                })?;
233                Ok(())
234            },
235            &mut self.log,
236            self.status_file.as_ref().unwrap(),
237            self.id.as_ref().unwrap(),
238            Some(Duration::from_secs(30)),
239            true,
240        )?;
241        Ok(())
242    }
243
244    fn wait_http_with_response_cb(
245        &mut self,
246        server: impl AsRef<str>,
247        cb: impl Fn(Response) -> anyhow::Result<()>,
248    ) -> anyhow::Result<()> {
249        let server = server.as_ref();
250        wait(
251            || {
252                let resp = Client::new()
253                    .get(server)
254                    .timeout(Duration::from_secs(1))
255                    .body("")
256                    .send()?
257                    .error_for_status()
258                    .with_context(|| {
259                        format!("failed to establish http connection to {}", server)
260                    })?;
261
262                cb(resp)
263            },
264            &mut self.log,
265            self.status_file.as_ref().unwrap(),
266            self.id.as_ref().unwrap(),
267            Some(Duration::from_secs(30)),
268            true,
269        )
270    }
271
272    pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
273        self.wait_http_with_response_cb(server, |_| Ok(()))
274    }
275
276    pub fn wait_http_with_text_cb(
277        &mut self,
278        server: impl AsRef<str>,
279        cb: impl Fn(&str) -> bool,
280    ) -> anyhow::Result<()> {
281        self.wait_http_with_response_cb(server, |resp| {
282            let data = resp.text()?;
283            if cb(&data) {
284                Ok(())
285            } else {
286                Err(anyhow!(
287                    "http health check callback failed with body: {:?}",
288                    data
289                ))
290            }
291        })
292    }
293
294    pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
295        wait(
296            wait_func,
297            &mut self.log,
298            self.status_file.as_ref().unwrap(),
299            self.id.as_ref().unwrap(),
300            Some(Duration::from_secs(30)),
301            true,
302        )
303    }
304
    /// Wait for a TCP port to close
    ///
    /// Delegates to `wait_tcp_available` for `server`, passing a 30-second duration
    /// (presumably the timeout — confirm in `crate::wait`). Unlike the other wait helpers,
    /// this one does not use the log, status file or service id of the context.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `wait_tcp_available`.
    pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
        wait_tcp_available(server, Some(Duration::from_secs(30)))?;
        Ok(())
    }
310
311    /// Wait for a user-managed service to be available
312    pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
313        let addr = server
314            .as_ref()
315            .to_socket_addrs()?
316            .next()
317            .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
318        wait(
319            || {
320                TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
321                Ok(())
322            },
323            &mut self.log,
324            self.status_file.as_ref().unwrap(),
325            self.id.as_ref().unwrap(),
326            None,
327            false,
328        )?;
329        Ok(())
330    }
331
332    pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
333        let prefix_path = env::var("PREFIX_BIN")?;
334        let mut cmd = new_tmux_command();
335        cmd.arg("new-window")
336            // Set target name
337            .arg("-t")
338            .arg(RISEDEV_NAME)
339            // Switch to background window
340            .arg("-d")
341            // Set session name for this window
342            .arg("-n")
343            .arg(self.id.as_ref().unwrap());
344
345        if let Some(dir) = user_cmd.get_current_dir() {
346            cmd.arg("-c").arg(dir);
347        }
348        for (k, v) in user_cmd.get_envs() {
349            cmd.arg("-e");
350            if let Some(v) = v {
351                cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
352            } else {
353                cmd.arg(k);
354            }
355        }
356        cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
357        cmd.arg(self.log_path());
358        cmd.arg(self.status_path());
359        cmd.arg(user_cmd.get_program());
360        for arg in user_cmd.get_args() {
361            cmd.arg(arg);
362        }
363
364        Ok(cmd)
365    }
366}