risingwave_compute/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(trait_alias)]
16#![feature(coroutines)]
17#![feature(type_alias_impl_trait)]
18#![feature(let_chains)]
19#![feature(impl_trait_in_assoc_type)]
20#![cfg_attr(coverage, feature(coverage_attribute))]
21
22#[macro_use]
23extern crate tracing;
24
25pub mod memory;
26pub mod observer;
27pub mod rpc;
28pub mod server;
29pub mod telemetry;
30
31use std::future::Future;
32use std::pin::Pin;
33
34use clap::{Parser, ValueEnum};
35use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
36use risingwave_common::util::meta_addr::MetaAddressStrategy;
37use risingwave_common::util::resource_util::cpu::total_cpu_available;
38use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
39use risingwave_common::util::tokio_util::sync::CancellationToken;
40use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
41use serde::{Deserialize, Serialize};
42
/// Fraction of the system's available memory used as the default for `total_memory_bytes`.
///
/// If `total_memory_bytes` is not specified, [`default_total_memory_bytes`] sets the default
/// memory limit to the value reported by `system_memory_available_bytes` multiplied by this
/// proportion.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.7;
46
/// Command-line arguments for compute-node.
//
// Every field is exposed as a `--long` flag and can also be supplied through the `RW_*`
// environment variable named in its `env = "..."` attribute.
//
// NOTE: the `///` comments on the fields double as the `--help` text generated by clap's
// derive, so they are user-facing strings; maintainer-only notes in this struct are written
// as plain `//` comments. Fields marked `hide = true` are still accepted but are left out of
// the `--help` listing.
//
// NOTE(review): `#[override_opts(path = ...)]` presumably makes a supplied value override
// the configuration entry at that path — confirm against the `OverrideConfig` derive.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The worker node that executes query plans and handles data ingestion and output"
)]
pub struct ComputeNodeOpts {
    // TODO: rename to listen_addr and separate out the port.
    // Parsed by `start`; a value that fails to parse makes `start` panic.
    /// The address that this service listens to.
    /// Usually the localhost + desired port.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5688")]
    pub listen_addr: String,

    // When `None`, `start` logs a warning and parses `listen_addr` instead.
    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// Optional, we will use `listen_addr` if not specified.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:1222"
    )]
    pub prometheus_listener_addr: String,

    // Meta address strategy; a clone of it is what `Opts::meta_addr` returns for this type.
    // Defaults to a meta node on the local host, port 5690.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_address: MetaAddressStrategy,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Defaults to `default_total_memory_bytes()`, i.e. `DEFAULT_MEMORY_PROPORTION` of the
    // available system memory. `validate_opts` panics if the value exceeds what
    // `system_memory_available_bytes()` reports.
    /// Total available memory for the compute node in bytes. Used by both computing and storage.
    #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
    pub total_memory_bytes: usize,

    /// Reserved memory for the compute node in bytes.
    /// If not set, a portion (default to 30% for the first 16GB and 20% for the rest)
    /// for the `total_memory_bytes` will be used as the reserved memory.
    ///
    /// The total memory compute and storage can use is `total_memory_bytes` - `reserved_memory_bytes`.
    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
    pub reserved_memory_bytes: Option<usize>,

    /// Target memory usage for Memory Manager.
    /// If not set, the default value is `total_memory_bytes` - `reserved_memory_bytes`
    ///
    /// It's strongly recommended to set it for standalone deployment.
    ///
    /// ## Why need this?
    ///
    /// Our [`crate::memory::manager::MemoryManager`] works by reading the memory statistics from
    /// Jemalloc. This is fine when running the compute node alone; while for standalone mode,
    /// the memory usage of **all nodes** are counted. Thus, we need to pass a reasonable total
    /// usage so that the memory is kept around this value.
    #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")]
    pub memory_manager_target_bytes: Option<usize>,

    // Defaults to `default_parallelism()` (available CPUs, rounded up). `validate_opts`
    // panics on 0 and only warns when the value exceeds the available CPU count.
    // NOTE(review): `if_absent` presumably means this value only fills
    // `streaming.actor_runtime_worker_threads_num` when the config leaves it unset — confirm.
    /// The parallelism that the compute node will register to the scheduler of the meta service.
    #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
    #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
    pub parallelism: usize,

    // Defaults to `default_resource_group()`, an owned copy of `DEFAULT_RESOURCE_GROUP`.
    /// Resource group for scheduling, default value is "default"
    #[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
    pub resource_group: String,

    // Defaults to `default_role()`, i.e. `Role::Both`.
    /// Decides whether the compute node can be used for streaming and serving.
    #[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
    pub role: Role,

    /// Used for control the metrics level, similar to log level.
    ///
    /// level = 0: disable metrics
    /// level > 0: enable metrics
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    /// Path to data file cache data directory.
    /// Left empty to disable file cache.
    #[clap(long, hide = true, env = "RW_DATA_FILE_CACHE_DIR")]
    #[override_opts(path = storage.data_file_cache.dir)]
    pub data_file_cache_dir: Option<String>,

    /// Path to meta file cache data directory.
    /// Left empty to disable file cache.
    #[clap(long, hide = true, env = "RW_META_FILE_CACHE_DIR")]
    #[override_opts(path = storage.meta_file_cache.dir)]
    pub meta_file_cache_dir: Option<String>,

    /// Enable async stack tracing through `await-tree` for risectl.
    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
    #[override_opts(path = streaming.async_stack_trace)]
    pub async_stack_trace: Option<AsyncStackTraceOption>,

    /// Enable heap profile dump when memory usage is high.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Still declared as an option, but marked `#[deprecated]` and hidden from `--help`.
    /// Endpoint of the connector node.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    /// The path of the temp secret file directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
