1#![feature(trait_alias)]
16#![feature(coroutines)]
17#![feature(type_alias_impl_trait)]
18#![feature(let_chains)]
19#![feature(impl_trait_in_assoc_type)]
20#![cfg_attr(coverage, feature(coverage_attribute))]
21
22#[macro_use]
23extern crate tracing;
24
25pub mod memory;
26pub mod observer;
27pub mod rpc;
28pub mod server;
29pub mod telemetry;
30
31use std::future::Future;
32use std::pin::Pin;
33
34use clap::{Parser, ValueEnum};
35use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
36use risingwave_common::util::meta_addr::MetaAddressStrategy;
37use risingwave_common::util::resource_util::cpu::total_cpu_available;
38use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
39use risingwave_common::util::tokio_util::sync::CancellationToken;
40use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
41use serde::{Deserialize, Serialize};
42
/// Fraction of the system's available memory (as reported by
/// `system_memory_available_bytes()`) that `default_total_memory_bytes` uses as the
/// default memory budget for this node.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.7;
46
// Command-line / environment-variable options for the compute node.
//
// NOTE: the comments in this struct are plain `//` comments on purpose. With
// `#[derive(Parser)]`, `///` doc comments on the struct and its fields are turned into
// clap `about` / `help` text, so using them here would change the `--help` output.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The worker node that executes query plans and handles data ingestion and output"
)]
pub struct ComputeNodeOpts {
    // Address the node listens on; parsed into a socket address by `start`.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5688")]
    pub listen_addr: String,

    // Address advertised for this node. When unset, `start` logs a warning and falls
    // back to `listen_addr`.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    // Address of the Prometheus listener.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:1222"
    )]
    pub prometheus_listener_addr: String,

    // How to reach the meta service; returned by `Opts::meta_addr`.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_address: MetaAddressStrategy,

    // Path to the config file. Defaults to the empty string, which presumably means
    // "no config file" — TODO confirm against the config loader.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Total memory budget in bytes. Defaults to `default_total_memory_bytes()`, a fixed
    // fraction of the available system memory. `validate_opts` panics if this exceeds
    // the memory available on the machine.
    #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
    pub total_memory_bytes: usize,

    // Optional reserved-memory size in bytes; not read in this file.
    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
    pub reserved_memory_bytes: Option<usize>,

    // Optional memory-manager target in bytes; not read in this file.
    #[clap(long, env = "RW_MEMORY_MANAGER_TARGET_BYTES")]
    pub memory_manager_target_bytes: Option<usize>,

    // Degree of parallelism. Defaults to `default_parallelism()` (available CPUs,
    // rounded up). `validate_opts` panics if it is zero and only warns if it exceeds the
    // available CPU count.
    // NOTE(review): `if_absent` presumably means this value is written to
    // `streaming.actor_runtime_worker_threads_num` only when the config file leaves it
    // unset — confirm against the `OverrideConfig` derive.
    #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
    #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
    pub parallelism: usize,

    // Resource group this node belongs to; defaults to `DEFAULT_RESOURCE_GROUP`.
    #[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
    pub resource_group: String,

    // Which workloads this node takes; defaults to `Role::Both` via `default_role()`.
    #[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
    pub role: Role,

    // The options below are hidden from `--help`. Each `override_opts(path = …)`
    // presumably overrides that config entry when the option is given — see the
    // `OverrideConfig` derive.
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    #[clap(long, hide = true, env = "RW_DATA_FILE_CACHE_DIR")]
    #[override_opts(path = storage.data_file_cache.dir)]
    pub data_file_cache_dir: Option<String>,

    #[clap(long, hide = true, env = "RW_META_FILE_CACHE_DIR")]
    #[override_opts(path = storage.meta_file_cache.dir)]
    pub meta_file_cache_dir: Option<String>,

    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
    #[override_opts(path = streaming.async_stack_trace)]
    pub async_stack_trace: Option<AsyncStackTraceOption>,

    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Deprecated and hidden; not read in this file.
    // NOTE(review): presumably retained so invocations that still pass the flag/env var
    // keep parsing — confirm before removing.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    // Hidden; presumably the directory for temporary secret files (default `./secrets`).
    // Not read in this file.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
168
169impl risingwave_common::opts::Opts for ComputeNodeOpts {
170 fn name() -> &'static str {
171 "compute"
172 }
173
174 fn meta_addr(&self) -> MetaAddressStrategy {
175 self.meta_address.clone()
176 }
177}
178
/// Which kinds of work a compute node takes on. Selected with the `role` option
/// (`RW_COMPUTE_NODE_ROLE`); defaults to [`Role::Both`].
#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
pub enum Role {
    // Plain `//` comments: `///` on `ValueEnum` variants would become clap help text.
    // Serving only: `for_serving()` is true, `for_streaming()` is false.
    Serving,
    // Streaming only: `for_streaming()` is true, `for_serving()` is false.
    Streaming,
    // Both serving and streaming; the default variant.
    #[default]
    Both,
}
186
187impl Role {
188 pub fn for_streaming(&self) -> bool {
189 match self {
190 Role::Serving => false,
191 Role::Streaming => true,
192 Role::Both => true,
193 }
194 }
195
196 pub fn for_serving(&self) -> bool {
197 match self {
198 Role::Serving => true,
199 Role::Streaming => false,
200 Role::Both => true,
201 }
202 }
203}
204
205fn validate_opts(opts: &ComputeNodeOpts) {
206 let system_memory_available_bytes = system_memory_available_bytes();
207 if opts.total_memory_bytes > system_memory_available_bytes {
208 let error_msg = format!(
209 "total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.",
210 opts.total_memory_bytes, system_memory_available_bytes
211 );
212 tracing::error!(error_msg);
213 panic!("{}", error_msg);
214 }
215 if opts.parallelism == 0 {
216 let error_msg = "parallelism should not be zero";
217 tracing::error!(error_msg);
218 panic!("{}", error_msg);
219 }
220 let total_cpu_available = total_cpu_available().ceil() as usize;
221 if opts.parallelism > total_cpu_available {
222 let error_msg = format!(
223 "parallelism {} is larger than the total cpu available {} that can be acquired.",
224 opts.parallelism, total_cpu_available
225 );
226 tracing::warn!(error_msg);
227 }
228}
229
230use crate::server::compute_node_serve;
231
232pub fn start(
234 opts: ComputeNodeOpts,
235 shutdown: CancellationToken,
236) -> Pin<Box<dyn Future<Output = ()> + Send>> {
237 Box::pin(async move {
240 tracing::info!("options: {:?}", opts);
241 validate_opts(&opts);
242
243 let listen_addr = opts.listen_addr.parse().unwrap();
244
245 let advertise_addr = opts
246 .advertise_addr
247 .as_ref()
248 .unwrap_or_else(|| {
249 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
250 &opts.listen_addr
251 })
252 .parse()
253 .unwrap();
254 tracing::info!("advertise addr is {}", advertise_addr);
255
256 compute_node_serve(listen_addr, advertise_addr, opts, shutdown).await;
257 })
258}
259
260pub fn default_total_memory_bytes() -> usize {
261 (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
262}
263
264pub fn default_parallelism() -> usize {
265 total_cpu_available().ceil() as usize
266}
267
268pub fn default_resource_group() -> String {
269 DEFAULT_RESOURCE_GROUP.to_owned()
270}
271
272pub fn default_role() -> Role {
273 Role::Both
274}