1use std::env;
16use std::fmt::Write;
17use std::future::Future;
18use std::path::Path;
19
20use clap::Parser;
21use risingwave_common::config::MetaBackend;
22use risingwave_common::util::env_var::env_var_is_true;
23use risingwave_common::util::meta_addr::MetaAddressStrategy;
24use risingwave_common::util::runtime::BackgroundShutdownRuntime;
25use risingwave_common::util::tokio_util::sync::CancellationToken;
26use risingwave_compactor::CompactorOpts;
27use risingwave_compute::ComputeNodeOpts;
28use risingwave_frontend::FrontendOpts;
29use risingwave_meta_node::MetaNodeOpts;
30use shell_words::split;
31
32use crate::common::osstrs;
33
34#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
35#[command(
36 version,
37 about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service",
38 hide = true
39)]
40pub struct StandaloneOpts {
41 #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")]
44 compute_opts: Option<String>,
45
46 #[clap(short, long, env = "RW_STANDALONE_META_OPTS")]
47 meta_opts: Option<String>,
50
51 #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")]
52 frontend_opts: Option<String>,
55
56 #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")]
57 compactor_opts: Option<String>,
60
61 #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")]
62 prometheus_listener_addr: Option<String>,
66
67 #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")]
68 config_path: Option<String>,
72}
73
74#[derive(Debug)]
75pub struct ParsedStandaloneOpts {
76 pub meta_opts: Option<MetaNodeOpts>,
77 pub compute_opts: Option<ComputeNodeOpts>,
78 pub frontend_opts: Option<FrontendOpts>,
79 pub compactor_opts: Option<CompactorOpts>,
80}
81
82impl risingwave_common::opts::Opts for ParsedStandaloneOpts {
83 fn name() -> &'static str {
84 "standalone"
85 }
86
87 fn meta_addr(&self) -> MetaAddressStrategy {
88 if let Some(opts) = self.meta_opts.as_ref() {
89 opts.meta_addr()
90 } else if let Some(opts) = self.compute_opts.as_ref() {
91 opts.meta_addr()
92 } else if let Some(opts) = self.frontend_opts.as_ref() {
93 opts.meta_addr()
94 } else if let Some(opts) = self.compactor_opts.as_ref() {
95 opts.meta_addr()
96 } else {
97 unreachable!("at least one service should be specified as checked during parsing")
98 }
99 }
100}
101
102pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
103 let meta_opts = opts.meta_opts.as_ref().map(|s| {
104 let mut s = split(s).unwrap();
105 s.insert(0, "meta-node".into());
106 s
107 });
108 let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o)));
109
110 let compute_opts = opts.compute_opts.as_ref().map(|s| {
111 let mut s = split(s).unwrap();
112 s.insert(0, "compute-node".into());
113 s
114 });
115 let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o)));
116
117 let frontend_opts = opts.frontend_opts.as_ref().map(|s| {
118 let mut s = split(s).unwrap();
119 s.insert(0, "frontend-node".into());
120 s
121 });
122 let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o)));
123
124 let compactor_opts = opts.compactor_opts.as_ref().map(|s| {
125 let mut s = split(s).unwrap();
126 s.insert(0, "compactor-node".into());
127 s
128 });
129 let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o)));
130
131 if let Some(config_path) = opts.config_path.as_ref() {
132 if let Some(meta_opts) = meta_opts.as_mut() {
133 meta_opts.config_path.clone_from(config_path);
134 }
135 if let Some(compute_opts) = compute_opts.as_mut() {
136 compute_opts.config_path.clone_from(config_path);
137 }
138 if let Some(frontend_opts) = frontend_opts.as_mut() {
139 frontend_opts.config_path.clone_from(config_path);
140 }
141 if let Some(compactor_opts) = compactor_opts.as_mut() {
142 compactor_opts.config_path.clone_from(config_path);
143 }
144 }
145 if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() {
146 if let Some(compute_opts) = compute_opts.as_mut() {
147 compute_opts
148 .prometheus_listener_addr
149 .clone_from(prometheus_listener_addr);
150 }
151 if let Some(frontend_opts) = frontend_opts.as_mut() {
152 frontend_opts
153 .prometheus_listener_addr
154 .clone_from(prometheus_listener_addr);
155 }
156 if let Some(compactor_opts) = compactor_opts.as_mut() {
157 compactor_opts
158 .prometheus_listener_addr
159 .clone_from(prometheus_listener_addr);
160 }
161 if let Some(meta_opts) = meta_opts.as_mut() {
162 meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
163 }
164 }
165
166 if meta_opts.is_none()
167 && compute_opts.is_none()
168 && frontend_opts.is_none()
169 && compactor_opts.is_none()
170 {
171 panic!("No service is specified to start.");
172 }
173
174 ParsedStandaloneOpts {
175 meta_opts,
176 compute_opts,
177 frontend_opts,
178 compactor_opts,
179 }
180}
181
182struct Service {
184 name: &'static str,
185 runtime: BackgroundShutdownRuntime,
186 main_task: tokio::task::JoinHandle<()>,
187 shutdown: CancellationToken,
188}
189
190impl Service {
191 fn spawn<F, Fut>(name: &'static str, f: F) -> Self
198 where
199 F: FnOnce(CancellationToken) -> Fut,
200 Fut: Future<Output = ()> + Send + 'static,
201 {
202 let runtime = tokio::runtime::Builder::new_multi_thread()
203 .thread_name(format!("rw-standalone-{name}"))
204 .enable_all()
205 .build()
206 .unwrap();
207 let shutdown = CancellationToken::new();
208 let main_task = runtime.spawn(f(shutdown.clone()));
209
210 Self {
211 name,
212 runtime: runtime.into(),
213 main_task,
214 shutdown,
215 }
216 }
217
218 async fn shutdown(self) {
224 tracing::info!("stopping {} service...", self.name);
225
226 self.shutdown.cancel();
227 let _ = self.main_task.await;
228 drop(self.runtime); tracing::info!("{} service stopped", self.name);
231 }
232}
233
234pub async fn standalone(
242 ParsedStandaloneOpts {
243 meta_opts,
244 compute_opts,
245 frontend_opts,
246 compactor_opts,
247 }: ParsedStandaloneOpts,
248 shutdown: CancellationToken,
249) {
250 tracing::info!("launching Risingwave in standalone mode");
251
252 let (meta, is_in_memory) = if let Some(opts) = meta_opts.clone() {
253 let is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
254 tracing::info!("starting meta-node thread with cli args: {:?}", opts);
255 let service = Service::spawn("meta", |shutdown| {
256 risingwave_meta_node::start(opts, shutdown)
257 });
258
259 let mut tries = 0;
261 while !risingwave_meta_node::is_server_started() {
262 if tries % 50 == 0 {
263 tracing::info!("waiting for meta service to be ready...");
264 }
265 if service.main_task.is_finished() {
266 tracing::error!("meta service failed to start, exiting...");
267 return;
268 }
269 tries += 1;
270 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
271 }
272
273 (Some(service), is_in_memory)
274 } else {
275 (None, false)
276 };
277
278 let compute = if let Some(opts) = compute_opts {
279 tracing::info!("starting compute-node thread with cli args: {:?}", opts);
280 let service = Service::spawn("compute", |shutdown| {
281 risingwave_compute::start(opts, shutdown)
282 });
283 Some(service)
284 } else {
285 None
286 };
287
288 let frontend = if let Some(opts) = frontend_opts.clone() {
289 tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
290 let service = Service::spawn("frontend", |shutdown| {
291 risingwave_frontend::start(opts, shutdown)
292 });
293 Some(service)
294 } else {
295 None
296 };
297
298 let compactor = if let Some(opts) = compactor_opts {
299 tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
300 let service = Service::spawn("compactor", |shutdown| {
301 risingwave_compactor::start(opts, shutdown)
302 });
303 Some(service)
304 } else {
305 None
306 };
307
308 tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
310
311 eprintln!("----------------------------------------");
312 eprintln!("| RisingWave standalone mode is ready. |");
313 eprintln!("----------------------------------------");
314 if is_in_memory {
315 eprintln!(
316 "{}",
317 console::style(
318 "WARNING: You are using RisingWave's in-memory mode.
319It SHOULD NEVER be used in benchmarks and production environment!!!"
320 )
321 .red()
322 .bold()
323 );
324 }
325
326 let mut risedev_env = String::new();
328
329 if let Some(opts) = &frontend_opts {
330 let host = opts.listen_addr.split(':').next().unwrap_or("localhost");
331 let port = opts.listen_addr.split(':').next_back().unwrap_or("4566");
332 let database = "dev";
333 let user = "root";
334
335 writeln!(
336 risedev_env,
337 r#"RISEDEV_RW_FRONTEND_LISTEN_ADDRESS="{host}""#
338 )
339 .unwrap();
340 writeln!(risedev_env, r#"RISEDEV_RW_FRONTEND_PORT="{port}""#).unwrap();
341
342 eprintln!();
343 eprintln!("Connect to the RisingWave instance via psql:");
344 eprintln!(
345 "{}",
346 console::style(format!(
347 " psql -h {host} -p {port} -d {database} -U {user}"
348 ))
349 .blue()
350 );
351 }
352
353 if let Some(opts) = &meta_opts {
354 let meta_addr = &opts.listen_addr;
355 writeln!(risedev_env, r#"RW_META_ADDR="http://{meta_addr}""#).unwrap();
356 }
357
358 if env_var_is_true("RISEDEV") {
360 let env_path = Path::new(
361 &env::var("PREFIX_CONFIG").expect("env var `PREFIX_CONFIG` must be set by RiseDev"),
362 )
363 .join("risedev-env");
364 std::fs::write(env_path, risedev_env).unwrap();
365 }
366
367 let meta_stopped = meta
368 .as_ref()
369 .map(|m| m.shutdown.clone())
370 .unwrap_or_else(CancellationToken::new)
372 .cancelled_owned();
373
374 tokio::select! {
376 _ = meta_stopped => {
379 tracing::info!("meta service is stopped, terminating...");
380 }
381
382 _ = shutdown.cancelled() => {
384 for service in [compactor, frontend, compute, meta].into_iter().flatten() {
385 service.shutdown().await;
386 }
387 tracing::info!("all services stopped, bye");
388 }
389 }
390}
391
392#[cfg(test)]
393mod test {
394 use super::*;
395
396 #[allow(clippy::assertions_on_constants)]
397 #[test]
398 fn test_parse_opt_args() {
399 let raw_opts = "
401--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
402--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
403--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
404--prometheus-listener-addr=127.0.0.1:1234
405--config-path=src/config/test.toml
406";
407 let actual = StandaloneOpts::parse_from(raw_opts.lines());
408 let opts = StandaloneOpts {
409 compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
410 meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
411 frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into() ),
412 compactor_opts: None,
413 prometheus_listener_addr: Some("127.0.0.1:1234".into()),
414 config_path: Some("src/config/test.toml".into()),
415 };
416 assert_eq!(actual, opts);
417
418 let actual = parse_standalone_opt_args(&opts);
420
421 if let Some(compute_opts) = &actual.compute_opts {
422 assert_eq!(compute_opts.listen_addr, "127.0.0.1:8000");
423 assert_eq!(compute_opts.total_memory_bytes, 34359738368);
424 assert_eq!(compute_opts.parallelism, 10);
425 assert_eq!(compute_opts.temp_secret_file_dir, "./compute/secrets/");
426 assert_eq!(compute_opts.prometheus_listener_addr, "127.0.0.1:1234");
427 assert_eq!(compute_opts.config_path, "src/config/test.toml");
428 } else {
429 assert!(false);
430 }
431 if let Some(meta_opts) = &actual.meta_opts {
432 assert_eq!(meta_opts.listen_addr, "127.0.0.1:8001");
433 assert_eq!(meta_opts.advertise_addr, "127.0.0.1:9999");
434 assert_eq!(
435 meta_opts.data_directory,
436 Some("some path with spaces".to_owned())
437 );
438 assert_eq!(meta_opts.temp_secret_file_dir, "./meta/secrets/");
439 assert_eq!(
440 meta_opts.prometheus_listener_addr,
441 Some("127.0.0.1:1234".to_owned())
442 );
443 assert_eq!(meta_opts.config_path, "src/config/test.toml");
444 } else {
445 assert!(false);
446 }
447
448 if let Some(frontend_opts) = &actual.frontend_opts {
449 assert_eq!(frontend_opts.config_path, "src/config/test.toml");
450 assert_eq!(frontend_opts.temp_secret_file_dir, "./frontend/secrets/");
451 assert_eq!(frontend_opts.frontend_total_memory_bytes, 34359738368);
452 assert_eq!(frontend_opts.prometheus_listener_addr, "127.0.0.1:1234");
453 } else {
454 assert!(false);
455 }
456
457 assert!(actual.compactor_opts.is_none());
458 }
459}