risingwave_compute/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(trait_alias)]
16#![feature(coroutines)]
17#![feature(type_alias_impl_trait)]
18#![feature(let_chains)]
19#![feature(impl_trait_in_assoc_type)]
20#![feature(coverage_attribute)]
21#![warn(clippy::large_futures, clippy::large_stack_frames)]
22
23#[macro_use]
24extern crate tracing;
25
26pub mod memory;
27pub mod observer;
28pub mod rpc;
29pub mod server;
30pub mod telemetry;
31
32use std::future::Future;
33use std::pin::Pin;
34use std::sync::Arc;
35
36use clap::{Parser, ValueEnum};
37use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
38use risingwave_common::util::meta_addr::MetaAddressStrategy;
39use risingwave_common::util::resource_util::cpu::total_cpu_available;
40use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
41use risingwave_common::util::tokio_util::sync::CancellationToken;
42use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
43use serde::{Deserialize, Serialize};
44
/// Fraction of the available system memory used as the default memory limit.
///
/// If `total_memory_bytes` is not specified, the default memory limit will be set to
/// the system memory limit multiplied by this proportion (see `default_total_memory_bytes`).
const DEFAULT_MEMORY_PROPORTION: f64 = 0.7;
48
/// Command-line arguments for compute-node.
// Each field is filled by clap from its `--long` flag or from the environment variable named in
// the field's `env = "..."` attribute.
// NOTE(review): fields tagged `#[override_opts(path = ...)]` presumably override the config-file
// entry at that path through the `OverrideConfig` derive — confirm against that derive's docs.
// Added notes below are plain `//` comments on purpose: clap derives help text from `///` doc
// comments, so the existing `///` text is left untouched.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The worker node that executes query plans and handles data ingestion and output"
)]
pub struct ComputeNodeOpts {
    // TODO: rename to listen_addr and separate out the port.
    /// The address that this service listens to.
    /// Usually the localhost + desired port.
    // Parsed inside `start`; a value that fails to parse aborts startup with a panic.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5688")]
    pub listen_addr: String,

    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// Optional, we will use `listen_addr` if not specified.
    // `start` logs a warning when it falls back to `listen_addr`.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:1222"
    )]
    pub prometheus_listener_addr: String,

    // Meta service address (strategy); returned by the `Opts::meta_addr` implementation for
    // this struct. This field currently carries no `///` help text.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_address: MetaAddressStrategy,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    /// Total available memory for the compute node in bytes. Used by both computing and storage.
    // Defaults to `default_total_memory_bytes()`. `validate_opts` panics when this exceeds the
    // memory reported as available by the system.
    #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
    pub total_memory_bytes: usize,

    /// Reserved memory for the compute node in bytes.
    /// If not set, a portion (default to 30% for the first 16GB and 20% for the rest)
    /// for the `total_memory_bytes` will be used as the reserved memory.
    ///
    /// The total memory compute and storage can use is `total_memory_bytes` - `reserved_memory_bytes`.
    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
    pub reserved_memory_bytes: Option<usize>,

    /// Target memory usage for Memory Manager.
    /// If not set, the default value is `total_memory_bytes` - `reserved_memory_bytes`
    ///
    /// It's strongly recommended to set it for standalone deployment.
    ///
    /// ## Why need this?
    ///
    /// Our [`crate::memory::manager::MemoryManager`] works by reading the memory statistics from
    /// Jemalloc. This is fine when running the compute node alone; while for standalone mode,
    /// the memory usage of **all nodes** are counted. Thus, we need to pass a reasonable total
    /// usage so that the memory is kept around this value.
    #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")]
    pub memory_manager_target_bytes: Option<usize>,

    /// The parallelism that the compute node will register to the scheduler of the meta service.
    // Defaults to `default_parallelism()`. `validate_opts` panics on zero and only warns when
    // the value exceeds the rounded-up number of available CPUs.
    #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
    #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
    pub parallelism: usize,

    /// Resource group for scheduling, default value is "default"
    // The default is produced by `default_resource_group()`.
    #[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
    pub resource_group: String,

    /// Decides whether the compute node can be used for streaming and serving.
    // The default is produced by `default_role()`, i.e. `Role::Both`.
    #[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
    pub role: Role,

    /// Used for control the metrics level, similar to log level.
    ///
    /// level = 0: disable metrics
    /// level > 0: enable metrics
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    /// Path to data file cache data directory.
    /// Left empty to disable file cache.
    #[clap(long, hide = true, env = "RW_DATA_FILE_CACHE_DIR")]
    #[override_opts(path = storage.data_file_cache.dir)]
    pub data_file_cache_dir: Option<String>,

    /// Path to meta file cache data directory.
    /// Left empty to disable file cache.
    #[clap(long, hide = true, env = "RW_META_FILE_CACHE_DIR")]
    #[override_opts(path = storage.meta_file_cache.dir)]
    pub meta_file_cache_dir: Option<String>,

    /// Enable async stack tracing through `await-tree` for risectl.
    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
    #[override_opts(path = streaming.async_stack_trace)]
    pub async_stack_trace: Option<AsyncStackTraceOption>,

    /// Enable heap profile dump when memory usage is high.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    /// Endpoint of the connector node.
    // Deprecated and hidden from help; the flag is still declared, so passing it is accepted.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    /// The path of the temp secret file directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
