1use std::env;
16use std::fmt::Write;
17use std::future::Future;
18use std::path::Path;
19
20use clap::Parser;
21use risingwave_common::config::{MetaBackend, load_config};
22use risingwave_common::license::LicenseManager;
23use risingwave_common::util::env_var::env_var_is_true;
24use risingwave_common::util::meta_addr::MetaAddressStrategy;
25use risingwave_common::util::runtime::BackgroundShutdownRuntime;
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_compactor::CompactorOpts;
28use risingwave_compute::ComputeNodeOpts;
29use risingwave_frontend::FrontendOpts;
30use risingwave_meta_node::MetaNodeOpts;
31use shell_words::split;
32
33use crate::common::osstrs;
34
// Command-line options of the standalone mode.
//
// Every `*_opts` field holds the raw, shell-style argument string of one
// service; `parse_standalone_opt_args` splits it and parses it into the
// service's own option type. A service whose field is `None` is not started.
//
// NOTE: plain `//` comments are used in this block on purpose. clap's derive
// macro turns `///` doc comments into help text, so adding them here would
// change the program's `--help` output.
#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
    version,
    about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service",
    hide = true
)]
pub struct StandaloneOpts {
    // Raw options of the compute node (`--compute-opts`, short flag derived
    // by clap from the field name). `None` means no compute node is started.
    #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")]
    compute_opts: Option<String>,

    // Raw options of the meta node (`--meta-opts`). `None` means no meta node
    // is started.
    #[clap(short, long, env = "RW_STANDALONE_META_OPTS")]
    meta_opts: Option<String>,

    // Raw options of the frontend node (`--frontend-opts`). `None` means no
    // frontend node is started.
    #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")]
    frontend_opts: Option<String>,

    // Raw options of the compactor node (`--compactor-opts`). `None` means no
    // compactor node is started.
    // NOTE(review): no `short` here, presumably because the derived `-c`
    // would clash with `compute_opts` - confirm.
    #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")]
    compactor_opts: Option<String>,

    // When set, `parse_standalone_opt_args` writes this address into the
    // `prometheus_listener_addr` of every service that was configured.
    #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")]
    prometheus_listener_addr: Option<String>,

