risedev_dev/
risedev-dev.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(trait_alias)]
16
17use std::collections::HashMap;
18use std::env;
19use std::fmt::Write;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::thread::JoinHandle;
23use std::time::{Duration, Instant};
24
25use anyhow::{Context, Result, anyhow};
26use console::style;
27use fs_err::OpenOptions;
28use indicatif::{MultiProgress, ProgressBar};
29use risedev::util::{begin_spin, complete_spin, fail_spin};
30use risedev::{
31    CompactorService, ComputeNodeService, ConfigExpander, ConfigureTmuxTask, DummyService,
32    EnsureStopService, ExecuteContext, FrontendService, GrafanaService, KafkaService,
33    MetaNodeService, MinioService, MySqlService, PostgresService, PrometheusService, PubsubService,
34    RISEDEV_NAME, RedisService, SchemaRegistryService, ServiceConfig, SqlServerService,
35    SqliteConfig, Task, TaskGroup, TempoService, generate_risedev_env, preflight_check,
36};
37use sqlx::mysql::MySqlConnectOptions;
38use sqlx::postgres::PgConnectOptions;
39use tempfile::tempdir;
40use thiserror_ext::AsReport;
41use tracing::level_filters::LevelFilter;
42use tracing_subscriber::EnvFilter;
43use yaml_rust::YamlEmitter;
44
45pub struct ProgressManager {
46    pa: MultiProgress,
47}
48
49impl ProgressManager {
50    pub fn new() -> Self {
51        let pa = MultiProgress::default();
52        pa.set_move_cursor(true);
53        Self { pa }
54    }
55
56    /// Create a new progress bar from task
57    pub fn new_progress(&mut self) -> ProgressBar {
58        let pb = risedev::util::new_spinner().with_finish(indicatif::ProgressFinish::AndLeave);
59        pb.enable_steady_tick(Duration::from_millis(100));
60        self.pa.add(pb)
61    }
62}
63
64fn task_main(
65    manager: &mut ProgressManager,
66    services: &Vec<ServiceConfig>,
67    env: Vec<String>,
68) -> Result<(Vec<(String, Duration)>, String)> {
69    let log_path = env::var("PREFIX_LOG")?;
70
71    let mut logger = OpenOptions::new()
72        .write(true)
73        .create(true)
74        .truncate(true)
75        .open(Path::new(&log_path).join("risedev.log"))?;
76
77    let status_dir = Arc::new(tempdir()?);
78
79    let mut log_buffer = String::new();
80
81    // Start Tmux and kill previous services
82    {
83        let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
84        let mut service = ConfigureTmuxTask::new(env)?;
85        service.execute(&mut ctx)?;
86
87        writeln!(
88            log_buffer,
89            "* Run {} to attach to the tmux console.",
90            style(format!("tmux -L {RISEDEV_NAME} a -t {RISEDEV_NAME}"))
91                .blue()
92                .bold()
93        )?;
94    }
95
96    // Firstly, ensure that all ports needed is not occupied by previous runs.
97    let mut ports = vec![];
98
99    for service in services {
100        if let Some(port) = service.port() {
101            ports.push((port, service.id().to_owned(), service.user_managed()));
102        }
103    }
104
105    {
106        let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
107        let mut service = EnsureStopService::new(ports)?;
108        service.execute(&mut ctx)?;
109    }
110
111    // Then, start services one by one
112
113    let mut tasks = TaskScheduler::new();
114
115    for service in services {
116        if let ServiceConfig::Frontend(c) = service {
117            writeln!(
118                log_buffer,
119                "* Run {} to start Postgres interactive shell.",
120                style(format_args!(
121                    "psql -h localhost -p {} -d dev -U root",
122                    c.port
123                ))
124                .blue()
125                .bold()
126            )?;
127        }
128        let service_ = service.clone();
129        let progress_bar = manager.new_progress();
130        progress_bar.set_prefix(service.id().to_owned());
131        progress_bar.set_message("waiting for previous service to start...".to_owned());
132        let status_dir = status_dir.clone();
133        let closure = move || {
134            let mut log = Vec::new();
135            let start_time = Instant::now();
136            let mut ctx = ExecuteContext::new(&mut log, progress_bar, status_dir);
137            let service = service_;
138            let id = service.id().to_owned();
139            match service {
140                ServiceConfig::Minio(c) => {
141                    let mut service = MinioService::new(c.clone())?;
142                    service.execute(&mut ctx)?;
143
144                    let mut task = risedev::ConfigureMinioTask::new(c.clone())?;
145                    task.execute(&mut ctx)?;
146                }
147                ServiceConfig::Sqlite(c) => {
148                    struct SqliteService(SqliteConfig);
149                    impl Task for SqliteService {
150                        fn execute(
151                            &mut self,
152                            _ctx: &mut ExecuteContext<impl std::io::Write>,
153                        ) -> anyhow::Result<()> {
154                            Ok(())
155                        }
156
157                        fn id(&self) -> String {
158                            self.0.id.clone()
159                        }
160                    }
161
162                    let prefix_data = env::var("PREFIX_DATA")?;
163                    let file_dir = PathBuf::from(&prefix_data).join(&c.id);
164                    std::fs::create_dir_all(&file_dir)?;
165                    let file_path = file_dir.join(&c.file);
166
167                    ctx.service(&SqliteService(c.clone()));
168                    ctx.complete_spin();
169                    ctx.pb
170                        .set_message(format!("using local sqlite: {:?}", file_path));
171                }
172                ServiceConfig::Prometheus(c) => {
173                    let mut service = PrometheusService::new(c.clone())?;
174                    service.execute(&mut ctx)?;
175                    let mut task =
176                        risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?;
177                    task.execute(&mut ctx)?;
178                    ctx.pb
179                        .set_message(format!("api http://{}:{}/", c.address, c.port));
180                }
181                ServiceConfig::ComputeNode(c) => {
182                    let mut service = ComputeNodeService::new(c.clone())?;
183                    service.execute(&mut ctx)?;
184
185                    let mut task =
186                        risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
187                    task.execute(&mut ctx)?;
188                    ctx.pb
189                        .set_message(format!("api grpc://{}:{}/", c.address, c.port));
190                }
191                ServiceConfig::MetaNode(c) => {
192                    let mut service = MetaNodeService::new(c.clone())?;
193                    service.execute(&mut ctx)?;
194                    let mut task =
195                        risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
196                    task.execute(&mut ctx)?;
197                    ctx.pb.set_message(format!(
198                        "api grpc://{}:{}/, dashboard http://{}:{}/",
199                        c.address, c.port, c.address, c.dashboard_port
200                    ));
201                }
202                ServiceConfig::Frontend(c) => {
203                    let mut service = FrontendService::new(c.clone())?;
204                    service.execute(&mut ctx)?;
205                    let mut task =
206                        risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
207                    task.execute(&mut ctx)?;
208                    ctx.pb
209                        .set_message(format!("api postgres://{}:{}/", c.address, c.port));
210                }
211                ServiceConfig::Compactor(c) => {
212                    let mut service = CompactorService::new(c.clone())?;
213                    service.execute(&mut ctx)?;
214                    let mut task =
215                        risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
216                    task.execute(&mut ctx)?;
217                    ctx.pb
218                        .set_message(format!("compactor {}:{}", c.address, c.port));
219                }
220                ServiceConfig::Grafana(c) => {
221                    let mut service = GrafanaService::new(c.clone())?;
222                    service.execute(&mut ctx)?;
223                    let mut task =
224                        risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?;
225                    task.execute(&mut ctx)?;
226                    ctx.pb
227                        .set_message(format!("dashboard http://{}:{}/", c.address, c.port));
228                }
229                ServiceConfig::Tempo(c) => {
230                    let mut service = TempoService::new(c.clone())?;
231                    service.execute(&mut ctx)?;
232                    let mut task =
233                        risedev::TcpReadyCheckTask::new(c.listen_address.clone(), c.port, false)?;
234                    task.execute(&mut ctx)?;
235                    ctx.pb
236                        .set_message(format!("api http://{}:{}/", c.listen_address, c.port));
237                }
238                ServiceConfig::AwsS3(c) => {
239                    DummyService::new(&c.id).execute(&mut ctx)?;
240                    ctx.pb
241                        .set_message(format!("using AWS s3 bucket {}", c.bucket));
242                }
243                ServiceConfig::Opendal(c) => {
244                    DummyService::new(&c.id).execute(&mut ctx)?;
245                    ctx.pb
246                        .set_message(format!("using Opendal, namenode =  {}", c.namenode));
247                }
248                ServiceConfig::Kafka(c) => {
249                    let mut service = KafkaService::new(c.clone());
250                    service.execute(&mut ctx)?;
251                    let mut task = risedev::KafkaReadyCheckTask::new(c.clone())?;
252                    task.execute(&mut ctx)?;
253                    ctx.pb
254                        .set_message(format!("kafka {}:{}", c.address, c.port));
255                }
256                ServiceConfig::SchemaRegistry(c) => {
257                    let mut service = SchemaRegistryService::new(c.clone());
258                    service.execute(&mut ctx)?;
259                    if c.user_managed {
260                        let mut task = risedev::TcpReadyCheckTask::new(
261                            c.address.clone(),
262                            c.port,
263                            c.user_managed,
264                        )?;
265                        task.execute(&mut ctx)?;
266                    } else {
267                        let mut task = risedev::LogReadyCheckTask::new(
268                            "Server started, listening for requests",
269                        )?;
270                        task.execute(&mut ctx)?;
271                    }
272                    ctx.pb
273                        .set_message(format!("schema registry http://{}:{}", c.address, c.port));
274                }
275
276                ServiceConfig::Pubsub(c) => {
277                    let mut service = PubsubService::new(c.clone())?;
278                    service.execute(&mut ctx)?;
279                    let mut task = risedev::PubsubReadyTaskCheck::new(c.clone())?;
280                    task.execute(&mut ctx)?;
281                    ctx.pb
282                        .set_message(format!("pubsub {}:{}", c.address, c.port));
283                }
284                ServiceConfig::RedPanda(_) => {
285                    return Err(anyhow!("redpanda is only supported in RiseDev compose."));
286                }
287                ServiceConfig::Redis(c) => {
288                    let mut service = RedisService::new(c.clone())?;
289                    service.execute(&mut ctx)?;
290                    let mut task = risedev::RedisReadyCheckTask::new(c.clone())?;
291                    task.execute(&mut ctx)?;
292                    ctx.pb
293                        .set_message(format!("redis {}:{}", c.address, c.port));
294                }
295                ServiceConfig::MySql(c) => {
296                    MySqlService::new(c.clone()).execute(&mut ctx)?;
297                    let mut task = risedev::DbReadyCheckTask::new(
298                        MySqlConnectOptions::new()
299                            .host(&c.address)
300                            .port(c.port)
301                            .username(&c.user)
302                            .password(&c.password),
303                    );
304                    task.execute(&mut ctx)?;
305                    ctx.pb
306                        .set_message(format!("mysql {}:{}", c.address, c.port));
307                }
308                ServiceConfig::Postgres(c) => {
309                    PostgresService::new(c.clone()).execute(&mut ctx)?;
310                    let mut task = risedev::DbReadyCheckTask::new(
311                        PgConnectOptions::new()
312                            .host(&c.address)
313                            .port(c.port)
314                            .database("template1")
315                            .username(&c.user)
316                            .password(&c.password),
317                    );
318                    task.execute(&mut ctx)?;
319                    ctx.pb
320                        .set_message(format!("postgres {}:{}", c.address, c.port));
321                }
322                ServiceConfig::SqlServer(c) => {
323                    // only `c.password` will be used in `SqlServerService` as the password for user `sa`.
324                    SqlServerService::new(c.clone()).execute(&mut ctx)?;
325                    if c.user_managed {
326                        let mut task = risedev::TcpReadyCheckTask::new(
327                            c.address.clone(),
328                            c.port,
329                            c.user_managed,
330                        )?;
331                        task.execute(&mut ctx)?;
332                    } else {
333                        let mut task = risedev::LogReadyCheckTask::new(
334                            "SQL Server is now ready for client connections.",
335                        )?;
336                        task.execute(&mut ctx)?;
337                    }
338                    ctx.pb
339                        .set_message(format!("sqlserver {}:{}", c.address, c.port));
340                }
341            }
342
343            let duration = Instant::now() - start_time;
344            Ok(TaskResult {
345                id,
346                time: duration,
347                log: String::from_utf8(log)?,
348            })
349        };
350        tasks.add(service, closure);
351    }
352
353    let stat = tasks.run(&mut logger)?;
354
355    Ok((stat, log_buffer))
356}
357
/// What a start-up task hands back once it has finished successfully.
#[derive(Debug)]
struct TaskResult {
    /// Id of the service the task was created for.
    id: String,
    /// Time measured from the start of the task closure to its end.
    time: Duration,
    /// Contents of the task's private log buffer, decoded as UTF-8.
    log: String,
}
/// A one-shot start-up task: callable once, sendable to another thread, yielding a `TaskResult`.
trait TaskFn = FnOnce() -> anyhow::Result<TaskResult> + Send + 'static;
365struct TaskScheduler {
366    /// In each group, the tasks are executed in sequence.
367    task_groups: HashMap<TaskGroup, Vec<Box<dyn TaskFn>>>,
368}
369
370impl TaskScheduler {
371    fn new() -> Self {
372        Self {
373            task_groups: HashMap::new(),
374        }
375    }
376
377    fn add(&mut self, config: &ServiceConfig, task: impl TaskFn) {
378        self.task_groups
379            .entry(config.task_group())
380            .or_default()
381            .push(Box::new(task));
382    }
383
384    fn run(self, logger: &mut impl std::io::Write) -> anyhow::Result<Vec<(String, Duration)>> {
385        let mut handles: Vec<JoinHandle<anyhow::Result<Vec<TaskResult>>>> = vec![];
386        let mut stats = vec![];
387
388        let task_groups = self.task_groups;
389        for (_, tasks) in task_groups {
390            handles.push(std::thread::spawn(move || {
391                let mut res = vec![];
392                for task in tasks {
393                    let res_ = task()?;
394                    res.push(res_);
395                }
396                Ok(res)
397            }));
398        }
399        for handle in handles {
400            let join_res = handle.join();
401            let Ok(res) = join_res else {
402                let panic = join_res.unwrap_err();
403                anyhow::bail!(
404                    "failed to join thread, likely panicked: {}",
405                    panic_message::panic_message(&panic)
406                );
407            };
408            for TaskResult { id, time, log } in res? {
409                stats.push((id, time));
410                write!(logger, "{}", log)?;
411            }
412        }
413        Ok(stats)
414    }
415}
416
417fn main() -> Result<()> {
418    // Intentionally disable backtrace to provide more compact error message for `risedev dev`.
419    // Backtraces for RisingWave components are enabled in `Task::execute`.
420    // safety: single-threaded code.
421    unsafe { std::env::set_var("RUST_BACKTRACE", "0") };
422
423    // Init logger from a customized env var.
424    tracing_subscriber::fmt()
425        .with_env_filter(
426            EnvFilter::builder()
427                .with_default_directive(LevelFilter::INFO.into())
428                .with_env_var("RISEDEV_RUST_LOG")
429                .from_env_lossy()
430                // This log may pollute the progress bar.
431                .add_directive("librdkafka=off".parse().unwrap()),
432        )
433        .init();
434
435    preflight_check()?;
436
437    let profile = std::env::args()
438        .nth(1)
439        .unwrap_or_else(|| "default".to_owned());
440
441    let (config_path, env, risedev_config) = ConfigExpander::expand(".", &profile)?;
442
443    if let Some(config_path) = &config_path {
444        let target = Path::new(&env::var("PREFIX_CONFIG")?).join("risingwave.toml");
445        fs_err::copy(config_path, target).context("config file not found")?;
446    }
447
448    {
449        let mut out_str = String::new();
450        let mut emitter = YamlEmitter::new(&mut out_str);
451        emitter.dump(&risedev_config)?;
452        fs_err::write(
453            Path::new(&env::var("PREFIX_CONFIG")?).join("risedev-expanded.yml"),
454            &out_str,
455        )?;
456    }
457    let services = ConfigExpander::deserialize(&risedev_config)?;
458
459    let mut manager = ProgressManager::new();
460    // Always create a progress before calling `task_main`. Otherwise the progress bar won't be
461    // shown.
462    let p = manager.new_progress();
463    begin_spin(&p);
464    p.set_prefix("dev cluster");
465    p.set_message(format!(
466        "starting {} services for {}...",
467        services.len(),
468        profile
469    ));
470    let task_result = task_main(&mut manager, &services, env);
471
472    match task_result {
473        Ok(_) => {
474            p.set_message(format!(
475                "done bootstrapping with config {}",
476                style(profile).bold()
477            ));
478            complete_spin(&p);
479        }
480        Err(_) => {
481            p.set_message(format!(
482                "failed to bootstrap with config {}",
483                style(profile).bold()
484            ));
485            fail_spin(&p);
486        }
487    }
488    p.finish();
489
490    use risedev::util::stylized_risedev_subcmd as r;
491
492    match task_result {
493        Ok((stat, log_buffer)) => {
494            println!("---- summary of startup time ----");
495            for (task_name, duration) in stat {
496                println!("{}: {:.2}s", task_name, duration.as_secs_f64());
497            }
498            println!("-------------------------------");
499            println!();
500
501            fs_err::write(
502                Path::new(&env::var("PREFIX_CONFIG")?).join("risedev-env"),
503                generate_risedev_env(&services),
504            )?;
505
506            println!("All services started successfully.");
507
508            print!("{}", log_buffer);
509
510            println!("* You may find logs using {} command", r("l"));
511
512            println!("* Run {} to kill cluster.", r("k"));
513
514            println!("* Run {} to run `risedev` anywhere!", r("install"));
515
516            Ok(())
517        }
518        Err(err) => {
519            println!(
520                "{} - Failed to start: {:#}", // pretty with `Caused by`
521                style("ERROR").red().bold(),
522                err.as_report(),
523            );
524            println!();
525            println!(
526                "* Use `{}` to enable new components, if they are missing.",
527                r("configure")
528            );
529            println!(
530                "* Use `{}` to view logs, or visit `{}`",
531                r("l"),
532                env::var("PREFIX_LOG")?
533            );
534            println!("* Run `{}` to clean up cluster.", r("k"));
535            println!(
536                "* Run `{}` to clean data, which might potentially fix the issue.",
537                r("clean-data")
538            );
539            println!("---");
540            println!();
541
542            // As we have already printed the error above, we don't need to print that error again.
543            // However, to return with a proper exit code, still return an error here.
544            Err(anyhow!(
545                "Failed to start all services. See details and instructions above."
546            ))
547        }
548    }
549}