170
171impl risingwave_common::opts::Opts for ComputeNodeOpts {
172    fn name() -> &'static str {
173        "compute"
174    }
175
176    fn meta_addr(&self) -> MetaAddressStrategy {
177        self.meta_address.clone()
178    }
179}
180
// The kinds of work a compute node accepts; used as the type of `ComputeNodeOpts::role`.
// Plain `//` comments are used here so that any help text clap derives from `///` doc comments
// on value-enum variants stays exactly as it is today.
#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
pub enum Role {
    // Serving only: `for_serving()` is true, `for_streaming()` is false.
    Serving,
    // Streaming only: `for_streaming()` is true, `for_serving()` is false.
    Streaming,
    // Both serving and streaming; this is also the `Default` value.
    #[default]
    Both,
}
188
189impl Role {
190    pub fn for_streaming(&self) -> bool {
191        match self {
192            Role::Serving => false,
193            Role::Streaming => true,
194            Role::Both => true,
195        }
196    }
197
198    pub fn for_serving(&self) -> bool {
199        match self {
200            Role::Serving => true,
201            Role::Streaming => false,
202            Role::Both => true,
203        }
204    }
205}
206
207fn validate_opts(opts: &ComputeNodeOpts) {
208    let system_memory_available_bytes = system_memory_available_bytes();
209    if opts.total_memory_bytes > system_memory_available_bytes {
210        let error_msg = format!(
211            "total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.",
212            opts.total_memory_bytes, system_memory_available_bytes
213        );
214        tracing::error!(error_msg);
215        panic!("{}", error_msg);
216    }
217    if opts.parallelism == 0 {
218        let error_msg = "parallelism should not be zero";
219        tracing::error!(error_msg);
220        panic!("{}", error_msg);
221    }
222    let total_cpu_available = total_cpu_available().ceil() as usize;
223    if opts.parallelism > total_cpu_available {
224        let error_msg = format!(
225            "parallelism {} is larger than the total cpu available {} that can be acquired.",
226            opts.parallelism, total_cpu_available
227        );
228        tracing::warn!(error_msg);
229    }
230}
231
232use crate::server::compute_node_serve;
233
234/// Start compute node
235pub fn start(
236    opts: ComputeNodeOpts,
237    shutdown: CancellationToken,
238) -> Pin<Box<dyn Future<Output = ()> + Send>> {
239    // WARNING: don't change the function signature. Making it `async fn` will cause
240    // slow compile in release mode.
241    Box::pin(async move {
242        tracing::info!("options: {:?}", opts);
243        validate_opts(&opts);
244
245        let listen_addr = opts.listen_addr.parse().unwrap();
246
247        let advertise_addr = opts
248            .advertise_addr
249            .as_ref()
250            .unwrap_or_else(|| {
251                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
252                &opts.listen_addr
253            })
254            .parse()
255            .unwrap();
256        tracing::info!("advertise addr is {}", advertise_addr);
257
258        compute_node_serve(listen_addr, advertise_addr, Arc::new(opts), shutdown).await;
259    })
260}
261
262pub fn default_total_memory_bytes() -> usize {
263    (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
264}
265
266pub fn default_parallelism() -> usize {
267    total_cpu_available().ceil() as usize
268}
269
270pub fn default_resource_group() -> String {
271    DEFAULT_RESOURCE_GROUP.to_owned()
272}
273
274pub fn default_role() -> Role {
275    Role::Both
276}