1#![feature(trait_alias)]
16#![feature(coroutines)]
17#![feature(type_alias_impl_trait)]
18#![feature(impl_trait_in_assoc_type)]
19#![feature(coverage_attribute)]
20#![warn(clippy::large_futures, clippy::large_stack_frames)]
21
22#[macro_use]
23extern crate tracing;
24
25pub mod memory;
26pub mod observer;
27pub mod rpc;
28pub mod server;
29pub mod telemetry;
30
31use std::future::Future;
32use std::pin::Pin;
33use std::sync::Arc;
34
35use clap::{Parser, ValueEnum};
36use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
37use risingwave_common::util::meta_addr::MetaAddressStrategy;
38use risingwave_common::util::resource_util::cpu::total_cpu_available;
39use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
40use risingwave_common::util::tokio_util::sync::CancellationToken;
41use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
42use serde::{Deserialize, Serialize};
43
/// Fraction of the available system memory that `default_total_memory_bytes` uses as the
/// default `--total-memory-bytes` budget.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.7;
47
// Command-line / environment options for the compute node.
//
// NOTE: this struct deliberately uses plain `//` comments. clap's derive turns `///` doc
// comments into `--help` text, so `///` here would change the CLI output.
// Fields tagged `#[override_opts(...)]` presumably override the named config-file entry
// (the `OverrideConfig` derive lives in `risingwave_common`) — TODO confirm precedence.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The worker node that executes query plans and handles data ingestion and output"
)]
pub struct ComputeNodeOpts {
    // Address the server binds to. `start` parses it into the address passed to
    // `compute_node_serve`.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5688")]
    pub listen_addr: String,

    // Presumably the address other nodes use to reach this one. When unset, `start`
    // falls back to `listen_addr` and logs a warning.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    // Presumably the bind address of the Prometheus metrics endpoint (not read in this
    // file) — TODO confirm.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:1222"
    )]
    pub prometheus_listener_addr: String,

    // Meta service address strategy; returned as-is by `Opts::meta_addr`.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_address: MetaAddressStrategy,

    // Path to the config file. Empty by default; presumably empty means "use built-in
    // defaults" — TODO confirm.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Total memory budget in bytes. Defaults to `default_total_memory_bytes()`, i.e.
    // `DEFAULT_MEMORY_PROPORTION` of available system memory. `validate_opts` panics if
    // the value exceeds the available system memory.
    #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
    pub total_memory_bytes: usize,

    // Optional; not read in this file. Presumably memory held back from the budget —
    // TODO confirm.
    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
    pub reserved_memory_bytes: Option<usize>,

    // Optional; not read in this file. Presumably the memory manager's target usage —
    // TODO confirm.
    #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")]
    pub memory_manager_target_bytes: Option<usize>,

    // Defaults to the available CPU count rounded up (`default_parallelism`).
    // `if_absent`: presumably only overrides the streaming worker-thread count when the
    // config does not set it. `validate_opts` panics on 0 and warns when the value is
    // larger than the available CPUs.
    #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
    #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
    pub parallelism: usize,

    // Resource group this node belongs to; defaults to `DEFAULT_RESOURCE_GROUP`.
    #[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
    pub resource_group: String,

    // Which workloads this node takes on (see `Role`); defaults to `Role::Both`.
    #[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
    pub role: Role,

    // Hidden; overrides `server.metrics_level`.
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    // Hidden; overrides `storage.data_file_cache.dir`.
    #[clap(long, hide = true, env = "RW_DATA_FILE_CACHE_DIR")]
    #[override_opts(path = storage.data_file_cache.dir)]
    pub data_file_cache_dir: Option<String>,

    // Hidden; overrides `storage.meta_file_cache.dir`.
    #[clap(long, hide = true, env = "RW_META_FILE_CACHE_DIR")]
    #[override_opts(path = storage.meta_file_cache.dir)]
    pub meta_file_cache_dir: Option<String>,

    // Hidden; overrides `streaming.async_stack_trace`.
    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
    #[override_opts(path = streaming.async_stack_trace)]
    pub async_stack_trace: Option<AsyncStackTraceOption>,

    // Hidden; overrides `server.heap_profiling.dir`.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Deprecated and hidden. Presumably kept so existing flags / env vars still parse —
    // TODO confirm before removing.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    // Hidden; directory for temporary secret files (per the name), default `./secrets`.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
169
170impl risingwave_common::opts::Opts for ComputeNodeOpts {
171 fn name() -> &'static str {
172 "compute"
173 }
174
175 fn meta_addr(&self) -> MetaAddressStrategy {
176 self.meta_address.clone()
177 }
178}
179
/// Which workloads a compute node takes on; selected by `--role` / `RW_COMPUTE_NODE_ROLE`.
///
/// - `Serving`: `for_serving()` is true, `for_streaming()` is false.
/// - `Streaming`: `for_streaming()` is true, `for_serving()` is false.
/// - `Both` (the default): both `for_serving()` and `for_streaming()` are true.
//
// NOTE: the variants intentionally carry no `///` docs. clap's `ValueEnum` derive would turn
// them into per-value `--help` text.
#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
pub enum Role {
    Serving,
    Streaming,
    #[default]
    Both,
}
187
188impl Role {
189 pub fn for_streaming(&self) -> bool {
190 match self {
191 Role::Serving => false,
192 Role::Streaming => true,
193 Role::Both => true,
194 }
195 }
196
197 pub fn for_serving(&self) -> bool {
198 match self {
199 Role::Serving => true,
200 Role::Streaming => false,
201 Role::Both => true,
202 }
203 }
204}
205
206fn validate_opts(opts: &ComputeNodeOpts) {
207 let system_memory_available_bytes = system_memory_available_bytes();
208 if opts.total_memory_bytes > system_memory_available_bytes {
209 let error_msg = format!(
210 "total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.",
211 opts.total_memory_bytes, system_memory_available_bytes
212 );
213 tracing::error!(error_msg);
214 panic!("{}", error_msg);
215 }
216 if opts.parallelism == 0 {
217 let error_msg = "parallelism should not be zero";
218 tracing::error!(error_msg);
219 panic!("{}", error_msg);
220 }
221 let total_cpu_available = total_cpu_available().ceil() as usize;
222 if opts.parallelism > total_cpu_available {
223 let error_msg = format!(
224 "parallelism {} is larger than the total cpu available {} that can be acquired.",
225 opts.parallelism, total_cpu_available
226 );
227 tracing::warn!(error_msg);
228 }
229}
230
231use crate::server::compute_node_serve;
232
233pub fn start(
235 opts: ComputeNodeOpts,
236 shutdown: CancellationToken,
237) -> Pin<Box<dyn Future<Output = ()> + Send>> {
238 Box::pin(async move {
241 tracing::info!("options: {:?}", opts);
242 validate_opts(&opts);
243
244 let listen_addr = opts.listen_addr.parse().unwrap();
245
246 let advertise_addr = opts
247 .advertise_addr
248 .as_ref()
249 .unwrap_or_else(|| {
250 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
251 &opts.listen_addr
252 })
253 .parse()
254 .unwrap();
255 tracing::info!("advertise addr is {}", advertise_addr);
256
257 compute_node_serve(listen_addr, advertise_addr, Arc::new(opts), shutdown).await;
258 })
259}
260
261pub fn default_total_memory_bytes() -> usize {
262 (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
263}
264
265pub fn default_parallelism() -> usize {
266 total_cpu_available().ceil() as usize
267}
268
269pub fn default_resource_group() -> String {
270 DEFAULT_RESOURCE_GROUP.to_owned()
271}
272
273pub fn default_role() -> Role {
274 Role::Both
275}