risingwave_cmd_all/
standalone.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::fmt::Write;
17use std::future::Future;
18use std::path::Path;
19
20use clap::Parser;
21use risingwave_common::config::{MetaBackend, load_config};
22use risingwave_common::license::LicenseManager;
23use risingwave_common::util::env_var::env_var_is_true;
24use risingwave_common::util::meta_addr::MetaAddressStrategy;
25use risingwave_common::util::runtime::BackgroundShutdownRuntime;
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_compactor::CompactorOpts;
28use risingwave_compute::ComputeNodeOpts;
29use risingwave_frontend::FrontendOpts;
30use risingwave_meta_node::MetaNodeOpts;
31use shell_words::split;
32
33use crate::common::osstrs;
34
35#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
36#[command(
37    version,
38    about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service",
39    hide = true
40)]
41pub struct StandaloneOpts {
42    /// Compute node options
43    /// If missing, compute node won't start
44    #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")]
45    compute_opts: Option<String>,
46
47    #[clap(short, long, env = "RW_STANDALONE_META_OPTS")]
48    /// Meta node options
49    /// If missing, meta node won't start
50    meta_opts: Option<String>,
51
52    #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")]
53    /// Frontend node options
54    /// If missing, frontend node won't start
55    frontend_opts: Option<String>,
56
57    #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")]
58    /// Compactor node options
59    /// If missing compactor node won't start
60    compactor_opts: Option<String>,
61
62    #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")]
63    /// Prometheus listener address
64    /// If present, it will override prometheus listener address for
65    /// Frontend, Compute and Compactor nodes
66    prometheus_listener_addr: Option<String>,
67
68    #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")]
69    /// Path to the config file
70    /// If present, it will override config path for
71    /// Frontend, Compute and Compactor nodes
72    config_path: Option<String>,
73}
74
/// Node-level options of every service, as produced by `parse_standalone_opt_args`.
///
/// A field is `None` when the corresponding service was not requested; `standalone`
/// only starts the services whose options are present.
#[derive(Debug)]
pub struct ParsedStandaloneOpts {
    /// Options of the meta node, if it should be started.
    pub meta_opts: Option<MetaNodeOpts>,
    /// Options of the compute node, if it should be started.
    pub compute_opts: Option<ComputeNodeOpts>,
    /// Options of the frontend node, if it should be started.
    pub frontend_opts: Option<FrontendOpts>,
    /// Options of the compactor node, if it should be started.
    pub compactor_opts: Option<CompactorOpts>,
}
82
83impl risingwave_common::opts::Opts for ParsedStandaloneOpts {
84    fn name() -> &'static str {
85        "standalone"
86    }
87
88    fn meta_addr(&self) -> MetaAddressStrategy {
89        if let Some(opts) = self.meta_opts.as_ref() {
90            opts.meta_addr()
91        } else if let Some(opts) = self.compute_opts.as_ref() {
92            opts.meta_addr()
93        } else if let Some(opts) = self.frontend_opts.as_ref() {
94            opts.meta_addr()
95        } else if let Some(opts) = self.compactor_opts.as_ref() {
96            opts.meta_addr()
97        } else {
98            unreachable!("at least one service should be specified as checked during parsing")
99        }
100    }
101}
102
103pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
104    let meta_opts = opts.meta_opts.as_ref().map(|s| {
105        let mut s = split(s).unwrap();
106        s.insert(0, "meta-node".into());
107        s
108    });
109    let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o)));
110
111    let compute_opts = opts.compute_opts.as_ref().map(|s| {
112        let mut s = split(s).unwrap();
113        s.insert(0, "compute-node".into());
114        s
115    });
116    let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o)));
117
118    let frontend_opts = opts.frontend_opts.as_ref().map(|s| {
119        let mut s = split(s).unwrap();
120        s.insert(0, "frontend-node".into());
121        s
122    });
123    let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o)));
124
125    let compactor_opts = opts.compactor_opts.as_ref().map(|s| {
126        let mut s = split(s).unwrap();
127        s.insert(0, "compactor-node".into());
128        s
129    });
130    let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o)));
131
132    if let Some(config_path) = opts.config_path.as_ref() {
133        if let Some(meta_opts) = meta_opts.as_mut() {
134            meta_opts.config_path.clone_from(config_path);
135        }
136        if let Some(compute_opts) = compute_opts.as_mut() {
137            compute_opts.config_path.clone_from(config_path);
138        }
139        if let Some(frontend_opts) = frontend_opts.as_mut() {
140            frontend_opts.config_path.clone_from(config_path);
141        }
142        if let Some(compactor_opts) = compactor_opts.as_mut() {
143            compactor_opts.config_path.clone_from(config_path);
144        }
145    }
146    if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() {
147        if let Some(compute_opts) = compute_opts.as_mut() {
148            compute_opts
149                .prometheus_listener_addr
150                .clone_from(prometheus_listener_addr);
151        }
152        if let Some(frontend_opts) = frontend_opts.as_mut() {
153            frontend_opts
154                .prometheus_listener_addr
155                .clone_from(prometheus_listener_addr);
156        }
157        if let Some(compactor_opts) = compactor_opts.as_mut() {
158            compactor_opts
159                .prometheus_listener_addr
160                .clone_from(prometheus_listener_addr);
161        }
162        if let Some(meta_opts) = meta_opts.as_mut() {
163            meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
164        }
165    }
166
167    if meta_opts.is_none()
168        && compute_opts.is_none()
169        && frontend_opts.is_none()
170        && compactor_opts.is_none()
171    {
172        panic!("No service is specified to start.");
173    }
174
175    ParsedStandaloneOpts {
176        meta_opts,
177        compute_opts,
178        frontend_opts,
179        compactor_opts,
180    }
181}
182
/// A service under standalone mode.
///
/// Created by `Service::spawn` and torn down by `Service::shutdown`.
struct Service {
    /// Short service name, used in log messages and in the runtime's thread name.
    name: &'static str,
    /// The dedicated runtime the service runs in. It is only held so that it can be
    /// dropped when the service is shut down.
    runtime: BackgroundShutdownRuntime,
    /// Handle of the service's main task spawned on `runtime`.
    main_task: tokio::task::JoinHandle<()>,
    /// Token handed to the service on start; cancelling it requests the service to stop.
    shutdown: CancellationToken,
}
190
191impl Service {
192    /// Spawn a new tokio runtime and start a service in it.
193    ///
194    /// By using a separate runtime, we get better isolation between services. For example,
195    ///
196    /// - The logs in the main runtime of each service can be distinguished by the thread name.
197    /// - Each service can be shutdown cleanly by shutting down its runtime.
198    fn spawn<F, Fut>(name: &'static str, f: F) -> Self
199    where
200        F: FnOnce(CancellationToken) -> Fut,
201        Fut: Future<Output = ()> + Send + 'static,
202    {
203        let runtime = tokio::runtime::Builder::new_multi_thread()
204            .thread_name(format!("rw-standalone-{name}"))
205            .enable_all()
206            .build()
207            .unwrap();
208        let shutdown = CancellationToken::new();
209        let main_task = runtime.spawn(f(shutdown.clone()));
210
211        Self {
212            name,
213            runtime: runtime.into(),
214            main_task,
215            shutdown,
216        }
217    }
218
219    /// Shutdown the service and the runtime gracefully.
220    ///
221    /// As long as the main task of the service is resolved after signaling `shutdown`,
222    /// the service is considered stopped and the runtime will be shutdown. This follows
223    /// the same convention as described in `risingwave_rt::main_okk`.
224    async fn shutdown(self) {
225        tracing::info!("stopping {} service...", self.name);
226
227        self.shutdown.cancel();
228        let _ = self.main_task.await;
229        drop(self.runtime); // shutdown in background
230
231        tracing::info!("{} service stopped", self.name);
232    }
233}
234
235/// For `standalone` mode, we can configure and start multiple services in one process.
236/// `standalone` mode is meant to be used by our cloud service and docker,
237/// where we can configure and start multiple services in one process.
238///
239/// Services are started in the order of `meta`, `compute`, `frontend`, then `compactor`.
240/// When the `shutdown` token is signaled, all services will be stopped gracefully in the
241/// reverse order.
242pub async fn standalone(
243    ParsedStandaloneOpts {
244        meta_opts,
245        compute_opts,
246        frontend_opts,
247        compactor_opts,
248    }: ParsedStandaloneOpts,
249    shutdown: CancellationToken,
250) {
251    tracing::info!("launching Risingwave in standalone mode");
252
253    // Skip license resource limit checks for single-node deployments
254    // (standalone with local meta backend (sqlite/mem)).
255    if let Some(opts) = meta_opts.as_ref() {
256        let config = load_config(&opts.config_path, opts);
257        let is_local_meta_backend =
258            matches!(config.meta.backend, MetaBackend::Mem | MetaBackend::Sqlite)
259                || (matches!(config.meta.backend, MetaBackend::Sql)
260                    && opts
261                        .sql_endpoint
262                        .as_ref()
263                        .map(|endpoint| endpoint.expose_secret().starts_with("sqlite:"))
264                        .unwrap_or(false));
265        if is_local_meta_backend {
266            LicenseManager::get().set_ignore_resource_limit(true);
267            tracing::info!(
268                "standalone meta backend {:?}; ignore license resource limits",
269                config.meta.backend
270            );
271        }
272    }
273
274    let (meta, is_in_memory) = if let Some(opts) = meta_opts.clone() {
275        let is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
276        tracing::info!("starting meta-node thread with cli args: {:?}", opts);
277        let service = Service::spawn("meta", |shutdown| {
278            risingwave_meta_node::start(opts, shutdown)
279        });
280
281        // wait for the service to be ready
282        let mut tries = 0;
283        while !risingwave_meta_node::is_server_started() {
284            if tries % 50 == 0 {
285                tracing::info!("waiting for meta service to be ready...");
286            }
287            if service.main_task.is_finished() {
288                tracing::error!("meta service failed to start, exiting...");
289                return;
290            }
291            tries += 1;
292            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
293        }
294
295        (Some(service), is_in_memory)
296    } else {
297        (None, false)
298    };
299
300    let compute = if let Some(opts) = compute_opts {
301        tracing::info!("starting compute-node thread with cli args: {:?}", opts);
302        let service = Service::spawn("compute", |shutdown| {
303            risingwave_compute::start(opts, shutdown)
304        });
305        Some(service)
306    } else {
307        None
308    };
309
310    let frontend = if let Some(opts) = frontend_opts.clone() {
311        tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
312        let service = Service::spawn("frontend", |shutdown| {
313            risingwave_frontend::start(opts, shutdown)
314        });
315        Some(service)
316    } else {
317        None
318    };
319
320    let compactor = if let Some(opts) = compactor_opts {
321        tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
322        let service = Service::spawn("compactor", |shutdown| {
323            risingwave_compactor::start(opts, shutdown)
324        });
325        Some(service)
326    } else {
327        None
328    };
329
330    // wait for log messages to be flushed
331    tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
332
333    eprintln!("----------------------------------------");
334    eprintln!("| RisingWave standalone mode is ready. |");
335    eprintln!("----------------------------------------");
336    if is_in_memory {
337        eprintln!(
338            "{}",
339            console::style(
340                "WARNING: You are using RisingWave's in-memory mode.
341It SHOULD NEVER be used in benchmarks and production environment!!!"
342            )
343            .red()
344            .bold()
345        );
346    }
347
348    // This is a specialization of `generate_risedev_env` in `src/risedevtool/src/risedev_env.rs`.
349    let mut risedev_env = String::new();
350
351    if let Some(opts) = &frontend_opts {
352        let host = opts.listen_addr.split(':').next().unwrap_or("localhost");
353        let port = opts.listen_addr.split(':').next_back().unwrap_or("4566");
354        let database = "dev";
355        let user = "root";
356
357        writeln!(
358            risedev_env,
359            r#"RISEDEV_RW_FRONTEND_LISTEN_ADDRESS="{host}""#
360        )
361        .unwrap();
362        writeln!(risedev_env, r#"RISEDEV_RW_FRONTEND_PORT="{port}""#).unwrap();
363
364        eprintln!();
365        eprintln!("Connect to the RisingWave instance via psql:");
366        eprintln!(
367            "{}",
368            console::style(format!(
369                "  psql -h {host} -p {port} -d {database} -U {user}"
370            ))
371            .blue()
372        );
373    }
374
375    if let Some(opts) = &meta_opts {
376        let meta_addr = &opts.listen_addr;
377        writeln!(risedev_env, r#"RW_META_ADDR="http://{meta_addr}""#).unwrap();
378    }
379
380    // Create the environment file when launched by RiseDev.
381    if env_var_is_true("RISEDEV") {
382        let env_path = Path::new(
383            &env::var("PREFIX_CONFIG").expect("env var `PREFIX_CONFIG` must be set by RiseDev"),
384        )
385        .join("risedev-env");
386        std::fs::write(env_path, risedev_env).unwrap();
387    }
388
389    let meta_stopped = meta
390        .as_ref()
391        .map(|m| m.shutdown.clone())
392        // If there's no meta service, use a dummy token which will never resolve.
393        .unwrap_or_else(CancellationToken::new)
394        .cancelled_owned();
395
396    // Wait for shutdown signals.
397    tokio::select! {
398        // Meta service stopped itself, typically due to leadership loss of idleness.
399        // Directly exit in this case.
400        _ = meta_stopped => {
401            tracing::info!("meta service is stopped, terminating...");
402        }
403
404        // Shutdown requested by the user.
405        _ = shutdown.cancelled() => {
406            for service in [compactor, frontend, compute, meta].into_iter().flatten() {
407                service.shutdown().await;
408            }
409            tracing::info!("all services stopped, bye");
410        }
411    }
412}
413
#[cfg(test)]
mod test {
    use super::*;

    /// Checks both parsing stages: raw CLI arguments into `StandaloneOpts`, and then the
    /// per-node option strings into node-level options, including the standalone-level
    /// overrides of the config path and the prometheus listener address.
    #[test]
    fn test_parse_opt_args() {
        // Test parsing into standalone-level opts.
        // The leading empty line of `raw_opts` takes the place of the program name.
        let raw_opts = "
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
        let actual = StandaloneOpts::parse_from(raw_opts.lines());
        let opts = StandaloneOpts {
            compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
            meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
            frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into()),
            compactor_opts: None,
            prometheus_listener_addr: Some("127.0.0.1:1234".into()),
            config_path: Some("src/config/test.toml".into()),
        };
        assert_eq!(actual, opts);

        // Test parsing into node-level opts.
        let actual = parse_standalone_opt_args(&opts);

        let compute_opts = actual
            .compute_opts
            .as_ref()
            .expect("compute opts should be parsed");
        assert_eq!(compute_opts.listen_addr, "127.0.0.1:8000");
        assert_eq!(compute_opts.total_memory_bytes, 34359738368);
        assert_eq!(compute_opts.parallelism, 10);
        assert_eq!(compute_opts.temp_secret_file_dir, "./compute/secrets/");
        assert_eq!(compute_opts.prometheus_listener_addr, "127.0.0.1:1234");
        assert_eq!(compute_opts.config_path, "src/config/test.toml");

        let meta_opts = actual
            .meta_opts
            .as_ref()
            .expect("meta opts should be parsed");
        assert_eq!(meta_opts.listen_addr, "127.0.0.1:8001");
        assert_eq!(meta_opts.advertise_addr, "127.0.0.1:9999");
        assert_eq!(
            meta_opts.data_directory,
            Some("some path with spaces".to_owned())
        );
        assert_eq!(meta_opts.temp_secret_file_dir, "./meta/secrets/");
        assert_eq!(
            meta_opts.prometheus_listener_addr,
            Some("127.0.0.1:1234".to_owned())
        );
        assert_eq!(meta_opts.config_path, "src/config/test.toml");

        let frontend_opts = actual
            .frontend_opts
            .as_ref()
            .expect("frontend opts should be parsed");
        // The standalone-level config path wins over the one given in `--frontend-opts`.
        assert_eq!(frontend_opts.config_path, "src/config/test.toml");
        assert_eq!(frontend_opts.temp_secret_file_dir, "./frontend/secrets/");
        assert_eq!(frontend_opts.frontend_total_memory_bytes, 34359738368);
        assert_eq!(frontend_opts.prometheus_listener_addr, "127.0.0.1:1234");

        assert!(actual.compactor_opts.is_none());
    }
}