risedev/task/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod meta_node_service;
25mod minio_service;
26mod mysql_service;
27mod postgres_service;
28mod prometheus_service;
29mod pubsub_service;
30mod pulsar_service;
31mod redis_service;
32mod schema_registry_service;
33mod sql_server_service;
34mod task_configure_minio;
35mod task_db_ready_check;
36mod task_kafka_ready_check;
37mod task_log_ready_check;
38mod task_pubsub_emu_ready_check;
39mod task_redis_ready_check;
40mod task_tcp_ready_check;
41mod tempo_service;
42mod utils;
43
44use std::collections::HashMap;
45use std::env;
46use std::io::Write;
47use std::net::{TcpStream, ToSocketAddrs};
48use std::path::{Path, PathBuf};
49use std::process::{Command, Output};
50use std::sync::Arc;
51use std::time::Duration;
52
53use anyhow::{Context, Result, anyhow};
54use indicatif::ProgressBar;
55use reqwest::blocking::{Client, Response};
56use tempfile::TempDir;
57pub use utils::*;
58
59pub use self::compactor_service::*;
60pub use self::compute_node_service::*;
61pub use self::configure_tmux_service::*;
62pub use self::dummy_service::DummyService;
63pub use self::ensure_stop_service::*;
64pub use self::frontend_service::*;
65pub use self::grafana_service::*;
66pub use self::kafka_service::*;
67pub use self::meta_node_service::*;
68pub use self::minio_service::*;
69pub use self::mysql_service::*;
70pub use self::postgres_service::*;
71pub use self::prometheus_service::*;
72pub use self::pubsub_service::*;
73pub use self::pulsar_service::*;
74pub use self::redis_service::*;
75pub use self::schema_registry_service::SchemaRegistryService;
76pub use self::sql_server_service::*;
77pub use self::task_configure_minio::*;
78pub use self::task_db_ready_check::*;
79pub use self::task_kafka_ready_check::*;
80pub use self::task_log_ready_check::*;
81pub use self::task_pubsub_emu_ready_check::*;
82pub use self::task_redis_ready_check::*;
83pub use self::task_tcp_ready_check::*;
84pub use self::tempo_service::*;
85use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
86use crate::wait::{wait, wait_tcp_available};
87
88pub trait Task: 'static + Send {
89    /// Execute the task
90    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
91
92    /// Get task id used in progress bar
93    fn id(&self) -> String {
94        "<task>".into()
95    }
96}
97
98/// A context used in task execution
99pub struct ExecuteContext<W>
100where
101    W: std::io::Write,
102{
103    /// Global log file object. (aka. risedev.log)
104    pub log: W,
105
106    /// Progress bar on screen.
107    pub pb: ProgressBar,
108
109    /// The directory for checking status.
110    ///
111    /// `RiseDev` will instruct every task to output their status to a file in temporary folder. By
112    /// checking this file, we can know whether a task has early exited.
113    pub status_dir: Arc<TempDir>,
114
115    /// The current service id running in this context.
116    pub id: Option<String>,
117
118    /// The status file corresponding to the current context.
119    pub status_file: Option<PathBuf>,
120
121    /// The log file corresponding to the current context. (e.g. frontend-4566.log)
122    pub log_file: Option<PathBuf>,
123}
124
125impl<W> ExecuteContext<W>
126where
127    W: std::io::Write,
128{
129    pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
130        Self {
131            log,
132            pb,
133            status_dir,
134            status_file: None,
135            log_file: None,
136            id: None,
137        }
138    }
139
140    pub fn service(&mut self, task: &impl Task) {
141        let id = task.id();
142        if !id.is_empty() {
143            begin_spin(&self.pb);
144            self.pb.set_prefix(id.clone());
145            self.id = Some(id.clone());
146            self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
147
148            // Remove the old log file if exists to avoid confusion.
149            let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
150                .join(format!("{}.log", self.id.as_ref().unwrap()));
151            fs_err::remove_file(&log_file).ok();
152            self.log_file = Some(log_file);
153        }
154    }
155
156    pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
157        let program_name = get_program_name(&cmd);
158
159        writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
160
161        // Record a service command for `risedev restart`
162        let node_id = self.id.as_ref();
163        if program_name == "tmux"
164            && let Some(node_id) = node_id
165            && node_id != "tmux-configure"
166        {
167            let cmd: HashMap<String, String> = HashMap::from_iter([(
168                node_id.clone(),
169                format!("{} {}", program_name, get_program_args(&cmd)),
170            )]);
171            let prefix_config = env::var("PREFIX_CONFIG").unwrap();
172            let path = Path::new(&prefix_config).join("risedev_commands.yaml");
173            let content = serde_yaml::to_string(&cmd)?;
174            fs_err::OpenOptions::new()
175                .create(true)
176                .append(true)
177                .open(path)?
178                .write_all(content.as_bytes())?;
179        }
180
181        let output = cmd.output()?;
182
183        let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
184        full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
185
186        write!(self.log, "{}", full_output)?;
187
188        writeln!(
189            self.log,
190            "({} exited with {:?})",
191            program_name,
192            output.status.code()
193        )?;
194
195        writeln!(self.log, "---")?;
196
197        output.status.exit_ok().context(full_output)?;
198
199        Ok(output)
200    }
201
202    pub fn complete_spin(&mut self) {
203        complete_spin(&self.pb);
204    }
205
206    pub fn status_path(&self) -> PathBuf {
207        self.status_file.clone().unwrap()
208    }
209
210    pub fn log_path(&self) -> &Path {
211        self.log_file.as_ref().unwrap().as_path()
212    }
213
214    pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
215        let addr = server
216            .as_ref()
217            .to_socket_addrs()?
218            .next()
219            .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
220        wait(
221            || {
222                TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
223                    format!("failed to establish tcp connection to {}", server.as_ref())
224                })?;
225                Ok(())
226            },
227            &mut self.log,
228            self.status_file.as_ref().unwrap(),
229            self.id.as_ref().unwrap(),
230            Some(Duration::from_secs(30)),
231            true,
232        )?;
233        Ok(())
234    }
235
236    fn wait_http_with_response_cb(
237        &mut self,
238        server: impl AsRef<str>,
239        cb: impl Fn(Response) -> anyhow::Result<()>,
240    ) -> anyhow::Result<()> {
241        let server = server.as_ref();
242        wait(
243            || {
244                let resp = Client::new()
245                    .get(server)
246                    .timeout(Duration::from_secs(1))
247                    .body("")
248                    .send()?
249                    .error_for_status()
250                    .with_context(|| {
251                        format!("failed to establish http connection to {}", server)
252                    })?;
253
254                cb(resp)
255            },
256            &mut self.log,
257            self.status_file.as_ref().unwrap(),
258            self.id.as_ref().unwrap(),
259            Some(Duration::from_secs(30)),
260            true,
261        )
262    }
263
264    pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
265        self.wait_http_with_response_cb(server, |_| Ok(()))
266    }
267
268    pub fn wait_http_with_text_cb(
269        &mut self,
270        server: impl AsRef<str>,
271        cb: impl Fn(&str) -> bool,
272    ) -> anyhow::Result<()> {
273        self.wait_http_with_response_cb(server, |resp| {
274            let data = resp.text()?;
275            if cb(&data) {
276                Ok(())
277            } else {
278                Err(anyhow!(
279                    "http health check callback failed with body: {:?}",
280                    data
281                ))
282            }
283        })
284    }
285
286    pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
287        wait(
288            wait_func,
289            &mut self.log,
290            self.status_file.as_ref().unwrap(),
291            self.id.as_ref().unwrap(),
292            Some(Duration::from_secs(30)),
293            true,
294        )
295    }
296
297    /// Wait for a TCP port to close
298    pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
299        wait_tcp_available(server, Some(Duration::from_secs(30)))?;
300        Ok(())
301    }
302
303    /// Wait for a user-managed service to be available
304    pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
305        let addr = server
306            .as_ref()
307            .to_socket_addrs()?
308            .next()
309            .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
310        wait(
311            || {
312                TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
313                Ok(())
314            },
315            &mut self.log,
316            self.status_file.as_ref().unwrap(),
317            self.id.as_ref().unwrap(),
318            None,
319            false,
320        )?;
321        Ok(())
322    }
323
324    pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
325        let prefix_path = env::var("PREFIX_BIN")?;
326        let mut cmd = new_tmux_command();
327        cmd.arg("new-window")
328            // Set target name
329            .arg("-t")
330            .arg(RISEDEV_NAME)
331            // Switch to background window
332            .arg("-d")
333            // Set session name for this window
334            .arg("-n")
335            .arg(self.id.as_ref().unwrap());
336
337        if let Some(dir) = user_cmd.get_current_dir() {
338            cmd.arg("-c").arg(dir);
339        }
340        for (k, v) in user_cmd.get_envs() {
341            cmd.arg("-e");
342            if let Some(v) = v {
343                cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
344            } else {
345                cmd.arg(k);
346            }
347        }
348        cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
349        cmd.arg(self.log_path());
350        cmd.arg(self.status_path());
351        cmd.arg(user_cmd.get_program());
352        for arg in user_cmd.get_args() {
353            cmd.arg(arg);
354        }
355
356        Ok(cmd)
357    }
358}