// risingwave_rt/lib.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Configures the RisingWave binary, including logging, locks, panic handler, etc.
//!
//! See [`init_risingwave_logger`] and [`main_okk`] for more details, including supported
//! environment variables.
#![feature(panic_update_hook)]
#![feature(let_chains)]
#![feature(exitcode_exit_method)]
use std::pin::pin;
use std::process::ExitCode;
use futures::Future;
use risingwave_common::util::tokio_util::sync::CancellationToken;
mod logger;
pub use logger::*;
mod deadlock;
pub use deadlock::*;
mod panic_hook;
pub use panic_hook::*;
mod prof;
use prof::*;
use tokio::signal::unix::SignalKind;
/// Lower bound on the number of tokio worker threads. In release builds (`debug_assertions`
/// off), `main_okk` raises `TOKIO_WORKER_THREADS` to this value when the configured (or, if
/// unset, the detected available-parallelism) count is smaller.
const MIN_WORKER_THREADS: usize = 4;
/// Start RisingWave components with configs from environment variable.
///
/// # Shutdown on Ctrl-C
///
/// The given closure `f` will take a [`CancellationToken`] as an argument. When a `SIGINT` signal
/// is received (typically by pressing `Ctrl-C`), [`CancellationToken::cancel`] will be called to
/// notify all subscribers to shutdown. You can use [`.cancelled()`](CancellationToken::cancelled)
/// to get notified on this.
///
/// Users may also send a second `SIGINT` signal to force shutdown. In this case, this function
/// will exit the process with a non-zero exit code.
///
/// A `SIGTERM` signal (typically sent by Kubernetes) triggers the same graceful shutdown: the
/// token is cancelled and the future returned by `f` is awaited to completion. There is no forced
/// shutdown path for `SIGTERM`, as the sender is expected to follow up with `SIGKILL` after a
/// timeout.
///
/// When `f` returns, this function will assume that the component has finished its work and it's
/// safe to exit. Therefore, this function will exit the process with exit code 0 **without**
/// waiting for background tasks to finish. In other words, it's the responsibility of `f` to
/// ensure that all essential background tasks are finished before returning.
///
/// # Environment variables
///
/// Currently, the following environment variables will be read and used to configure the runtime.
///
/// * `RW_WORKER_THREADS` (alias of `TOKIO_WORKER_THREADS`, and taking precedence over it when
///   both are set): number of tokio worker threads. If not set, it will be decided by tokio. In
///   release builds, a value (or detected available parallelism) below `MIN_WORKER_THREADS` is
///   raised to `MIN_WORKER_THREADS`. Note that this can still be overridden by per-module
///   runtime worker thread settings in the config file.
/// * `RW_DEADLOCK_DETECTION`: whether to enable deadlock detection. If not set, will enable in
///   debug mode, and disable in release mode.
/// * `RW_PROFILE_PATH`: the path to generate flamegraph. If set, then profiling is automatically
///   enabled.
///
/// # Panics
///
/// Panics if
///
/// * the default crypto provider cannot be installed,
/// * `TOKIO_WORKER_THREADS` is not a valid `usize` or the available parallelism cannot be
///   determined (both checked in release builds only),
/// * `RW_DEADLOCK_DETECTION` is set but cannot be parsed as a `bool`,
/// * the tokio runtime cannot be built, or the `SIGINT` / `SIGTERM` handlers cannot be registered.
pub fn main_okk<F, Fut>(f: F) -> !
where
    F: FnOnce(CancellationToken) -> Fut,
    Fut: Future<Output = ()> + Send + 'static,
{
    set_panic_hook();

    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .inspect_err(|e| {
            tracing::error!(?e, "Failed to install default crypto provider.");
        })
        .unwrap();

    risingwave_variables::init_server_start_time();

    // Environment-driven setup. The worker thread setup must run before the runtime is built,
    // since tokio reads `TOKIO_WORKER_THREADS` at that point.
    configure_worker_threads();
    configure_deadlock_detection();

    if let Ok(profile_path) = std::env::var("RW_PROFILE_PATH") {
        spawn_prof_thread(profile_path);
    }

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .thread_name("rw-main")
        .enable_all()
        .build()
        .expect("Failed to build the tokio runtime");

    // `f` is invoked lazily inside `block_on`, i.e. within the runtime context.
    runtime.block_on(run_until_shutdown(f));

    // Shutdown the runtime and exit the process, without waiting for background tasks to finish.
    // See the doc on this function for more details.
    // TODO(shutdown): is it necessary to shutdown here as we're going to exit?
    runtime.shutdown_background();
    ExitCode::SUCCESS.exit_process();
}

