risingwave_cmd_all/
standalone.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::fmt::Write;
17use std::future::Future;
18use std::path::Path;
19
20use clap::Parser;
21use risingwave_common::config::MetaBackend;
22use risingwave_common::util::env_var::env_var_is_true;
23use risingwave_common::util::meta_addr::MetaAddressStrategy;
24use risingwave_common::util::runtime::BackgroundShutdownRuntime;
25use risingwave_common::util::tokio_util::sync::CancellationToken;
26use risingwave_compactor::CompactorOpts;
27use risingwave_compute::ComputeNodeOpts;
28use risingwave_frontend::FrontendOpts;
29use risingwave_meta_node::MetaNodeOpts;
30use shell_words::split;
31
32use crate::common::osstrs;
33
34#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
35#[command(
36    version,
37    about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service",
38    hide = true
39)]
40pub struct StandaloneOpts {
41    /// Compute node options
42    /// If missing, compute node won't start
43    #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")]
44    compute_opts: Option<String>,
45
46    #[clap(short, long, env = "RW_STANDALONE_META_OPTS")]
47    /// Meta node options
48    /// If missing, meta node won't start
49    meta_opts: Option<String>,
50
51    #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")]
52    /// Frontend node options
53    /// If missing, frontend node won't start
54    frontend_opts: Option<String>,
55
56    #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")]
57    /// Compactor node options
58    /// If missing compactor node won't start
59    compactor_opts: Option<String>,
60
61    #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")]
62    /// Prometheus listener address
63    /// If present, it will override prometheus listener address for
64    /// Frontend, Compute and Compactor nodes
65    prometheus_listener_addr: Option<String>,
66
67    #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")]
68    /// Path to the config file
69    /// If present, it will override config path for
70    /// Frontend, Compute and Compactor nodes
71    config_path: Option<String>,
72}
73
74#[derive(Debug)]
75pub struct ParsedStandaloneOpts {
76    pub meta_opts: Option<MetaNodeOpts>,
77    pub compute_opts: Option<ComputeNodeOpts>,
78    pub frontend_opts: Option<FrontendOpts>,
79    pub compactor_opts: Option<CompactorOpts>,
80}
81
82impl risingwave_common::opts::Opts for ParsedStandaloneOpts {
83    fn name() -> &'static str {
84        "standalone"
85    }
86
87    fn meta_addr(&self) -> MetaAddressStrategy {
88        if let Some(opts) = self.meta_opts.as_ref() {
89            opts.meta_addr()
90        } else if let Some(opts) = self.compute_opts.as_ref() {
91            opts.meta_addr()
92        } else if let Some(opts) = self.frontend_opts.as_ref() {
93            opts.meta_addr()
94        } else if let Some(opts) = self.compactor_opts.as_ref() {
95            opts.meta_addr()
96        } else {
97            unreachable!("at least one service should be specified as checked during parsing")
98        }
99    }
100}
101
102pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
103    let meta_opts = opts.meta_opts.as_ref().map(|s| {
104        let mut s = split(s).unwrap();
105        s.insert(0, "meta-node".into());
106        s
107    });
108    let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o)));
109
110    let compute_opts = opts.compute_opts.as_ref().map(|s| {
111        let mut s = split(s).unwrap();
112        s.insert(0, "compute-node".into());
113        s
114    });
115    let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o)));
116
117    let frontend_opts = opts.frontend_opts.as_ref().map(|s| {
118        let mut s = split(s).unwrap();
119        s.insert(0, "frontend-node".into());
120        s
121    });
122    let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o)));
123
124    let compactor_opts = opts.compactor_opts.as_ref().map(|s| {
125        let mut s = split(s).unwrap();
126        s.insert(0, "compactor-node".into());
127        s
128    });
129    let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o)));
130
131    if let Some(config_path) = opts.config_path.as_ref() {
132        if let Some(meta_opts) = meta_opts.as_mut() {
133            meta_opts.config_path.clone_from(config_path);
134        }
135        if let Some(compute_opts) = compute_opts.as_mut() {
136            compute_opts.config_path.clone_from(config_path);
137        }
138        if let Some(frontend_opts) = frontend_opts.as_mut() {
139            frontend_opts.config_path.clone_from(config_path);
140        }
141        if let Some(compactor_opts) = compactor_opts.as_mut() {
142            compactor_opts.config_path.clone_from(config_path);
143        }
144    }
145    if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() {
146        if let Some(compute_opts) = compute_opts.as_mut() {
147            compute_opts
148                .prometheus_listener_addr
149                .clone_from(prometheus_listener_addr);
150        }
151        if let Some(frontend_opts) = frontend_opts.as_mut() {
152            frontend_opts
153                .prometheus_listener_addr
154                .clone_from(prometheus_listener_addr);
155        }
156        if let Some(compactor_opts) = compactor_opts.as_mut() {
157            compactor_opts
158                .prometheus_listener_addr
159                .clone_from(prometheus_listener_addr);
160        }
161        if let Some(meta_opts) = meta_opts.as_mut() {
162            meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
163        }
164    }
165
166    if meta_opts.is_none()
167        && compute_opts.is_none()
168        && frontend_opts.is_none()
169        && compactor_opts.is_none()
170    {
171        panic!("No service is specified to start.");
172    }
173
174    ParsedStandaloneOpts {
175        meta_opts,
176        compute_opts,
177        frontend_opts,
178        compactor_opts,
179    }
180}
181
182/// A service under standalone mode.
183struct Service {
184    name: &'static str,
185    runtime: BackgroundShutdownRuntime,
186    main_task: tokio::task::JoinHandle<()>,
187    shutdown: CancellationToken,
188}
189
190impl Service {
191    /// Spawn a new tokio runtime and start a service in it.
192    ///
193    /// By using a separate runtime, we get better isolation between services. For example,
194    ///
195    /// - The logs in the main runtime of each service can be distinguished by the thread name.
196    /// - Each service can be shutdown cleanly by shutting down its runtime.
197    fn spawn<F, Fut>(name: &'static str, f: F) -> Self
198    where
199        F: FnOnce(CancellationToken) -> Fut,
200        Fut: Future<Output = ()> + Send + 'static,
201    {
202        let runtime = tokio::runtime::Builder::new_multi_thread()
203            .thread_name(format!("rw-standalone-{name}"))
204            .enable_all()
205            .build()
206            .unwrap();
207        let shutdown = CancellationToken::new();
208        let main_task = runtime.spawn(f(shutdown.clone()));
209
210        Self {
211            name,
212            runtime: runtime.into(),
213            main_task,
214            shutdown,
215        }
216    }
217
218    /// Shutdown the service and the runtime gracefully.
219    ///
220    /// As long as the main task of the service is resolved after signaling `shutdown`,
221    /// the service is considered stopped and the runtime will be shutdown. This follows
222    /// the same convention as described in `risingwave_rt::main_okk`.
223    async fn shutdown(self) {
224        tracing::info!("stopping {} service...", self.name);
225
226        self.shutdown.cancel();
227        let _ = self.main_task.await;
228        drop(self.runtime); // shutdown in background
229
230        tracing::info!("{} service stopped", self.name);
231    }
232}
233
234/// For `standalone` mode, we can configure and start multiple services in one process.
235/// `standalone` mode is meant to be used by our cloud service and docker,
236/// where we can configure and start multiple services in one process.
237///
238/// Services are started in the order of `meta`, `compute`, `frontend`, then `compactor`.
239/// When the `shutdown` token is signaled, all services will be stopped gracefully in the
240/// reverse order.
241pub async fn standalone(
242    ParsedStandaloneOpts {
243        meta_opts,
244        compute_opts,
245        frontend_opts,
246        compactor_opts,
247    }: ParsedStandaloneOpts,
248    shutdown: CancellationToken,
249) {
250    tracing::info!("launching Risingwave in standalone mode");
251
252    let (meta, is_in_memory) = if let Some(opts) = meta_opts.clone() {
253        let is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
254        tracing::info!("starting meta-node thread with cli args: {:?}", opts);
255        let service = Service::spawn("meta", |shutdown| {
256            risingwave_meta_node::start(opts, shutdown)
257        });
258
259        // wait for the service to be ready
260        let mut tries = 0;
261        while !risingwave_meta_node::is_server_started() {
262            if tries % 50 == 0 {
263                tracing::info!("waiting for meta service to be ready...");
264            }
265            if service.main_task.is_finished() {
266                tracing::error!("meta service failed to start, exiting...");
267                return;
268            }
269            tries += 1;
270            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
271        }
272
273        (Some(service), is_in_memory)
274    } else {
275        (None, false)
276    };
277
278    let compute = if let Some(opts) = compute_opts {
279        tracing::info!("starting compute-node thread with cli args: {:?}", opts);
280        let service = Service::spawn("compute", |shutdown| {
281            risingwave_compute::start(opts, shutdown)
282        });
283        Some(service)
284    } else {
285        None
286    };
287
288    let frontend = if let Some(opts) = frontend_opts.clone() {
289        tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
290        let service = Service::spawn("frontend", |shutdown| {
291            risingwave_frontend::start(opts, shutdown)
292        });
293        Some(service)
294    } else {
295        None
296    };
297
298    let compactor = if let Some(opts) = compactor_opts {
299        tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
300        let service = Service::spawn("compactor", |shutdown| {
301            risingwave_compactor::start(opts, shutdown)
302        });
303        Some(service)
304    } else {
305        None
306    };
307
308    // wait for log messages to be flushed
309    tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
310
311    eprintln!("----------------------------------------");
312    eprintln!("| RisingWave standalone mode is ready. |");
313    eprintln!("----------------------------------------");
314    if is_in_memory {
315        eprintln!(
316            "{}",
317            console::style(
318                "WARNING: You are using RisingWave's in-memory mode.
319It SHOULD NEVER be used in benchmarks and production environment!!!"
320            )
321            .red()
322            .bold()
323        );
324    }
325
326    // This is a specialization of `generate_risedev_env` in `src/risedevtool/src/risedev_env.rs`.
327    let mut risedev_env = String::new();
328
329    if let Some(opts) = &frontend_opts {
330        let host = opts.listen_addr.split(':').next().unwrap_or("localhost");
331        let port = opts.listen_addr.split(':').next_back().unwrap_or("4566");
332        let database = "dev";
333        let user = "root";
334
335        writeln!(
336            risedev_env,
337            r#"RISEDEV_RW_FRONTEND_LISTEN_ADDRESS="{host}""#
338        )
339        .unwrap();
340        writeln!(risedev_env, r#"RISEDEV_RW_FRONTEND_PORT="{port}""#).unwrap();
341
342        eprintln!();
343        eprintln!("Connect to the RisingWave instance via psql:");
344        eprintln!(
345            "{}",
346            console::style(format!(
347                "  psql -h {host} -p {port} -d {database} -U {user}"
348            ))
349            .blue()
350        );
351    }
352
353    if let Some(opts) = &meta_opts {
354        let meta_addr = &opts.listen_addr;
355        writeln!(risedev_env, r#"RW_META_ADDR="http://{meta_addr}""#).unwrap();
356    }
357
358    // Create the environment file when launched by RiseDev.
359    if env_var_is_true("RISEDEV") {
360        let env_path = Path::new(
361            &env::var("PREFIX_CONFIG").expect("env var `PREFIX_CONFIG` must be set by RiseDev"),
362        )
363        .join("risedev-env");
364        std::fs::write(env_path, risedev_env).unwrap();
365    }
366
367    let meta_stopped = meta
368        .as_ref()
369        .map(|m| m.shutdown.clone())
370        // If there's no meta service, use a dummy token which will never resolve.
371        .unwrap_or_else(CancellationToken::new)
372        .cancelled_owned();
373
374    // Wait for shutdown signals.
375    tokio::select! {
376        // Meta service stopped itself, typically due to leadership loss of idleness.
377        // Directly exit in this case.
378        _ = meta_stopped => {
379            tracing::info!("meta service is stopped, terminating...");
380        }
381
382        // Shutdown requested by the user.
383        _ = shutdown.cancelled() => {
384            for service in [compactor, frontend, compute, meta].into_iter().flatten() {
385                service.shutdown().await;
386            }
387            tracing::info!("all services stopped, bye");
388        }
389    }
390}
391
#[cfg(test)]
mod test {
    use super::*;

    /// Checks both parsing stages: the command line into [`StandaloneOpts`], and
    /// then [`StandaloneOpts`] into node-level options, including the
    /// standalone-level overrides of `config_path` and `prometheus_listener_addr`.
    #[test]
    fn test_parse_opt_args() {
        // Test parsing into standalone-level opts.
        // The leading empty line of `raw_opts` serves as the binary name (argv[0]).
        let raw_opts = "
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
        let actual = StandaloneOpts::parse_from(raw_opts.lines());
        let opts = StandaloneOpts {
            compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
            meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
            frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into()),
            compactor_opts: None,
            prometheus_listener_addr: Some("127.0.0.1:1234".into()),
            config_path: Some("src/config/test.toml".into()),
        };
        assert_eq!(actual, opts);

        // Test parsing into node-level opts.
        let actual = parse_standalone_opt_args(&opts);

        let compute_opts = actual
            .compute_opts
            .as_ref()
            .expect("compute opts should be parsed");
        assert_eq!(compute_opts.listen_addr, "127.0.0.1:8000");
        assert_eq!(compute_opts.total_memory_bytes, 34359738368);
        assert_eq!(compute_opts.parallelism, 10);
        assert_eq!(compute_opts.temp_secret_file_dir, "./compute/secrets/");
        assert_eq!(compute_opts.prometheus_listener_addr, "127.0.0.1:1234");
        assert_eq!(compute_opts.config_path, "src/config/test.toml");

        let meta_opts = actual
            .meta_opts
            .as_ref()
            .expect("meta opts should be parsed");
        assert_eq!(meta_opts.listen_addr, "127.0.0.1:8001");
        assert_eq!(meta_opts.advertise_addr, "127.0.0.1:9999");
        assert_eq!(
            meta_opts.data_directory,
            Some("some path with spaces".to_owned())
        );
        assert_eq!(meta_opts.temp_secret_file_dir, "./meta/secrets/");
        assert_eq!(
            meta_opts.prometheus_listener_addr,
            Some("127.0.0.1:1234".to_owned())
        );
        assert_eq!(meta_opts.config_path, "src/config/test.toml");

        let frontend_opts = actual
            .frontend_opts
            .as_ref()
            .expect("frontend opts should be parsed");
        // The standalone-level config path wins over the node-level one.
        assert_eq!(frontend_opts.config_path, "src/config/test.toml");
        assert_eq!(frontend_opts.temp_secret_file_dir, "./frontend/secrets/");
        assert_eq!(frontend_opts.frontend_total_memory_bytes, 34359738368);
        assert_eq!(frontend_opts.prometheus_listener_addr, "127.0.0.1:1234");

        assert!(actual.compactor_opts.is_none());
    }
}