risingwave_rt/lib.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configures the RisingWave binary, including logging, locks, panic handler, etc.
16//!
17//! See [`init_risingwave_logger`] and [`main_okk`] for more details, including supported
18//! environment variables.
19
20#![feature(panic_update_hook)]
21#![feature(exitcode_exit_method)]
22
23use std::pin::pin;
24use std::process::ExitCode;
25
26use futures::Future;
27use risingwave_common::util::tokio_util::sync::CancellationToken;
28
29mod logger;
30pub use logger::*;
31mod deadlock;
32pub use deadlock::*;
33mod panic_hook;
34pub use panic_hook::*;
35mod prof;
36use prof::*;
37use tokio::signal::unix::SignalKind;
38
/// Lower bound on the number of tokio worker threads enforced by [`main_okk`].
///
/// The bound is only applied in release builds (i.e. when `debug_assertions` is off). If the
/// effective worker thread count (taken from `TOKIO_WORKER_THREADS`, or from the available
/// parallelism when that variable is unset) is smaller than this value, `TOKIO_WORKER_THREADS`
/// is overwritten with this value before the runtime is built.
const MIN_WORKER_THREADS: usize = 4;
40
41/// Start RisingWave components with configs from environment variable.
42///
43/// # Shutdown on Ctrl-C
44///
45/// The given closure `f` will take a [`CancellationToken`] as an argument. When a `SIGINT` signal
46/// is received (typically by pressing `Ctrl-C`), [`CancellationToken::cancel`] will be called to
47/// notify all subscribers to shutdown. You can use [`.cancelled()`](CancellationToken::cancelled)
48/// to get notified on this.
49///
50/// Users may also send a second `SIGINT` signal to force shutdown. In this case, this function
51/// will exit the process with a non-zero exit code.
52///
53/// When `f` returns, this function will assume that the component has finished its work and it's
54/// safe to exit. Therefore, this function will exit the process with exit code 0 **without**
55/// waiting for background tasks to finish. In other words, it's the responsibility of `f` to
56/// ensure that all essential background tasks are finished before returning.
57///
58/// # Environment variables
59///
60/// Currently, the following environment variables will be read and used to configure the runtime.
61///
62/// * `RW_WORKER_THREADS` (alias of `TOKIO_WORKER_THREADS`): number of tokio worker threads. If
63/// not set, it will be decided by tokio. Note that this can still be overridden by per-module
64/// runtime worker thread settings in the config file.
65/// * `RW_DEADLOCK_DETECTION`: whether to enable deadlock detection. If not set, will enable in
66/// debug mode, and disable in release mode.
67/// * `RW_PROFILE_PATH`: the path to generate flamegraph. If set, then profiling is automatically
68/// enabled.
69pub fn main_okk<F, Fut>(f: F) -> !
70where
71 F: FnOnce(CancellationToken) -> Fut,
72 Fut: Future<Output = ()> + Send + 'static,
73{
74 set_panic_hook();
75
76 rustls::crypto::ring::default_provider()
77 .install_default()
78 .inspect_err(|e| {
79 tracing::error!(?e, "Failed to install default crypto provider.");
80 })
81 .unwrap();
82 risingwave_variables::init_server_start_time();
83
84 // `TOKIO` will be read by tokio. Duplicate `RW` for compatibility.
85 if let Some(worker_threads) = std::env::var_os("RW_WORKER_THREADS") {
86 // safety: single-threaded now.
87 unsafe { std::env::set_var("TOKIO_WORKER_THREADS", worker_threads) };
88 }
89
90 // Set the default number of worker threads to be at least `MIN_WORKER_THREADS`, in production.
91 if !cfg!(debug_assertions) {
92 let worker_threads = match std::env::var("TOKIO_WORKER_THREADS") {
93 Ok(v) => v
94 .parse::<usize>()
95 .expect("Failed to parse TOKIO_WORKER_THREADS"),
96 Err(std::env::VarError::NotPresent) => std::thread::available_parallelism()
97 .expect("Failed to get available parallelism")
98 .get(),
99 Err(_) => panic!("Failed to parse TOKIO_WORKER_THREADS"),
100 };
101 if worker_threads < MIN_WORKER_THREADS {
102 tracing::warn!(
103 "the default number of worker threads ({worker_threads}) is too small, which may lead to issues, increasing to {MIN_WORKER_THREADS}"
104 );
105 // safety: single-threaded now.
106 unsafe { std::env::set_var("TOKIO_WORKER_THREADS", MIN_WORKER_THREADS.to_string()) };
107 }
108 }
109
110 if let Ok(enable_deadlock_detection) = std::env::var("RW_DEADLOCK_DETECTION") {
111 let enable_deadlock_detection = enable_deadlock_detection
112 .parse()
113 .expect("Failed to parse RW_DEADLOCK_DETECTION");
114 if enable_deadlock_detection {
115 enable_parking_lot_deadlock_detection();
116 }
117 } else {
118 // In case the env variable is not set
119 if cfg!(debug_assertions) {
120 enable_parking_lot_deadlock_detection();
121 }
122 }
123
124 if let Ok(profile_path) = std::env::var("RW_PROFILE_PATH") {
125 spawn_prof_thread(profile_path);
126 }
127
128 let future_with_shutdown = async move {
129 let shutdown = CancellationToken::new();
130 let mut fut = pin!(f(shutdown.clone()));
131
132 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
133 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
134
135 tokio::select! {
136 biased;
137
138 // Watch SIGINT, typically originating from user pressing ctrl-c.
139 // Attempt to shutdown gracefully and force shutdown on the next signal.
140 _ = sigint.recv() => {
141 tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)");
142 shutdown.cancel();
143
144 // While waiting for the future to finish, listen for the second ctrl-c signal.
145 tokio::select! {
146 biased;
147 _ = sigint.recv() => {
148 tracing::warn!("forced shutdown");
149
150 // Directly exit the process **here** instead of returning from the future, since
151 // we don't even want to run destructors but only exit as soon as possible.
152 ExitCode::FAILURE.exit_process();
153 }
154 _ = &mut fut => {},
155 }
156 }
157
158 // Watch SIGTERM, typically originating from Kubernetes.
159 // Attempt to shutdown gracefully. No need to force shutdown since it will send SIGKILL after a timeout.
160 _ = sigterm.recv() => {
161 tracing::info!("received SIGTERM, shutting down...");
162 shutdown.cancel();
163 fut.await;
164 }
165
166 // Proceed with the future.
167 _ = &mut fut => {},
168 }
169 };
170
171 let runtime = tokio::runtime::Builder::new_multi_thread()
172 .thread_name("rw-main")
173 .enable_all()
174 .build()
175 .unwrap();
176
177 runtime.block_on(future_with_shutdown);
178
179 // Shutdown the runtime and exit the process, without waiting for background tasks to finish.
180 // See the doc on this function for more details.
181 // TODO(shutdown): is it necessary to shutdown here as we're going to exit?
182 runtime.shutdown_background();
183 ExitCode::SUCCESS.exit_process();
184}