168
169impl risingwave_common::opts::Opts for ComputeNodeOpts {
170    fn name() -> &'static str {
171        "compute"
172    }
173
174    fn meta_addr(&self) -> MetaAddressStrategy {
175        self.meta_address.clone()
176    }
177}
178
/// The kind of work a compute node accepts, selected through the `role` option of
/// [`ComputeNodeOpts`] (`RW_COMPUTE_NODE_ROLE`).
///
/// [`Role::for_streaming`] and [`Role::for_serving`] expose the two capabilities as booleans.
//
// NOTE: the variants are annotated with plain `//` comments rather than `///` so that clap's
// `ValueEnum` derive does not pick them up as help text for the possible values.
#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
pub enum Role {
    // Serving only: `for_serving()` is true, `for_streaming()` is false.
    Serving,
    // Streaming only: `for_streaming()` is true, `for_serving()` is false.
    Streaming,
    // Both streaming and serving; this is the `Default` value.
    #[default]
    Both,
}
186
187impl Role {
188    pub fn for_streaming(&self) -> bool {
189        match self {
190            Role::Serving => false,
191            Role::Streaming => true,
192            Role::Both => true,
193        }
194    }
195
196    pub fn for_serving(&self) -> bool {
197        match self {
198            Role::Serving => true,
199            Role::Streaming => false,
200            Role::Both => true,
201        }
202    }
203}
204
205fn validate_opts(opts: &ComputeNodeOpts) {
206    let system_memory_available_bytes = system_memory_available_bytes();
207    if opts.total_memory_bytes > system_memory_available_bytes {
208        let error_msg = format!(
209            "total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.",
210            opts.total_memory_bytes, system_memory_available_bytes
211        );
212        tracing::error!(error_msg);
213        panic!("{}", error_msg);
214    }
215    if opts.parallelism == 0 {
216        let error_msg = "parallelism should not be zero";
217        tracing::error!(error_msg);
218        panic!("{}", error_msg);
219    }
220    let total_cpu_available = total_cpu_available().ceil() as usize;
221    if opts.parallelism > total_cpu_available {
222        let error_msg = format!(
223            "parallelism {} is larger than the total cpu available {} that can be acquired.",
224            opts.parallelism, total_cpu_available
225        );
226        tracing::warn!(error_msg);
227    }
228}
229
230use crate::server::compute_node_serve;
231
232/// Start compute node
233pub fn start(
234    opts: ComputeNodeOpts,
235    shutdown: CancellationToken,
236) -> Pin<Box<dyn Future<Output = ()> + Send>> {
237    // WARNING: don't change the function signature. Making it `async fn` will cause
238    // slow compile in release mode.
239    Box::pin(async move {
240        tracing::info!("options: {:?}", opts);
241        validate_opts(&opts);
242
243        let listen_addr = opts.listen_addr.parse().unwrap();
244
245        let advertise_addr = opts
246            .advertise_addr
247            .as_ref()
248            .unwrap_or_else(|| {
249                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
250                &opts.listen_addr
251            })
252            .parse()
253            .unwrap();
254        tracing::info!("advertise addr is {}", advertise_addr);
255
256        compute_node_serve(listen_addr, advertise_addr, opts, shutdown).await;
257    })
258}
259
260pub fn default_total_memory_bytes() -> usize {
261    (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
262}
263
264pub fn default_parallelism() -> usize {
265    total_cpu_available().ceil() as usize
266}
267
268pub fn default_resource_group() -> String {
269    DEFAULT_RESOURCE_GROUP.to_owned()
270}
271
272pub fn default_role() -> Role {
273    Role::Both
274}