/// Prepares the `TOKIO_WORKER_THREADS` environment variable for the tokio runtime.
fn configure_worker_threads() {
    // `TOKIO` will be read by tokio. Duplicate `RW` for compatibility.
    if let Some(worker_threads) = std::env::var_os("RW_WORKER_THREADS") {
        std::env::set_var("TOKIO_WORKER_THREADS", worker_threads);
    }

    // The lower bound below is only enforced in production (release) builds.
    if cfg!(debug_assertions) {
        return;
    }

    // Set the default number of worker threads to be at least `MIN_WORKER_THREADS`.
    let worker_threads = match std::env::var("TOKIO_WORKER_THREADS") {
        Ok(v) => v
            .parse::<usize>()
            .expect("Failed to parse TOKIO_WORKER_THREADS"),
        Err(std::env::VarError::NotPresent) => std::thread::available_parallelism()
            .expect("Failed to get available parallelism")
            .get(),
        Err(std::env::VarError::NotUnicode(_)) => panic!("Failed to parse TOKIO_WORKER_THREADS"),
    };

    if worker_threads < MIN_WORKER_THREADS {
        tracing::warn!("the default number of worker threads ({worker_threads}) is too small, which may lead to issues, increasing to {MIN_WORKER_THREADS}");
        std::env::set_var("TOKIO_WORKER_THREADS", MIN_WORKER_THREADS.to_string());
    }
}

/// Enables parking-lot deadlock detection according to `RW_DEADLOCK_DETECTION`, falling back to
/// "on in debug builds, off in release builds" when the variable cannot be read.
fn configure_deadlock_detection() {
    let enabled = match std::env::var("RW_DEADLOCK_DETECTION") {
        Ok(value) => value
            .parse::<bool>()
            .expect("Failed to parse RW_DEADLOCK_DETECTION"),
        // In case the env variable is not set
        Err(_) => cfg!(debug_assertions),
    };

    if enabled {
        enable_parking_lot_deadlock_detection();
    }
}

/// Drives the future returned by `f` to completion while watching for `SIGINT` and `SIGTERM`.
///
/// On the first signal the [`CancellationToken`] handed to `f` is cancelled and the future is
/// awaited. A second `SIGINT` exits the process immediately with a failure exit code.
async fn run_until_shutdown<F, Fut>(f: F)
where
    F: FnOnce(CancellationToken) -> Fut,
    Fut: Future<Output = ()>,
{
    let shutdown = CancellationToken::new();
    let mut fut = pin!(f(shutdown.clone()));

    let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
        .expect("Failed to register SIGINT handler");
    let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
        .expect("Failed to register SIGTERM handler");

    tokio::select! {
        // Poll in declaration order so that pending signals win over the main future.
        biased;

        // Watch SIGINT, typically originating from user pressing ctrl-c.
        // Attempt to shutdown gracefully and force shutdown on the next signal.
        _ = sigint.recv() => {
            tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)");
            shutdown.cancel();

            // While waiting for the future to finish, listen for the second ctrl-c signal.
            tokio::select! {
                biased;
                _ = sigint.recv() => {
                    tracing::warn!("forced shutdown");

                    // Directly exit the process **here** instead of returning from the future, since
                    // we don't even want to run destructors but only exit as soon as possible.
                    ExitCode::FAILURE.exit_process();
                }
                _ = &mut fut => {},
            }
        }

        // Watch SIGTERM, typically originating from Kubernetes.
        // Attempt to shutdown gracefully. No need to force shutdown since it will send SIGKILL after a timeout.
        _ = sigterm.recv() => {
            tracing::info!("received SIGTERM, shutting down...");
            shutdown.cancel();
            fut.await;
        }

        // Proceed with the future.
        _ = &mut fut => {},
    }
}