    // When set, `parse_standalone_opt_args` writes this path into the
    // `config_path` of every service that was configured, replacing any
    // `--config-path` given inside the per-service option string.
    #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")]
    config_path: Option<String>,
}
74
/// Per-service options produced by [`parse_standalone_opt_args`].
///
/// A field is `None` when the corresponding service was not configured; the
/// parser panics if all four would be `None`.
#[derive(Debug)]
pub struct ParsedStandaloneOpts {
    /// Options of the meta node, if it should be started.
    pub meta_opts: Option<MetaNodeOpts>,
    /// Options of the compute node, if it should be started.
    pub compute_opts: Option<ComputeNodeOpts>,
    /// Options of the frontend node, if it should be started.
    pub frontend_opts: Option<FrontendOpts>,
    /// Options of the compactor node, if it should be started.
    pub compactor_opts: Option<CompactorOpts>,
}
82
83impl risingwave_common::opts::Opts for ParsedStandaloneOpts {
84 fn name() -> &'static str {
85 "standalone"
86 }
87
88 fn meta_addr(&self) -> MetaAddressStrategy {
89 if let Some(opts) = self.meta_opts.as_ref() {
90 opts.meta_addr()
91 } else if let Some(opts) = self.compute_opts.as_ref() {
92 opts.meta_addr()
93 } else if let Some(opts) = self.frontend_opts.as_ref() {
94 opts.meta_addr()
95 } else if let Some(opts) = self.compactor_opts.as_ref() {
96 opts.meta_addr()
97 } else {
98 unreachable!("at least one service should be specified as checked during parsing")
99 }
100 }
101}
102
103pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
104 let meta_opts = opts.meta_opts.as_ref().map(|s| {
105 let mut s = split(s).unwrap();
106 s.insert(0, "meta-node".into());
107 s
108 });
109 let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o)));
110
111 let compute_opts = opts.compute_opts.as_ref().map(|s| {
112 let mut s = split(s).unwrap();
113 s.insert(0, "compute-node".into());
114 s
115 });
116 let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o)));
117
118 let frontend_opts = opts.frontend_opts.as_ref().map(|s| {
119 let mut s = split(s).unwrap();
120 s.insert(0, "frontend-node".into());
121 s
122 });
123 let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o)));
124
125 let compactor_opts = opts.compactor_opts.as_ref().map(|s| {
126 let mut s = split(s).unwrap();
127 s.insert(0, "compactor-node".into());
128 s
129 });
130 let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o)));
131
132 if let Some(config_path) = opts.config_path.as_ref() {
133 if let Some(meta_opts) = meta_opts.as_mut() {
134 meta_opts.config_path.clone_from(config_path);
135 }
136 if let Some(compute_opts) = compute_opts.as_mut() {
137 compute_opts.config_path.clone_from(config_path);
138 }
139 if let Some(frontend_opts) = frontend_opts.as_mut() {
140 frontend_opts.config_path.clone_from(config_path);
141 }
142 if let Some(compactor_opts) = compactor_opts.as_mut() {
143 compactor_opts.config_path.clone_from(config_path);
144 }
145 }
146 if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() {
147 if let Some(compute_opts) = compute_opts.as_mut() {
148 compute_opts
149 .prometheus_listener_addr
150 .clone_from(prometheus_listener_addr);
151 }
152 if let Some(frontend_opts) = frontend_opts.as_mut() {
153 frontend_opts
154 .prometheus_listener_addr
155 .clone_from(prometheus_listener_addr);
156 }
157 if let Some(compactor_opts) = compactor_opts.as_mut() {
158 compactor_opts
159 .prometheus_listener_addr
160 .clone_from(prometheus_listener_addr);
161 }
162 if let Some(meta_opts) = meta_opts.as_mut() {
163 meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
164 }
165 }
166
167 if meta_opts.is_none()
168 && compute_opts.is_none()
169 && frontend_opts.is_none()
170 && compactor_opts.is_none()
171 {
172 panic!("No service is specified to start.");
173 }
174
175 ParsedStandaloneOpts {
176 meta_opts,
177 compute_opts,
178 frontend_opts,
179 compactor_opts,
180 }
181}
182
/// A service started by the standalone launcher, running on a runtime of its
/// own.
struct Service {
    /// Name of the service; used in log messages and in the names of the
    /// runtime's worker threads.
    name: &'static str,
    /// The runtime the service's main task was spawned on.
    runtime: BackgroundShutdownRuntime,
    /// Handle of the service's main task.
    main_task: tokio::task::JoinHandle<()>,
    /// Token whose clone was handed to the service on start; cancelled by
    /// [`Service::shutdown`] to ask the service to stop.
    shutdown: CancellationToken,
}
190
191impl Service {
192 fn spawn<F, Fut>(name: &'static str, f: F) -> Self
199 where
200 F: FnOnce(CancellationToken) -> Fut,
201 Fut: Future<Output = ()> + Send + 'static,
202 {
203 let runtime = tokio::runtime::Builder::new_multi_thread()
204 .thread_name(format!("rw-standalone-{name}"))
205 .enable_all()
206 .build()
207 .unwrap();
208 let shutdown = CancellationToken::new();
209 let main_task = runtime.spawn(f(shutdown.clone()));
210
211 Self {
212 name,
213 runtime: runtime.into(),
214 main_task,
215 shutdown,
216 }
217 }
218
219 async fn shutdown(self) {
225 tracing::info!("stopping {} service...", self.name);
226
227 self.shutdown.cancel();
228 let _ = self.main_task.await;
229 drop(self.runtime); tracing::info!("{} service stopped", self.name);
232 }
233}
234
235pub async fn standalone(
243 ParsedStandaloneOpts {
244 meta_opts,
245 compute_opts,
246 frontend_opts,
247 compactor_opts,
248 }: ParsedStandaloneOpts,
249 shutdown: CancellationToken,
250) {
251 tracing::info!("launching Risingwave in standalone mode");
252
253 if let Some(opts) = meta_opts.as_ref() {
256 let config = load_config(&opts.config_path, opts);
257 let is_local_meta_backend =
258 matches!(config.meta.backend, MetaBackend::Mem | MetaBackend::Sqlite)
259 || (matches!(config.meta.backend, MetaBackend::Sql)
260 && opts
261 .sql_endpoint
262 .as_ref()
263 .map(|endpoint| endpoint.expose_secret().starts_with("sqlite:"))
264 .unwrap_or(false));
265 if is_local_meta_backend {
266 LicenseManager::get().set_ignore_resource_limit(true);
267 tracing::info!(
268 "standalone meta backend {:?}; ignore license resource limits",
269 config.meta.backend
270 );
271 }
272 }
273
274 let (meta, is_in_memory) = if let Some(opts) = meta_opts.clone() {
275 let is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
276 tracing::info!("starting meta-node thread with cli args: {:?}", opts);
277 let service = Service::spawn("meta", |shutdown| {
278 risingwave_meta_node::start(opts, shutdown)
279 });
280
281 let mut tries = 0;
283 while !risingwave_meta_node::is_server_started() {
284 if tries % 50 == 0 {
285 tracing::info!("waiting for meta service to be ready...");
286 }
287 if service.main_task.is_finished() {
288 tracing::error!("meta service failed to start, exiting...");
289 return;
290 }
291 tries += 1;
292 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
293 }
294
295 (Some(service), is_in_memory)
296 } else {
297 (None, false)
298 };
299
300 let compute = if let Some(opts) = compute_opts {
301 tracing::info!("starting compute-node thread with cli args: {:?}", opts);
302 let service = Service::spawn("compute", |shutdown| {
303 risingwave_compute::start(opts, shutdown)
304 });
305 Some(service)
306 } else {
307 None
308 };
309
310 let frontend = if let Some(opts) = frontend_opts.clone() {
311 tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
312 let service = Service::spawn("frontend", |shutdown| {
313 risingwave_frontend::start(opts, shutdown)
314 });
315 Some(service)
316 } else {
317 None
318 };
319
320 let compactor = if let Some(opts) = compactor_opts {
321 tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
322 let service = Service::spawn("compactor", |shutdown| {
323 risingwave_compactor::start(opts, shutdown)
324 });
325 Some(service)
326 } else {
327 None
328 };
329
330 tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
332
333 eprintln!("----------------------------------------");
334 eprintln!("| RisingWave standalone mode is ready. |");
335 eprintln!("----------------------------------------");
336 if is_in_memory {
337 eprintln!(
338 "{}",
339 console::style(
340 "WARNING: You are using RisingWave's in-memory mode.
341It SHOULD NEVER be used in benchmarks and production environment!!!"
342 )
343 .red()
344 .bold()
345 );
346 }
347
348 let mut risedev_env = String::new();
350
351 if let Some(opts) = &frontend_opts {
352 let host = opts.listen_addr.split(':').next().unwrap_or("localhost");
353 let port = opts.listen_addr.split(':').next_back().unwrap_or("4566");
354 let database = "dev";
355 let user = "root";
356
357 writeln!(
358 risedev_env,
359 r#"RISEDEV_RW_FRONTEND_LISTEN_ADDRESS="{host}""#
360 )
361 .unwrap();
362 writeln!(risedev_env, r#"RISEDEV_RW_FRONTEND_PORT="{port}""#).unwrap();
363
364 eprintln!();
365 eprintln!("Connect to the RisingWave instance via psql:");
366 eprintln!(
367 "{}",
368 console::style(format!(
369 " psql -h {host} -p {port} -d {database} -U {user}"
370 ))
371 .blue()
372 );
373 }
374
375 if let Some(opts) = &meta_opts {
376 let meta_addr = &opts.listen_addr;
377 writeln!(risedev_env, r#"RW_META_ADDR="http://{meta_addr}""#).unwrap();
378 }
379
380 if env_var_is_true("RISEDEV") {
382 let env_path = Path::new(
383 &env::var("PREFIX_CONFIG").expect("env var `PREFIX_CONFIG` must be set by RiseDev"),
384 )
385 .join("risedev-env");
386 std::fs::write(env_path, risedev_env).unwrap();
387 }
388
389 let meta_stopped = meta
390 .as_ref()
391 .map(|m| m.shutdown.clone())
392 .unwrap_or_else(CancellationToken::new)
394 .cancelled_owned();
395
396 tokio::select! {
398 _ = meta_stopped => {
401 tracing::info!("meta service is stopped, terminating...");
402 }
403
404 _ = shutdown.cancelled() => {
406 for service in [compactor, frontend, compute, meta].into_iter().flatten() {
407 service.shutdown().await;
408 }
409 tracing::info!("all services stopped, bye");
410 }
411 }
412}
413
#[cfg(test)]
mod test {
    use super::*;

    /// Checks both parsing stages: the raw command line into
    /// [`StandaloneOpts`], and those into the per-service option types,
    /// including the standalone-level overrides of `config_path` and
    /// `prometheus_listener_addr`.
    #[test]
    fn test_parse_opt_args() {
        // Stage 1: standalone-level options. The string starts with a line
        // break, so the first item yielded by `lines()` is empty and takes
        // the place of the binary name.
        let raw_opts = "
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
        let actual = StandaloneOpts::parse_from(raw_opts.lines());
        let opts = StandaloneOpts {
            compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
            meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
            frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into()),
            compactor_opts: None,
            prometheus_listener_addr: Some("127.0.0.1:1234".into()),
            config_path: Some("src/config/test.toml".into()),
        };
        assert_eq!(actual, opts);

        // Stage 2: node-level options.
        let actual = parse_standalone_opt_args(&opts);

        let compute_opts = actual
            .compute_opts
            .as_ref()
            .expect("compute opts should have been parsed");
        assert_eq!(compute_opts.listen_addr, "127.0.0.1:8000");
        assert_eq!(compute_opts.total_memory_bytes, 34359738368);
        assert_eq!(compute_opts.parallelism, 10);
        assert_eq!(compute_opts.temp_secret_file_dir, "./compute/secrets/");
        assert_eq!(compute_opts.prometheus_listener_addr, "127.0.0.1:1234");
        assert_eq!(compute_opts.config_path, "src/config/test.toml");

        let meta_opts = actual
            .meta_opts
            .as_ref()
            .expect("meta opts should have been parsed");
        assert_eq!(meta_opts.listen_addr, "127.0.0.1:8001");
        assert_eq!(meta_opts.advertise_addr, "127.0.0.1:9999");
        assert_eq!(
            meta_opts.data_directory,
            Some("some path with spaces".to_owned())
        );
        assert_eq!(meta_opts.temp_secret_file_dir, "./meta/secrets/");
        assert_eq!(
            meta_opts.prometheus_listener_addr,
            Some("127.0.0.1:1234".to_owned())
        );
        assert_eq!(meta_opts.config_path, "src/config/test.toml");

        let frontend_opts = actual
            .frontend_opts
            .as_ref()
            .expect("frontend opts should have been parsed");
        // The standalone-level config path replaces the one passed inside
        // `--frontend-opts`.
        assert_eq!(frontend_opts.config_path, "src/config/test.toml");
        assert_eq!(frontend_opts.temp_secret_file_dir, "./frontend/secrets/");
        assert_eq!(frontend_opts.frontend_total_memory_bytes, 34359738368);
        assert_eq!(frontend_opts.prometheus_listener_addr, "127.0.0.1:1234");

        assert!(actual.compactor_opts.is_none());
    }
}