risingwave_compute/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(trait_alias)]
16#![feature(coroutines)]
17#![feature(type_alias_impl_trait)]
18#![feature(impl_trait_in_assoc_type)]
19#![feature(coverage_attribute)]
20#![warn(clippy::large_futures, clippy::large_stack_frames)]
21
22#[macro_use]
23extern crate tracing;
24
25pub mod memory;
26pub mod observer;
27pub mod rpc;
28pub mod server;
29pub mod telemetry;
30
31use std::future::Future;
32use std::pin::Pin;
33use std::sync::Arc;
34
35use clap::{Parser, ValueEnum};
36use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
37use risingwave_common::util::meta_addr::MetaAddressStrategy;
38use risingwave_common::util::resource_util::cpu::total_cpu_available;
39use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
40use risingwave_common::util::tokio_util::sync::CancellationToken;
41use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
42use serde::{Deserialize, Serialize};
43
/// Fraction of the memory reported by `system_memory_available_bytes()` that is used as the
/// default for `total_memory_bytes` when that option is not given explicitly
/// (see `default_total_memory_bytes`).
const DEFAULT_MEMORY_PROPORTION: f64 = 0.70;
47
48/// Command-line arguments for compute-node.
49#[derive(Parser, Clone, Debug, OverrideConfig)]
50#[command(
51    version,
52    about = "The worker node that executes query plans and handles data ingestion and output"
53)]
54pub struct ComputeNodeOpts {
55    // TODO: rename to listen_addr and separate out the port.
56    /// The address that this service listens to.
57    /// Usually the localhost + desired port.
58    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5688")]
59    pub listen_addr: String,
60
61    /// The address for contacting this instance of the service.
62    /// This would be synonymous with the service's "public address"
63    /// or "identifying address".
64    /// Optional, we will use `listen_addr` if not specified.
65    #[clap(long, env = "RW_ADVERTISE_ADDR")]
66    pub advertise_addr: Option<String>,
67
68    /// We will start a http server at this address via `MetricsManager`.
69    /// Then the prometheus instance will poll the metrics from this address.
70    #[clap(
71        long,
72        env = "RW_PROMETHEUS_LISTENER_ADDR",
73        default_value = "127.0.0.1:1222"
74    )]
75    pub prometheus_listener_addr: String,
76
77    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
78    pub meta_address: MetaAddressStrategy,
79
80    /// The path of `risingwave.toml` configuration file.
81    ///
82    /// If empty, default configuration values will be used.
83    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
84    pub config_path: String,
85
86    /// Total available memory for the compute node in bytes. Used by both computing and storage.
87    #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
88    pub total_memory_bytes: usize,
89
90    /// Reserved memory for the compute node in bytes.
91    /// If not set, a portion (default to 30% for the first 16GB and 20% for the rest)
92    /// for the `total_memory_bytes` will be used as the reserved memory.
93    ///
94    /// The total memory compute and storage can use is `total_memory_bytes` - `reserved_memory_bytes`.
95    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
96    pub reserved_memory_bytes: Option<usize>,
97
98    /// Target memory usage for Memory Manager.
99    /// If not set, the default value is `total_memory_bytes` - `reserved_memory_bytes`
100    ///
101    /// It's strongly recommended to set it for standalone deployment.
102    ///
103    /// ## Why need this?
104    ///
105    /// Our [`crate::memory::manager::MemoryManager`] works by reading the memory statistics from
106    /// Jemalloc. This is fine when running the compute node alone; while for standalone mode,
107    /// the memory usage of **all nodes** are counted. Thus, we need to pass a reasonable total
108    /// usage so that the memory is kept around this value.
109    #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")]
110    pub memory_manager_target_bytes: Option<usize>,
111
112    /// The parallelism that the compute node will register to the scheduler of the meta service.
113    #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
114    #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
115    pub parallelism: usize,
116
117    /// Resource group for scheduling, default value is "default"
118    #[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
119    pub resource_group: String,
120
121    /// Decides whether the compute node can be used for streaming and serving.
122    #[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
123    pub role: Role,
124
125    /// Used for control the metrics level, similar to log level.
126    ///
127    /// level = 0: disable metrics
128    /// level > 0: enable metrics
129    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
130    #[override_opts(path = server.metrics_level)]
131    pub metrics_level: Option<MetricLevel>,
132
133    /// Path to data file cache data directory.
134    /// Left empty to disable file cache.
135    #[clap(long, hide = true, env = "RW_DATA_FILE_CACHE_DIR")]
136    #[override_opts(path = storage.data_file_cache.dir)]
137    pub data_file_cache_dir: Option<String>,
138
139    /// Path to meta file cache data directory.
140    /// Left empty to disable file cache.
141    #[clap(long, hide = true, env = "RW_META_FILE_CACHE_DIR")]
142    #[override_opts(path = storage.meta_file_cache.dir)]
143    pub meta_file_cache_dir: Option<String>,
144
145    /// Enable async stack tracing through `await-tree` for risectl.
146    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
147    #[override_opts(path = streaming.async_stack_trace)]
148    pub async_stack_trace: Option<AsyncStackTraceOption>,
149
150    /// Enable heap profile dump when memory usage is high.
151    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
152    #[override_opts(path = server.heap_profiling.dir)]
153    pub heap_profiling_dir: Option<String>,
154
155    /// Endpoint of the connector node.
156    #[deprecated = "connector node has been deprecated."]
157    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
158    pub connector_rpc_endpoint: Option<String>,
159
160    /// The path of the temp secret file directory.
161    #[clap(
162        long,
163        hide = true,
164        env = "RW_TEMP_SECRET_FILE_DIR",
165        default_value = "./secrets"
166    )]
167    pub temp_secret_file_dir: String,
168}
169
170impl risingwave_common::opts::Opts for ComputeNodeOpts {
171    fn name() -> &'static str {
172        "compute"
173    }
174
175    fn meta_addr(&self) -> MetaAddressStrategy {
176        self.meta_address.clone()
177    }
178}
179
180#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
181pub enum Role {
182    Serving,
183    Streaming,
184    #[default]
185    Both,
186}
187
188impl Role {
189    pub fn for_streaming(&self) -> bool {
190        match self {
191            Role::Serving => false,
192            Role::Streaming => true,
193            Role::Both => true,
194        }
195    }
196
197    pub fn for_serving(&self) -> bool {
198        match self {
199            Role::Serving => true,
200            Role::Streaming => false,
201            Role::Both => true,
202        }
203    }
204}
205
206fn validate_opts(opts: &ComputeNodeOpts) {
207    let system_memory_available_bytes = system_memory_available_bytes();
208    if opts.total_memory_bytes > system_memory_available_bytes {
209        let error_msg = format!(
210            "total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.",
211            opts.total_memory_bytes, system_memory_available_bytes
212        );
213        tracing::error!(error_msg);
214        panic!("{}", error_msg);
215    }
216    if opts.parallelism == 0 {
217        let error_msg = "parallelism should not be zero";
218        tracing::error!(error_msg);
219        panic!("{}", error_msg);
220    }
221    let total_cpu_available = total_cpu_available().ceil() as usize;
222    if opts.parallelism > total_cpu_available {
223        let error_msg = format!(
224            "parallelism {} is larger than the total cpu available {} that can be acquired.",
225            opts.parallelism, total_cpu_available
226        );
227        tracing::warn!(error_msg);
228    }
229}
230
231use crate::server::compute_node_serve;
232
233/// Start compute node
234pub fn start(
235    opts: ComputeNodeOpts,
236    shutdown: CancellationToken,
237) -> Pin<Box<dyn Future<Output = ()> + Send>> {
238    // WARNING: don't change the function signature. Making it `async fn` will cause
239    // slow compile in release mode.
240    Box::pin(async move {
241        tracing::info!("options: {:?}", opts);
242        validate_opts(&opts);
243
244        let listen_addr = opts.listen_addr.parse().unwrap();
245
246        let advertise_addr = opts
247            .advertise_addr
248            .as_ref()
249            .unwrap_or_else(|| {
250                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
251                &opts.listen_addr
252            })
253            .parse()
254            .unwrap();
255        tracing::info!("advertise addr is {}", advertise_addr);
256
257        compute_node_serve(listen_addr, advertise_addr, Arc::new(opts), shutdown).await;
258    })
259}
260
261pub fn default_total_memory_bytes() -> usize {
262    (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
263}
264
265pub fn default_parallelism() -> usize {
266    total_cpu_available().ceil() as usize
267}
268
269pub fn default_resource_group() -> String {
270    DEFAULT_RESOURCE_GROUP.to_owned()
271}
272
273pub fn default_role() -> Role {
274    Role::Both
275}