risedev/task/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compactor_service;
16mod compute_node_service;
17mod configure_tmux_service;
18mod docker_service;
19mod dummy_service;
20mod ensure_stop_service;
21mod frontend_service;
22mod grafana_service;
23mod kafka_service;
24mod lakekeeper_service;
25mod meta_node_service;
26mod minio_service;
27mod moat_service;
28mod mysql_service;
29mod postgres_service;
30mod prometheus_service;
31mod pubsub_service;
32mod pulsar_service;
33mod redis_service;
34mod schema_registry_service;
35mod sql_server_service;
36mod task_configure_minio;
37mod task_db_ready_check;
38mod task_kafka_ready_check;
39mod task_log_ready_check;
40mod task_pubsub_emu_ready_check;
41mod task_redis_ready_check;
42mod task_tcp_ready_check;
43mod tempo_service;
44mod utils;
45
46use std::collections::HashMap;
47use std::env;
48use std::io::Write;
49use std::net::{TcpStream, ToSocketAddrs};
50use std::path::{Path, PathBuf};
51use std::process::{Command, Output};
52use std::sync::Arc;
53use std::time::Duration;
54
55use anyhow::{Context, Result, anyhow};
56use indicatif::ProgressBar;
57use reqwest::blocking::{Client, Response};
58use tempfile::TempDir;
59pub use utils::*;
60
61pub use self::compactor_service::*;
62pub use self::compute_node_service::*;
63pub use self::configure_tmux_service::*;
64pub use self::dummy_service::DummyService;
65pub use self::ensure_stop_service::*;
66pub use self::frontend_service::*;
67pub use self::grafana_service::*;
68pub use self::kafka_service::*;
69pub use self::lakekeeper_service::*;
70pub use self::meta_node_service::*;
71pub use self::minio_service::*;
72pub use self::moat_service::*;
73pub use self::mysql_service::*;
74pub use self::postgres_service::*;
75pub use self::prometheus_service::*;
76pub use self::pubsub_service::*;
77pub use self::pulsar_service::*;
78pub use self::redis_service::*;
79pub use self::schema_registry_service::SchemaRegistryService;
80pub use self::sql_server_service::*;
81pub use self::task_configure_minio::*;
82pub use self::task_db_ready_check::*;
83pub use self::task_kafka_ready_check::*;
84pub use self::task_log_ready_check::*;
85pub use self::task_pubsub_emu_ready_check::*;
86pub use self::task_redis_ready_check::*;
87pub use self::task_tcp_ready_check::*;
88pub use self::tempo_service::*;
89use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name};
90use crate::wait::{wait, wait_tcp_available};
91
92pub trait Task: 'static + Send {
93    /// Execute the task
94    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()>;
95
96    /// Get task id used in progress bar
97    fn id(&self) -> String {
98        "<task>".into()
99    }
100}
101
102/// A context used in task execution
103pub struct ExecuteContext<W>
104where
105    W: std::io::Write,
106{
107    /// Global log file object. (aka. risedev.log)
108    pub log: W,
109
110    /// Progress bar on screen.
111    pub pb: ProgressBar,
112
113    /// The directory for checking status.
114    ///
115    /// `RiseDev` will instruct every task to output their status to a file in temporary folder. By
116    /// checking this file, we can know whether a task has early exited.
117    pub status_dir: Arc<TempDir>,
118
119    /// The current service id running in this context.
120    pub id: Option<String>,
121
122    /// The status file corresponding to the current context.
123    pub status_file: Option<PathBuf>,
124
125    /// The log file corresponding to the current context. (e.g. frontend-4566.log)
126    pub log_file: Option<PathBuf>,
127}
128
129impl<W> ExecuteContext<W>
130where
131    W: std::io::Write,
132{
133    pub fn new(log: W, pb: ProgressBar, status_dir: Arc<TempDir>) -> Self {
134        Self {
135            log,
136            pb,
137            status_dir,
138            status_file: None,
139            log_file: None,
140            id: None,
141        }
142    }
143
144    pub fn service(&mut self, task: &impl Task) {
145        let id = task.id();
146        if !id.is_empty() {
147            begin_spin(&self.pb);
148            self.pb.set_prefix(id.clone());
149            self.id = Some(id.clone());
150            self.status_file = Some(self.status_dir.path().join(format!("{}.status", id)));
151
152            // Remove the old log file if exists to avoid confusion.
153            let log_file = Path::new(&env::var("PREFIX_LOG").unwrap())
154                .join(format!("{}.log", self.id.as_ref().unwrap()));
155            fs_err::remove_file(&log_file).ok();
156            self.log_file = Some(log_file);
157        }
158    }
159
160    pub fn run_command(&mut self, mut cmd: Command) -> Result<Output> {
161        let program_name = get_program_name(&cmd);
162
163        writeln!(self.log, "> {} {}", program_name, get_program_args(&cmd))?;
164
165        // Record a service command for `risedev restart`
166        let node_id = self.id.as_ref();
167        if program_name == "tmux"
168            && let Some(node_id) = node_id
169            && node_id != "tmux-configure"
170        {
171            let cmd: HashMap<String, String> = HashMap::from_iter([(
172                node_id.clone(),
173                format!("{} {}", program_name, get_program_args(&cmd)),
174            )]);
175            let prefix_config = env::var("PREFIX_CONFIG").unwrap();
176            let path = Path::new(&prefix_config).join("risedev_commands.yaml");
177            let content = serde_yaml::to_string(&cmd)?;
178            fs_err::OpenOptions::new()
179                .create(true)
180                .append(true)
181                .open(path)?
182                .write_all(content.as_bytes())?;
183        }
184
185        let output = cmd.output()?;
186
187        let mut full_output = String::from_utf8_lossy(&output.stdout).to_string();
188        full_output.extend(String::from_utf8_lossy(&output.stderr).chars());
189
190        write!(self.log, "{}", full_output)?;
191
192        writeln!(
193            self.log,
194            "({} exited with {:?})",
195            program_name,
196            output.status.code()
197        )?;
198
199        writeln!(self.log, "---")?;
200
201        output.status.exit_ok().context(full_output)?;
202
203        Ok(output)
204    }
205
    /// Forwards this context's progress bar to the `complete_spin` helper.
    ///
    /// NOTE(review): presumably this finishes the spinner started in `service` — confirm
    /// against the helper's definition.
    pub fn complete_spin(&mut self) {
        complete_spin(&self.pb);
    }
209
210    pub fn status_path(&self) -> PathBuf {
211        self.status_file.clone().unwrap()
212    }
213
214    pub fn log_path(&self) -> &Path {
215        self.log_file.as_ref().unwrap().as_path()
216    }
217
218    pub fn wait_tcp(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
219        let addr = server
220            .as_ref()
221            .to_socket_addrs()?
222            .next()
223            .with_context(|| format!("failed to resolve {}", server.as_ref()))?;
224        wait(
225            || {
226                TcpStream::connect_timeout(&addr, Duration::from_secs(1)).with_context(|| {
227                    format!("failed to establish tcp connection to {}", server.as_ref())
228                })?;
229                Ok(())
230            },
231            &mut self.log,
232            self.status_file.as_ref().unwrap(),
233            self.id.as_ref().unwrap(),
234            Some(Duration::from_secs(30)),
235            true,
236        )?;
237        Ok(())
238    }
239
240    fn wait_http_with_response_cb(
241        &mut self,
242        server: impl AsRef<str>,
243        cb: impl Fn(Response) -> anyhow::Result<()>,
244    ) -> anyhow::Result<()> {
245        let server = server.as_ref();
246        wait(
247            || {
248                let resp = Client::new()
249                    .get(server)
250                    .timeout(Duration::from_secs(1))
251                    .body("")
252                    .send()?
253                    .error_for_status()
254                    .with_context(|| {
255                        format!("failed to establish http connection to {}", server)
256                    })?;
257
258                cb(resp)
259            },
260            &mut self.log,
261            self.status_file.as_ref().unwrap(),
262            self.id.as_ref().unwrap(),
263            Some(Duration::from_secs(30)),
264            true,
265        )
266    }
267
    /// Waits until an HTTP GET to `server` succeeds, without inspecting the response.
    ///
    /// Delegates to `wait_http_with_response_cb` with a callback that accepts every response.
    pub fn wait_http(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
        self.wait_http_with_response_cb(server, |_| Ok(()))
    }
271
272    pub fn wait_http_with_text_cb(
273        &mut self,
274        server: impl AsRef<str>,
275        cb: impl Fn(&str) -> bool,
276    ) -> anyhow::Result<()> {
277        self.wait_http_with_response_cb(server, |resp| {
278            let data = resp.text()?;
279            if cb(&data) {
280                Ok(())
281            } else {
282                Err(anyhow!(
283                    "http health check callback failed with body: {:?}",
284                    data
285                ))
286            }
287        })
288    }
289
290    pub fn wait(&mut self, wait_func: impl FnMut() -> Result<()>) -> anyhow::Result<()> {
291        wait(
292            wait_func,
293            &mut self.log,
294            self.status_file.as_ref().unwrap(),
295            self.id.as_ref().unwrap(),
296            Some(Duration::from_secs(30)),
297            true,
298        )
299    }
300
    /// Wait for a TCP port to close
    ///
    /// Delegates to `wait_tcp_available` with a timeout of 30 seconds. Unlike the other
    /// `wait_*` methods, this one does not pass the context's log, status file or id along.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `wait_tcp_available`.
    pub fn wait_tcp_close(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
        wait_tcp_available(server, Some(Duration::from_secs(30)))?;
        Ok(())
    }
306
307    /// Wait for a user-managed service to be available
308    pub fn wait_tcp_user(&mut self, server: impl AsRef<str>) -> anyhow::Result<()> {
309        let addr = server
310            .as_ref()
311            .to_socket_addrs()?
312            .next()
313            .unwrap_or_else(|| panic!("failed to resolve {}", server.as_ref()));
314        wait(
315            || {
316                TcpStream::connect_timeout(&addr, Duration::from_secs(1))?;
317                Ok(())
318            },
319            &mut self.log,
320            self.status_file.as_ref().unwrap(),
321            self.id.as_ref().unwrap(),
322            None,
323            false,
324        )?;
325        Ok(())
326    }
327
328    pub fn tmux_run(&self, user_cmd: Command) -> anyhow::Result<Command> {
329        let prefix_path = env::var("PREFIX_BIN")?;
330        let mut cmd = new_tmux_command();
331        cmd.arg("new-window")
332            // Set target name
333            .arg("-t")
334            .arg(RISEDEV_NAME)
335            // Switch to background window
336            .arg("-d")
337            // Set session name for this window
338            .arg("-n")
339            .arg(self.id.as_ref().unwrap());
340
341        if let Some(dir) = user_cmd.get_current_dir() {
342            cmd.arg("-c").arg(dir);
343        }
344        for (k, v) in user_cmd.get_envs() {
345            cmd.arg("-e");
346            if let Some(v) = v {
347                cmd.arg(format!("{}={}", k.to_string_lossy(), v.to_string_lossy()));
348            } else {
349                cmd.arg(k);
350            }
351        }
352        cmd.arg(Path::new(&prefix_path).join("run_command.sh"));
353        cmd.arg(self.log_path());
354        cmd.arg(self.status_path());
355        cmd.arg(user_cmd.get_program());
356        for arg in user_cmd.get_args() {
357            cmd.arg(arg);
358        }
359
360        Ok(cmd)
361    }
362}