risingwave_rt/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configures the RisingWave binary, including logging, locks, panic handler, etc.
16//!
17//! See [`init_risingwave_logger`] and [`main_okk`] for more details, including supported
18//! environment variables.
19
20#![feature(panic_update_hook)]
21#![feature(let_chains)]
22#![feature(exitcode_exit_method)]
23
24use std::pin::pin;
25use std::process::ExitCode;
26
27use futures::Future;
28use risingwave_common::util::tokio_util::sync::CancellationToken;
29
30mod logger;
31pub use logger::*;
32mod deadlock;
33pub use deadlock::*;
34mod panic_hook;
35pub use panic_hook::*;
36mod prof;
37use prof::*;
38use tokio::signal::unix::SignalKind;
39
/// Lower bound on the number of tokio worker threads.
///
/// In builds without debug assertions, `main_okk` raises `TOKIO_WORKER_THREADS` to this value
/// whenever the configured (or detected) thread count is smaller.
const MIN_WORKER_THREADS: usize = 4;
41
42/// Start RisingWave components with configs from environment variable.
43///
44/// # Shutdown on Ctrl-C
45///
46/// The given closure `f` will take a [`CancellationToken`] as an argument. When a `SIGINT` signal
47/// is received (typically by pressing `Ctrl-C`), [`CancellationToken::cancel`] will be called to
48/// notify all subscribers to shutdown. You can use [`.cancelled()`](CancellationToken::cancelled)
49/// to get notified on this.
50///
51/// Users may also send a second `SIGINT` signal to force shutdown. In this case, this function
52/// will exit the process with a non-zero exit code.
53///
54/// When `f` returns, this function will assume that the component has finished its work and it's
55/// safe to exit. Therefore, this function will exit the process with exit code 0 **without**
56/// waiting for background tasks to finish. In other words, it's the responsibility of `f` to
57/// ensure that all essential background tasks are finished before returning.
58///
59/// # Environment variables
60///
61/// Currently, the following environment variables will be read and used to configure the runtime.
62///
63/// * `RW_WORKER_THREADS` (alias of `TOKIO_WORKER_THREADS`): number of tokio worker threads. If
64///   not set, it will be decided by tokio. Note that this can still be overridden by per-module
65///   runtime worker thread settings in the config file.
66/// * `RW_DEADLOCK_DETECTION`: whether to enable deadlock detection. If not set, will enable in
67///   debug mode, and disable in release mode.
68/// * `RW_PROFILE_PATH`: the path to generate flamegraph. If set, then profiling is automatically
69///   enabled.
70pub fn main_okk<F, Fut>(f: F) -> !
71where
72    F: FnOnce(CancellationToken) -> Fut,
73    Fut: Future<Output = ()> + Send + 'static,
74{
75    set_panic_hook();
76
77    rustls::crypto::ring::default_provider()
78        .install_default()
79        .inspect_err(|e| {
80            tracing::error!(?e, "Failed to install default crypto provider.");
81        })
82        .unwrap();
83    risingwave_variables::init_server_start_time();
84
85    // `TOKIO` will be read by tokio. Duplicate `RW` for compatibility.
86    if let Some(worker_threads) = std::env::var_os("RW_WORKER_THREADS") {
87        // safety: single-threaded now.
88        unsafe { std::env::set_var("TOKIO_WORKER_THREADS", worker_threads) };
89    }
90
91    // Set the default number of worker threads to be at least `MIN_WORKER_THREADS`, in production.
92    if !cfg!(debug_assertions) {
93        let worker_threads = match std::env::var("TOKIO_WORKER_THREADS") {
94            Ok(v) => v
95                .parse::<usize>()
96                .expect("Failed to parse TOKIO_WORKER_THREADS"),
97            Err(std::env::VarError::NotPresent) => std::thread::available_parallelism()
98                .expect("Failed to get available parallelism")
99                .get(),
100            Err(_) => panic!("Failed to parse TOKIO_WORKER_THREADS"),
101        };
102        if worker_threads < MIN_WORKER_THREADS {
103            tracing::warn!(
104                "the default number of worker threads ({worker_threads}) is too small, which may lead to issues, increasing to {MIN_WORKER_THREADS}"
105            );
106            // safety: single-threaded now.
107            unsafe { std::env::set_var("TOKIO_WORKER_THREADS", MIN_WORKER_THREADS.to_string()) };
108        }
109    }
110
111    if let Ok(enable_deadlock_detection) = std::env::var("RW_DEADLOCK_DETECTION") {
112        let enable_deadlock_detection = enable_deadlock_detection
113            .parse()
114            .expect("Failed to parse RW_DEADLOCK_DETECTION");
115        if enable_deadlock_detection {
116            enable_parking_lot_deadlock_detection();
117        }
118    } else {
119        // In case the env variable is not set
120        if cfg!(debug_assertions) {
121            enable_parking_lot_deadlock_detection();
122        }
123    }
124
125    if let Ok(profile_path) = std::env::var("RW_PROFILE_PATH") {
126        spawn_prof_thread(profile_path);
127    }
128
129    let future_with_shutdown = async move {
130        let shutdown = CancellationToken::new();
131        let mut fut = pin!(f(shutdown.clone()));
132
133        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
134        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
135
136        tokio::select! {
137            biased;
138
139            // Watch SIGINT, typically originating from user pressing ctrl-c.
140            // Attempt to shutdown gracefully and force shutdown on the next signal.
141            _ = sigint.recv() => {
142                tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)");
143                shutdown.cancel();
144
145                // While waiting for the future to finish, listen for the second ctrl-c signal.
146                tokio::select! {
147                    biased;
148                    _ = sigint.recv() => {
149                        tracing::warn!("forced shutdown");
150
151                        // Directly exit the process **here** instead of returning from the future, since
152                        // we don't even want to run destructors but only exit as soon as possible.
153                        ExitCode::FAILURE.exit_process();
154                    }
155                    _ = &mut fut => {},
156                }
157            }
158
159            // Watch SIGTERM, typically originating from Kubernetes.
160            // Attempt to shutdown gracefully. No need to force shutdown since it will send SIGKILL after a timeout.
161            _ = sigterm.recv() => {
162                tracing::info!("received SIGTERM, shutting down...");
163                shutdown.cancel();
164                fut.await;
165            }
166
167            // Proceed with the future.
168            _ = &mut fut => {},
169        }
170    };
171
172    let runtime = tokio::runtime::Builder::new_multi_thread()
173        .thread_name("rw-main")
174        .enable_all()
175        .build()
176        .unwrap();
177
178    runtime.block_on(future_with_shutdown);
179
180    // Shutdown the runtime and exit the process, without waiting for background tasks to finish.
181    // See the doc on this function for more details.
182    // TODO(shutdown): is it necessary to shutdown here as we're going to exit?
183    runtime.shutdown_background();
184    ExitCode::SUCCESS.exit_process();
185}