1#![feature(trait_alias)]
16
17use std::collections::HashMap;
18use std::env;
19use std::fmt::Write;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::thread::JoinHandle;
23use std::time::{Duration, Instant};
24
25use anyhow::{Context, Result, anyhow};
26use console::style;
27use fs_err::OpenOptions;
28use indicatif::{MultiProgress, ProgressBar};
29use risedev::util::{begin_spin, complete_spin, fail_spin};
30use risedev::{
31 CompactorService, ComputeNodeService, ConfigExpander, ConfigureTmuxTask, DummyService,
32 EnsureStopService, ExecuteContext, FrontendService, GrafanaService, KafkaService,
33 MetaNodeService, MinioService, MySqlService, PostgresService, PrometheusService, PubsubService,
34 RISEDEV_NAME, RedisService, SchemaRegistryService, ServiceConfig, SqlServerService,
35 SqliteConfig, Task, TaskGroup, TempoService, generate_risedev_env, preflight_check,
36};
37use sqlx::mysql::MySqlConnectOptions;
38use sqlx::postgres::PgConnectOptions;
39use tempfile::tempdir;
40use thiserror_ext::AsReport;
41use tracing::level_filters::LevelFilter;
42use tracing_subscriber::EnvFilter;
43use yaml_rust::YamlEmitter;
44
/// Hands out the progress bars shown while services are starting.
///
/// Every bar created through [`ProgressManager::new_progress`] is attached to
/// the single [`MultiProgress`] held here.
pub struct ProgressManager {
    /// The multi-bar container that every spinner is added to.
    pa: MultiProgress,
}
48
49impl ProgressManager {
50 pub fn new() -> Self {
51 let pa = MultiProgress::default();
52 pa.set_move_cursor(true);
53 Self { pa }
54 }
55
56 pub fn new_progress(&mut self) -> ProgressBar {
58 let pb = risedev::util::new_spinner().with_finish(indicatif::ProgressFinish::AndLeave);
59 pb.enable_steady_tick(Duration::from_millis(100));
60 self.pa.add(pb)
61 }
62}
63
64fn task_main(
65 manager: &mut ProgressManager,
66 services: &Vec<ServiceConfig>,
67 env: Vec<String>,
68) -> Result<(Vec<(String, Duration)>, String)> {
69 let log_path = env::var("PREFIX_LOG")?;
70
71 let mut logger = OpenOptions::new()
72 .write(true)
73 .create(true)
74 .truncate(true)
75 .open(Path::new(&log_path).join("risedev.log"))?;
76
77 let status_dir = Arc::new(tempdir()?);
78
79 let mut log_buffer = String::new();
80
81 {
83 let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
84 let mut service = ConfigureTmuxTask::new(env)?;
85 service.execute(&mut ctx)?;
86
87 writeln!(
88 log_buffer,
89 "* Run {} to attach to the tmux console.",
90 style(format!("tmux -L {RISEDEV_NAME} a -t {RISEDEV_NAME}"))
91 .blue()
92 .bold()
93 )?;
94 }
95
96 let mut ports = vec![];
98
99 for service in services {
100 if let Some(port) = service.port() {
101 ports.push((port, service.id().to_owned(), service.user_managed()));
102 }
103 }
104
105 {
106 let mut ctx = ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
107 let mut service = EnsureStopService::new(ports)?;
108 service.execute(&mut ctx)?;
109 }
110
111 let mut tasks = TaskScheduler::new();
114
115 for service in services {
116 if let ServiceConfig::Frontend(c) = service {
117 writeln!(
118 log_buffer,
119 "* Run {} to start Postgres interactive shell.",
120 style(format_args!(
121 "psql -h localhost -p {} -d dev -U root",
122 c.port
123 ))
124 .blue()
125 .bold()
126 )?;
127 }
128 let service_ = service.clone();
129 let progress_bar = manager.new_progress();
130 progress_bar.set_prefix(service.id().to_owned());
131 progress_bar.set_message("waiting for previous service to start...".to_owned());
132 let status_dir = status_dir.clone();
133 let closure = move || {
134 let mut log = Vec::new();
135 let start_time = Instant::now();
136 let mut ctx = ExecuteContext::new(&mut log, progress_bar, status_dir);
137 let service = service_;
138 let id = service.id().to_owned();
139 match service {
140 ServiceConfig::Minio(c) => {
141 let mut service = MinioService::new(c.clone())?;
142 service.execute(&mut ctx)?;
143
144 let mut task = risedev::ConfigureMinioTask::new(c.clone())?;
145 task.execute(&mut ctx)?;
146 }
147 ServiceConfig::Sqlite(c) => {
148 struct SqliteService(SqliteConfig);
149 impl Task for SqliteService {
150 fn execute(
151 &mut self,
152 _ctx: &mut ExecuteContext<impl std::io::Write>,
153 ) -> anyhow::Result<()> {
154 Ok(())
155 }
156
157 fn id(&self) -> String {
158 self.0.id.clone()
159 }
160 }
161
162 let prefix_data = env::var("PREFIX_DATA")?;
163 let file_dir = PathBuf::from(&prefix_data).join(&c.id);
164 std::fs::create_dir_all(&file_dir)?;
165 let file_path = file_dir.join(&c.file);
166
167 ctx.service(&SqliteService(c.clone()));
168 ctx.complete_spin();
169 ctx.pb
170 .set_message(format!("using local sqlite: {:?}", file_path));
171 }
172 ServiceConfig::Prometheus(c) => {
173 let mut service = PrometheusService::new(c.clone())?;
174 service.execute(&mut ctx)?;
175 let mut task =
176 risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?;
177 task.execute(&mut ctx)?;
178 ctx.pb
179 .set_message(format!("api http://{}:{}/", c.address, c.port));
180 }
181 ServiceConfig::ComputeNode(c) => {
182 let mut service = ComputeNodeService::new(c.clone())?;
183 service.execute(&mut ctx)?;
184
185 let mut task =
186 risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
187 task.execute(&mut ctx)?;
188 ctx.pb
189 .set_message(format!("api grpc://{}:{}/", c.address, c.port));
190 }
191 ServiceConfig::MetaNode(c) => {
192 let mut service = MetaNodeService::new(c.clone())?;
193 service.execute(&mut ctx)?;
194 let mut task =
195 risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
196 task.execute(&mut ctx)?;
197 ctx.pb.set_message(format!(
198 "api grpc://{}:{}/, dashboard http://{}:{}/",
199 c.address, c.port, c.address, c.dashboard_port
200 ));
201 }
202 ServiceConfig::Frontend(c) => {
203 let mut service = FrontendService::new(c.clone())?;
204 service.execute(&mut ctx)?;
205 let mut task =
206 risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
207 task.execute(&mut ctx)?;
208 ctx.pb
209 .set_message(format!("api postgres://{}:{}/", c.address, c.port));
210 }
211 ServiceConfig::Compactor(c) => {
212 let mut service = CompactorService::new(c.clone())?;
213 service.execute(&mut ctx)?;
214 let mut task =
215 risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
216 task.execute(&mut ctx)?;
217 ctx.pb
218 .set_message(format!("compactor {}:{}", c.address, c.port));
219 }
220 ServiceConfig::Grafana(c) => {
221 let mut service = GrafanaService::new(c.clone())?;
222 service.execute(&mut ctx)?;
223 let mut task =
224 risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?;
225 task.execute(&mut ctx)?;
226 ctx.pb
227 .set_message(format!("dashboard http://{}:{}/", c.address, c.port));
228 }
229 ServiceConfig::Tempo(c) => {
230 let mut service = TempoService::new(c.clone())?;
231 service.execute(&mut ctx)?;
232 let mut task =
233 risedev::TcpReadyCheckTask::new(c.listen_address.clone(), c.port, false)?;
234 task.execute(&mut ctx)?;
235 ctx.pb
236 .set_message(format!("api http://{}:{}/", c.listen_address, c.port));
237 }
238 ServiceConfig::AwsS3(c) => {
239 DummyService::new(&c.id).execute(&mut ctx)?;
240 ctx.pb
241 .set_message(format!("using AWS s3 bucket {}", c.bucket));
242 }
243 ServiceConfig::Opendal(c) => {
244 DummyService::new(&c.id).execute(&mut ctx)?;
245 ctx.pb
246 .set_message(format!("using Opendal, namenode = {}", c.namenode));
247 }
248 ServiceConfig::Kafka(c) => {
249 let mut service = KafkaService::new(c.clone());
250 service.execute(&mut ctx)?;
251 let mut task = risedev::KafkaReadyCheckTask::new(c.clone())?;
252 task.execute(&mut ctx)?;
253 ctx.pb
254 .set_message(format!("kafka {}:{}", c.address, c.port));
255 }
256 ServiceConfig::SchemaRegistry(c) => {
257 let mut service = SchemaRegistryService::new(c.clone());
258 service.execute(&mut ctx)?;
259 if c.user_managed {
260 let mut task = risedev::TcpReadyCheckTask::new(
261 c.address.clone(),
262 c.port,
263 c.user_managed,
264 )?;
265 task.execute(&mut ctx)?;
266 } else {
267 let mut task = risedev::LogReadyCheckTask::new(
268 "Server started, listening for requests",
269 )?;
270 task.execute(&mut ctx)?;
271 }
272 ctx.pb
273 .set_message(format!("schema registry http://{}:{}", c.address, c.port));
274 }
275
276 ServiceConfig::Pubsub(c) => {
277 let mut service = PubsubService::new(c.clone())?;
278 service.execute(&mut ctx)?;
279 let mut task = risedev::PubsubReadyTaskCheck::new(c.clone())?;
280 task.execute(&mut ctx)?;
281 ctx.pb
282 .set_message(format!("pubsub {}:{}", c.address, c.port));
283 }
284 ServiceConfig::RedPanda(_) => {
285 return Err(anyhow!("redpanda is only supported in RiseDev compose."));
286 }
287 ServiceConfig::Redis(c) => {
288 let mut service = RedisService::new(c.clone())?;
289 service.execute(&mut ctx)?;
290 let mut task = risedev::RedisReadyCheckTask::new(c.clone())?;
291 task.execute(&mut ctx)?;
292 ctx.pb
293 .set_message(format!("redis {}:{}", c.address, c.port));
294 }
295 ServiceConfig::MySql(c) => {
296 MySqlService::new(c.clone()).execute(&mut ctx)?;
297 let mut task = risedev::DbReadyCheckTask::new(
298 MySqlConnectOptions::new()
299 .host(&c.address)
300 .port(c.port)
301 .username(&c.user)
302 .password(&c.password),
303 );
304 task.execute(&mut ctx)?;
305 ctx.pb
306 .set_message(format!("mysql {}:{}", c.address, c.port));
307 }
308 ServiceConfig::Postgres(c) => {
309 PostgresService::new(c.clone()).execute(&mut ctx)?;
310 let mut task = risedev::DbReadyCheckTask::new(
311 PgConnectOptions::new()
312 .host(&c.address)
313 .port(c.port)
314 .database("template1")
315 .username(&c.user)
316 .password(&c.password),
317 );
318 task.execute(&mut ctx)?;
319 ctx.pb
320 .set_message(format!("postgres {}:{}", c.address, c.port));
321 }
322 ServiceConfig::SqlServer(c) => {
323 SqlServerService::new(c.clone()).execute(&mut ctx)?;
325 if c.user_managed {
326 let mut task = risedev::TcpReadyCheckTask::new(
327 c.address.clone(),
328 c.port,
329 c.user_managed,
330 )?;
331 task.execute(&mut ctx)?;
332 } else {
333 let mut task = risedev::LogReadyCheckTask::new(
334 "SQL Server is now ready for client connections.",
335 )?;
336 task.execute(&mut ctx)?;
337 }
338 ctx.pb
339 .set_message(format!("sqlserver {}:{}", c.address, c.port));
340 }
341 }
342
343 let duration = Instant::now() - start_time;
344 Ok(TaskResult {
345 id,
346 time: duration,
347 log: String::from_utf8(log)?,
348 })
349 };
350 tasks.add(service, closure);
351 }
352
353 let stat = tasks.run(&mut logger)?;
354
355 Ok((stat, log_buffer))
356}
357
/// Outcome of one successfully completed start-up task.
#[derive(Debug)]
struct TaskResult {
    /// Identifier of the service the task started.
    id: String,
    /// Wall-clock time the task took from start to finish.
    time: Duration,
    /// Everything the task wrote to its in-memory log, decoded as UTF-8.
    log: String,
}
/// A one-shot task that can be moved to another thread and yields a
/// [`TaskResult`] on success.
///
/// This is a trait alias and relies on the nightly `trait_alias` feature
/// enabled at the top of this file.
trait TaskFn = FnOnce() -> anyhow::Result<TaskResult> + Send + 'static;
/// Collects start-up tasks and runs them grouped by [`TaskGroup`].
///
/// Each group is executed on its own thread; tasks inside a group run one
/// after another in the order they were added.
struct TaskScheduler {
    /// Pending tasks, keyed by the group of the service they belong to.
    task_groups: HashMap<TaskGroup, Vec<Box<dyn TaskFn>>>,
}
369
370impl TaskScheduler {
371 fn new() -> Self {
372 Self {
373 task_groups: HashMap::new(),
374 }
375 }
376
377 fn add(&mut self, config: &ServiceConfig, task: impl TaskFn) {
378 self.task_groups
379 .entry(config.task_group())
380 .or_default()
381 .push(Box::new(task));
382 }
383
384 fn run(self, logger: &mut impl std::io::Write) -> anyhow::Result<Vec<(String, Duration)>> {
385 let mut handles: Vec<JoinHandle<anyhow::Result<Vec<TaskResult>>>> = vec![];
386 let mut stats = vec![];
387
388 let task_groups = self.task_groups;
389 for (_, tasks) in task_groups {
390 handles.push(std::thread::spawn(move || {
391 let mut res = vec![];
392 for task in tasks {
393 let res_ = task()?;
394 res.push(res_);
395 }
396 Ok(res)
397 }));
398 }
399 for handle in handles {
400 let join_res = handle.join();
401 let Ok(res) = join_res else {
402 let panic = join_res.unwrap_err();
403 anyhow::bail!(
404 "failed to join thread, likely panicked: {}",
405 panic_message::panic_message(&panic)
406 );
407 };
408 for TaskResult { id, time, log } in res? {
409 stats.push((id, time));
410 write!(logger, "{}", log)?;
411 }
412 }
413 Ok(stats)
414 }
415}
416
417fn main() -> Result<()> {
418 unsafe { std::env::set_var("RUST_BACKTRACE", "0") };
422
423 tracing_subscriber::fmt()
425 .with_env_filter(
426 EnvFilter::builder()
427 .with_default_directive(LevelFilter::INFO.into())
428 .with_env_var("RISEDEV_RUST_LOG")
429 .from_env_lossy()
430 .add_directive("librdkafka=off".parse().unwrap()),
432 )
433 .init();
434
435 preflight_check()?;
436
437 let profile = std::env::args()
438 .nth(1)
439 .unwrap_or_else(|| "default".to_owned());
440
441 let (config_path, env, risedev_config) = ConfigExpander::expand(".", &profile)?;
442
443 if let Some(config_path) = &config_path {
444 let target = Path::new(&env::var("PREFIX_CONFIG")?).join("risingwave.toml");
445 fs_err::copy(config_path, target).context("config file not found")?;
446 }
447
448 {
449 let mut out_str = String::new();
450 let mut emitter = YamlEmitter::new(&mut out_str);
451 emitter.dump(&risedev_config)?;
452 fs_err::write(
453 Path::new(&env::var("PREFIX_CONFIG")?).join("risedev-expanded.yml"),
454 &out_str,
455 )?;
456 }
457 let services = ConfigExpander::deserialize(&risedev_config)?;
458
459 let mut manager = ProgressManager::new();
460 let p = manager.new_progress();
463 begin_spin(&p);
464 p.set_prefix("dev cluster");
465 p.set_message(format!(
466 "starting {} services for {}...",
467 services.len(),
468 profile
469 ));
470 let task_result = task_main(&mut manager, &services, env);
471
472 match task_result {
473 Ok(_) => {
474 p.set_message(format!(
475 "done bootstrapping with config {}",
476 style(profile).bold()
477 ));
478 complete_spin(&p);
479 }
480 Err(_) => {
481 p.set_message(format!(
482 "failed to bootstrap with config {}",
483 style(profile).bold()
484 ));
485 fail_spin(&p);
486 }
487 }
488 p.finish();
489
490 use risedev::util::stylized_risedev_subcmd as r;
491
492 match task_result {
493 Ok((stat, log_buffer)) => {
494 println!("---- summary of startup time ----");
495 for (task_name, duration) in stat {
496 println!("{}: {:.2}s", task_name, duration.as_secs_f64());
497 }
498 println!("-------------------------------");
499 println!();
500
501 fs_err::write(
502 Path::new(&env::var("PREFIX_CONFIG")?).join("risedev-env"),
503 generate_risedev_env(&services),
504 )?;
505
506 println!("All services started successfully.");
507
508 print!("{}", log_buffer);
509
510 println!("* You may find logs using {} command", r("l"));
511
512 println!("* Run {} to kill cluster.", r("k"));
513
514 println!("* Run {} to run `risedev` anywhere!", r("install"));
515
516 Ok(())
517 }
518 Err(err) => {
519 println!(
520 "{} - Failed to start: {:#}", style("ERROR").red().bold(),
522 err.as_report(),
523 );
524 println!();
525 println!(
526 "* Use `{}` to enable new components, if they are missing.",
527 r("configure")
528 );
529 println!(
530 "* Use `{}` to view logs, or visit `{}`",
531 r("l"),
532 env::var("PREFIX_LOG")?
533 );
534 println!("* Run `{}` to clean up cluster.", r("k"));
535 println!(
536 "* Run `{}` to clean data, which might potentially fix the issue.",
537 r("clean-data")
538 );
539 println!("---");
540 println!();
541
542 Err(anyhow!(
545 "Failed to start all services. See details and instructions above."
546 ))
547 }
548 }
549}