1#![feature(trait_alias)]
16#![feature(coroutines)]
17#![feature(type_alias_impl_trait)]
18#![feature(let_chains)]
19#![feature(impl_trait_in_assoc_type)]
20#![feature(coverage_attribute)]
21#![warn(clippy::large_futures, clippy::large_stack_frames)]
22
23#[macro_use]
24extern crate tracing;
25
26pub mod memory;
27pub mod observer;
28pub mod rpc;
29pub mod server;
30pub mod telemetry;
31
32use std::future::Future;
33use std::pin::Pin;
34use std::sync::Arc;
35
36use clap::{Parser, ValueEnum};
37use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
38use risingwave_common::util::meta_addr::MetaAddressStrategy;
39use risingwave_common::util::resource_util::cpu::total_cpu_available;
40use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
41use risingwave_common::util::tokio_util::sync::CancellationToken;
42use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
43use serde::{Deserialize, Serialize};
44
/// Fraction of the memory reported by `system_memory_available_bytes()` that
/// `default_total_memory_bytes()` hands out when `--total-memory-bytes` is not given.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.7;
48
// Command-line / environment options of the compute node.
//
// Every option can be given as a `--long-flag` or through the `RW_*` environment
// variable named in its `env = ...` attribute. Options carrying `#[override_opts(..)]`
// name a configuration path via the `OverrideConfig` derive (presumably the option
// value overrides that entry of the loaded config — confirm against the derive macro).
//
// NOTE: plain `//` comments are used on purpose in this struct. clap's derive turns
// `///` doc comments into help text, so adding them would change `--help` output.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The worker node that executes query plans and handles data ingestion and output"
)]
pub struct ComputeNodeOpts {
    // Address parsed by `start` and handed to `compute_node_serve` as the listen address.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5688")]
    pub listen_addr: String,

    // Address advertised for this node. When absent, `start` logs a warning and
    // falls back to `listen_addr`.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    // Presumably the address of the Prometheus metrics listener — not used in this
    // file, confirm against the server module.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:1222"
    )]
    pub prometheus_listener_addr: String,

    // Meta service address; returned as-is by the `Opts::meta_addr` implementation.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_address: MetaAddressStrategy,

    // Presumably the path of the configuration file; defaults to the empty string.
    // TODO confirm how an empty path is interpreted by the config loader.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Memory budget in bytes. Defaults to `default_total_memory_bytes()`;
    // `validate_opts` panics if it exceeds `system_memory_available_bytes()`.
    #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
    pub total_memory_bytes: usize,

    // Optional byte count; not consumed in this file (presumably memory kept out of
    // the node's managed budget — confirm in the memory module).
    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
    pub reserved_memory_bytes: Option<usize>,

    // Optional byte count; not consumed in this file (presumably the memory
    // manager's target usage — confirm in the memory module).
    #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")]
    pub memory_manager_target_bytes: Option<usize>,

    // Degree of parallelism. Defaults to `default_parallelism()`. `validate_opts`
    // panics on zero and only warns when it exceeds the available CPU count.
    // The `if_absent` override targets `streaming.actor_runtime_worker_threads_num`
    // (presumably applied only when that config entry is unset — confirm).
    #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
    #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
    pub parallelism: usize,

    // Resource group of this node; defaults to `DEFAULT_RESOURCE_GROUP`.
    #[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
    pub resource_group: String,

    // Workload role of this node; defaults to `Role::Both` via `default_role()`.
    #[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
    pub role: Role,

    // --- Options below are hidden from `--help` (`hide = true`). ---

    // Targets the `server.metrics_level` config path.
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    // Targets the `storage.data_file_cache.dir` config path.
    #[clap(long, hide = true, env = "RW_DATA_FILE_CACHE_DIR")]
    #[override_opts(path = storage.data_file_cache.dir)]
    pub data_file_cache_dir: Option<String>,

    // Targets the `storage.meta_file_cache.dir` config path.
    #[clap(long, hide = true, env = "RW_META_FILE_CACHE_DIR")]
    #[override_opts(path = storage.meta_file_cache.dir)]
    pub meta_file_cache_dir: Option<String>,

    // Targets the `streaming.async_stack_trace` config path.
    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
    #[override_opts(path = streaming.async_stack_trace)]
    pub async_stack_trace: Option<AsyncStackTraceOption>,

    // Targets the `server.heap_profiling.dir` config path.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Deprecated option; still declared, so the flag and env var remain accepted.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    // Presumably the directory for temporary secret files — not used in this file.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
