risingwave_rt/lib.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Configures the RisingWave binary, including logging, locks, panic handler, etc.
//!
//! See [`init_risingwave_logger`] and [`main_okk`] for more details, including supported
//! environment variables.
19
20#![feature(panic_update_hook)]
21#![feature(let_chains)]
22#![feature(exitcode_exit_method)]
23
24use std::pin::pin;
25use std::process::ExitCode;
26
27use futures::Future;
28use risingwave_common::util::tokio_util::sync::CancellationToken;
29
30mod logger;
31pub use logger::*;
32mod deadlock;
33pub use deadlock::*;
34mod panic_hook;
35pub use panic_hook::*;
36mod prof;
37use prof::*;
38use tokio::signal::unix::SignalKind;
39
/// Lower bound on the number of tokio worker threads that [`main_okk`] enforces in release
/// builds (i.e. when `debug_assertions` is disabled).
const MIN_WORKER_THREADS: usize = 4;
41
42/// Start RisingWave components with configs from environment variable.
43///
44/// # Shutdown on Ctrl-C
45///
46/// The given closure `f` will take a [`CancellationToken`] as an argument. When a `SIGINT` signal
47/// is received (typically by pressing `Ctrl-C`), [`CancellationToken::cancel`] will be called to
48/// notify all subscribers to shutdown. You can use [`.cancelled()`](CancellationToken::cancelled)
49/// to get notified on this.
50///
51/// Users may also send a second `SIGINT` signal to force shutdown. In this case, this function
52/// will exit the process with a non-zero exit code.
53///
54/// When `f` returns, this function will assume that the component has finished its work and it's
55/// safe to exit. Therefore, this function will exit the process with exit code 0 **without**
56/// waiting for background tasks to finish. In other words, it's the responsibility of `f` to
57/// ensure that all essential background tasks are finished before returning.
58///
59/// # Environment variables
60///
61/// Currently, the following environment variables will be read and used to configure the runtime.
62///
63/// * `RW_WORKER_THREADS` (alias of `TOKIO_WORKER_THREADS`): number of tokio worker threads. If
64/// not set, it will be decided by tokio. Note that this can still be overridden by per-module
65/// runtime worker thread settings in the config file.
66/// * `RW_DEADLOCK_DETECTION`: whether to enable deadlock detection. If not set, will enable in
67/// debug mode, and disable in release mode.
68/// * `RW_PROFILE_PATH`: the path to generate flamegraph. If set, then profiling is automatically
69/// enabled.
70pub fn main_okk<F, Fut>(f: F) -> !
71where
72 F: FnOnce(CancellationToken) -> Fut,
73 Fut: Future<Output = ()> + Send + 'static,
74{
75 set_panic_hook();
76
77 rustls::crypto::ring::default_provider()
78 .install_default()
79 .inspect_err(|e| {
80 tracing::error!(?e, "Failed to install default crypto provider.");
81 })
82 .unwrap();
83 risingwave_variables::init_server_start_time();
84
85 // `TOKIO` will be read by tokio. Duplicate `RW` for compatibility.
86 if let Some(worker_threads) = std::env::var_os("RW_WORKER_THREADS") {
87 // safety: single-threaded now.
88 unsafe { std::env::set_var("TOKIO_WORKER_THREADS", worker_threads) };
89 }
90
91 // Set the default number of worker threads to be at least `MIN_WORKER_THREADS`, in production.
92 if !cfg!(debug_assertions) {
93 let worker_threads = match std::env::var("TOKIO_WORKER_THREADS") {
94 Ok(v) => v
95 .parse::<usize>()
96 .expect("Failed to parse TOKIO_WORKER_THREADS"),
97 Err(std::env::VarError::NotPresent) => std::thread::available_parallelism()
98 .expect("Failed to get available parallelism")
99 .get(),
100 Err(_) => panic!("Failed to parse TOKIO_WORKER_THREADS"),
101 };
102 if worker_threads < MIN_WORKER_THREADS {
103 tracing::warn!(
104 "the default number of worker threads ({worker_threads}) is too small, which may lead to issues, increasing to {MIN_WORKER_THREADS}"
105 );
106 // safety: single-threaded now.
107 unsafe { std::env::set_var("TOKIO_WORKER_THREADS", MIN_WORKER_THREADS.to_string()) };
108 }
109 }
110
111 if let Ok(enable_deadlock_detection) = std::env::var("RW_DEADLOCK_DETECTION") {
112 let enable_deadlock_detection = enable_deadlock_detection
113 .parse()
114 .expect("Failed to parse RW_DEADLOCK_DETECTION");
115 if enable_deadlock_detection {
116 enable_parking_lot_deadlock_detection();
117 }
118 } else {
119 // In case the env variable is not set
120 if cfg!(debug_assertions) {
121 enable_parking_lot_deadlock_detection();
122 }
123 }
124
125 if let Ok(profile_path) = std::env::var("RW_PROFILE_PATH") {
126 spawn_prof_thread(profile_path);
127 }
128
129 let future_with_shutdown = async move {
130 let shutdown = CancellationToken::new();
131 let mut fut = pin!(f(shutdown.clone()));
132
133 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
134 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
135
136 tokio::select! {
137 biased;
138
139 // Watch SIGINT, typically originating from user pressing ctrl-c.
140 // Attempt to shutdown gracefully and force shutdown on the next signal.
141 _ = sigint.recv() => {
142 tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)");
143 shutdown.cancel();
144
145 // While waiting for the future to finish, listen for the second ctrl-c signal.
146 tokio::select! {
147 biased;
148 _ = sigint.recv() => {
149 tracing::warn!("forced shutdown");
150
151 // Directly exit the process **here** instead of returning from the future, since
152 // we don't even want to run destructors but only exit as soon as possible.
153 ExitCode::FAILURE.exit_process();
154 }
155 _ = &mut fut => {},
156 }
157 }
158
159 // Watch SIGTERM, typically originating from Kubernetes.
160 // Attempt to shutdown gracefully. No need to force shutdown since it will send SIGKILL after a timeout.
161 _ = sigterm.recv() => {
162 tracing::info!("received SIGTERM, shutting down...");
163 shutdown.cancel();
164 fut.await;
165 }
166
167 // Proceed with the future.
168 _ = &mut fut => {},
169 }
170 };
171
172 let runtime = tokio::runtime::Builder::new_multi_thread()
173 .thread_name("rw-main")
174 .enable_all()
175 .build()
176 .unwrap();
177
178 runtime.block_on(future_with_shutdown);
179
180 // Shutdown the runtime and exit the process, without waiting for background tasks to finish.
181 // See the doc on this function for more details.
182 // TODO(shutdown): is it necessary to shutdown here as we're going to exit?
183 runtime.shutdown_background();
184 ExitCode::SUCCESS.exit_process();
185}