risingwave_rt/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configures the RisingWave binary, including logging, locks, panic handler, etc.
16//!
17//! See [`init_risingwave_logger`] and [`main_okk`] for more details, including supported
18//! environment variables.
19
20#![feature(panic_update_hook)]
21#![feature(exitcode_exit_method)]
22
23use std::pin::pin;
24use std::process::ExitCode;
25
26use futures::Future;
27use risingwave_common::util::tokio_util::sync::CancellationToken;
28
29mod logger;
30pub use logger::*;
31mod deadlock;
32pub use deadlock::*;
33mod panic_hook;
34pub use panic_hook::*;
35mod prof;
36use prof::*;
37use tokio::signal::unix::SignalKind;
38
/// Lower bound for the number of tokio worker threads.
///
/// In builds without `debug_assertions`, `main_okk` compares the configured
/// `TOKIO_WORKER_THREADS` value (or the available parallelism when the variable is absent)
/// against this constant and overwrites `TOKIO_WORKER_THREADS` with it when the value is smaller.
39const MIN_WORKER_THREADS: usize = 4;
40
41/// Start RisingWave components with configs from environment variable.
42///
43/// # Shutdown on Ctrl-C
44///
45/// The given closure `f` will take a [`CancellationToken`] as an argument. When a `SIGINT` signal
46/// is received (typically by pressing `Ctrl-C`), [`CancellationToken::cancel`] will be called to
47/// notify all subscribers to shutdown. You can use [`.cancelled()`](CancellationToken::cancelled)
48/// to get notified on this.
49///
50/// Users may also send a second `SIGINT` signal to force shutdown. In this case, this function
51/// will exit the process with a non-zero exit code.
52///
53/// When `f` returns, this function will assume that the component has finished its work and it's
54/// safe to exit. Therefore, this function will exit the process with exit code 0 **without**
55/// waiting for background tasks to finish. In other words, it's the responsibility of `f` to
56/// ensure that all essential background tasks are finished before returning.
57///
58/// # Environment variables
59///
60/// Currently, the following environment variables will be read and used to configure the runtime.
61///
62/// * `RW_WORKER_THREADS` (alias of `TOKIO_WORKER_THREADS`): number of tokio worker threads. If
63///   not set, it will be decided by tokio. Note that this can still be overridden by per-module
64///   runtime worker thread settings in the config file.
65/// * `RW_DEADLOCK_DETECTION`: whether to enable deadlock detection. If not set, will enable in
66///   debug mode, and disable in release mode.
67/// * `RW_PROFILE_PATH`: the path to generate flamegraph. If set, then profiling is automatically
68///   enabled.
69pub fn main_okk<F, Fut>(f: F) -> !
70where
71    F: FnOnce(CancellationToken) -> Fut,
72    Fut: Future<Output = ()> + Send + 'static,
73{
74    set_panic_hook();
75
76    rustls::crypto::ring::default_provider()
77        .install_default()
78        .inspect_err(|e| {
79            tracing::error!(?e, "Failed to install default crypto provider.");
80        })
81        .unwrap();
82    risingwave_variables::init_server_start_time();
83
84    // `TOKIO` will be read by tokio. Duplicate `RW` for compatibility.
85    if let Some(worker_threads) = std::env::var_os("RW_WORKER_THREADS") {
86        // safety: single-threaded now.
87        unsafe { std::env::set_var("TOKIO_WORKER_THREADS", worker_threads) };
88    }
89
90    // Set the default number of worker threads to be at least `MIN_WORKER_THREADS`, in production.
91    if !cfg!(debug_assertions) {
92        let worker_threads = match std::env::var("TOKIO_WORKER_THREADS") {
93            Ok(v) => v
94                .parse::<usize>()
95                .expect("Failed to parse TOKIO_WORKER_THREADS"),
96            Err(std::env::VarError::NotPresent) => std::thread::available_parallelism()
97                .expect("Failed to get available parallelism")
98                .get(),
99            Err(_) => panic!("Failed to parse TOKIO_WORKER_THREADS"),
100        };
101        if worker_threads < MIN_WORKER_THREADS {
102            tracing::warn!(
103                "the default number of worker threads ({worker_threads}) is too small, which may lead to issues, increasing to {MIN_WORKER_THREADS}"
104            );
105            // safety: single-threaded now.
106            unsafe { std::env::set_var("TOKIO_WORKER_THREADS", MIN_WORKER_THREADS.to_string()) };
107        }
108    }
109
110    if let Ok(enable_deadlock_detection) = std::env::var("RW_DEADLOCK_DETECTION") {
111        let enable_deadlock_detection = enable_deadlock_detection
112            .parse()
113            .expect("Failed to parse RW_DEADLOCK_DETECTION");
114        if enable_deadlock_detection {
115            enable_parking_lot_deadlock_detection();
116        }
117    } else {
118        // In case the env variable is not set
119        if cfg!(debug_assertions) {
120            enable_parking_lot_deadlock_detection();
121        }
122    }
123
124    if let Ok(profile_path) = std::env::var("RW_PROFILE_PATH") {
125        spawn_prof_thread(profile_path);
126    }
127
128    let future_with_shutdown = async move {
129        let shutdown = CancellationToken::new();
130        let mut fut = pin!(f(shutdown.clone()));
131
132        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
133        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
134
135        tokio::select! {
136            biased;
137
138            // Watch SIGINT, typically originating from user pressing ctrl-c.
139            // Attempt to shutdown gracefully and force shutdown on the next signal.
140            _ = sigint.recv() => {
141                tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)");
142                shutdown.cancel();
143
144                // While waiting for the future to finish, listen for the second ctrl-c signal.
145                tokio::select! {
146                    biased;
147                    _ = sigint.recv() => {
148                        tracing::warn!("forced shutdown");
149
150                        // Directly exit the process **here** instead of returning from the future, since
151                        // we don't even want to run destructors but only exit as soon as possible.
152                        ExitCode::FAILURE.exit_process();
153                    }
154                    _ = &mut fut => {},
155                }
156            }
157
158            // Watch SIGTERM, typically originating from Kubernetes.
159            // Attempt to shutdown gracefully. No need to force shutdown since it will send SIGKILL after a timeout.
160            _ = sigterm.recv() => {
161                tracing::info!("received SIGTERM, shutting down...");
162                shutdown.cancel();
163                fut.await;
164            }
165
166            // Proceed with the future.
167            _ = &mut fut => {},
168        }
169    };
170
171    let runtime = tokio::runtime::Builder::new_multi_thread()
172        .thread_name("rw-main")
173        .enable_all()
174        .build()
175        .unwrap();
176
177    runtime.block_on(future_with_shutdown);
178
179    // Shutdown the runtime and exit the process, without waiting for background tasks to finish.
180    // See the doc on this function for more details.
181    // TODO(shutdown): is it necessary to shutdown here as we're going to exit?
182    runtime.shutdown_background();
183    ExitCode::SUCCESS.exit_process();
184}