170
171impl risingwave_common::opts::Opts for ComputeNodeOpts {
172 fn name() -> &'static str {
173 "compute"
174 }
175
176 fn meta_addr(&self) -> MetaAddressStrategy {
177 self.meta_address.clone()
178 }
179}
180
// Workload role of a compute node. The default is `Both`.
//
// NOTE: plain `//` comments are used on purpose. With `ValueEnum`, clap's derive
// turns `///` doc comments on variants into help text for the possible values.
#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
pub enum Role {
    // `for_serving()` is true, `for_streaming()` is false.
    Serving,
    // `for_streaming()` is true, `for_serving()` is false.
    Streaming,
    // Both `for_serving()` and `for_streaming()` are true.
    #[default]
    Both,
}
188
189impl Role {
190 pub fn for_streaming(&self) -> bool {
191 match self {
192 Role::Serving => false,
193 Role::Streaming => true,
194 Role::Both => true,
195 }
196 }
197
198 pub fn for_serving(&self) -> bool {
199 match self {
200 Role::Serving => true,
201 Role::Streaming => false,
202 Role::Both => true,
203 }
204 }
205}
206
207fn validate_opts(opts: &ComputeNodeOpts) {
208 let system_memory_available_bytes = system_memory_available_bytes();
209 if opts.total_memory_bytes > system_memory_available_bytes {
210 let error_msg = format!(
211 "total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.",
212 opts.total_memory_bytes, system_memory_available_bytes
213 );
214 tracing::error!(error_msg);
215 panic!("{}", error_msg);
216 }
217 if opts.parallelism == 0 {
218 let error_msg = "parallelism should not be zero";
219 tracing::error!(error_msg);
220 panic!("{}", error_msg);
221 }
222 let total_cpu_available = total_cpu_available().ceil() as usize;
223 if opts.parallelism > total_cpu_available {
224 let error_msg = format!(
225 "parallelism {} is larger than the total cpu available {} that can be acquired.",
226 opts.parallelism, total_cpu_available
227 );
228 tracing::warn!(error_msg);
229 }
230}
231
232use crate::server::compute_node_serve;
233
234pub fn start(
236 opts: ComputeNodeOpts,
237 shutdown: CancellationToken,
238) -> Pin<Box<dyn Future<Output = ()> + Send>> {
239 Box::pin(async move {
242 tracing::info!("options: {:?}", opts);
243 validate_opts(&opts);
244
245 let listen_addr = opts.listen_addr.parse().unwrap();
246
247 let advertise_addr = opts
248 .advertise_addr
249 .as_ref()
250 .unwrap_or_else(|| {
251 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
252 &opts.listen_addr
253 })
254 .parse()
255 .unwrap();
256 tracing::info!("advertise addr is {}", advertise_addr);
257
258 compute_node_serve(listen_addr, advertise_addr, Arc::new(opts), shutdown).await;
259 })
260}
261
262pub fn default_total_memory_bytes() -> usize {
263 (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
264}
265
266pub fn default_parallelism() -> usize {
267 total_cpu_available().ceil() as usize
268}
269
270pub fn default_resource_group() -> String {
271 DEFAULT_RESOURCE_GROUP.to_owned()
272}
273
274pub fn default_role() -> Role {
275 Role::Both
276}