1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_batch::monitor::{
20 GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
21};
22use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
23use risingwave_batch::spill::spill_op::SpillOp;
24use risingwave_batch::task::{BatchEnvironment, BatchManager};
25use risingwave_common::config::{
26 AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, MetricLevel, STREAM_WINDOW_SIZE,
27 StorageMemoryConfig, load_config,
28};
29use risingwave_common::lru::init_global_sequencer_args;
30use risingwave_common::monitor::{RouterExt, TcpConfig};
31use risingwave_common::secret::LocalSecretManager;
32use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::telemetry::manager::TelemetryManager;
35use risingwave_common::telemetry::telemetry_env_enabled;
36use risingwave_common::util::addr::HostAddr;
37use risingwave_common::util::pretty_bytes::convert;
38use risingwave_common::util::tokio_util::sync::CancellationToken;
39use risingwave_common::{GIT_SHA, RW_VERSION};
40use risingwave_common_heap_profiling::HeapProfiler;
41use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
42use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
43use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
44use risingwave_dml::dml_manager::DmlManager;
45use risingwave_pb::common::WorkerType;
46use risingwave_pb::common::worker_node::Property;
47use risingwave_pb::compute::config_service_server::ConfigServiceServer;
48use risingwave_pb::health::health_server::HealthServer;
49use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
50use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
51use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
52use risingwave_pb::task_service::task_service_server::TaskServiceServer;
53use risingwave_rpc_client::{ComputeClientPool, MetaClient};
54use risingwave_storage::StateStoreImpl;
55use risingwave_storage::hummock::MemoryLimiter;
56use risingwave_storage::hummock::compactor::{
57 CompactionExecutor, CompactorContext, new_compaction_await_tree_reg_ref, start_compactor,
58};
59use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
60use risingwave_storage::hummock::utils::HummockMemoryCollector;
61use risingwave_storage::monitor::{
62 GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, GLOBAL_OBJECT_STORE_METRICS,
63 global_hummock_state_store_metrics, global_storage_metrics, monitor_cache,
64};
65use risingwave_storage::opts::StorageOpts;
66use risingwave_stream::executor::monitor::global_streaming_metrics;
67use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
68use tokio::sync::oneshot::Sender;
69use tokio::task::JoinHandle;
70use tower::Layer;
71
72use crate::ComputeNodeOpts;
73use crate::memory::config::{
74 MIN_COMPUTE_MEMORY_MB, batch_mem_limit, reserve_memory_bytes, storage_memory_config,
75};
76use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
77use crate::observer::observer_manager::ComputeObserverNode;
78use crate::rpc::service::config_service::ConfigServiceImpl;
79use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS;
80use crate::rpc::service::exchange_service::ExchangeServiceImpl;
81use crate::rpc::service::health_service::HealthServiceImpl;
82use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
83use crate::rpc::service::stream_service::StreamServiceImpl;
84use crate::telemetry::ComputeTelemetryCreator;
85
/// Bootstraps and runs a compute node until `shutdown` is cancelled.
///
/// High-level startup sequence:
/// 1. Load configuration and register this worker with the meta service.
/// 2. Partition the node's memory budget between reserved, storage, and compute.
/// 3. Build the state store (optionally with an embedded compactor).
/// 4. Construct the batch and streaming environments and managers.
/// 5. Serve all compute-node gRPC services on `listen_addr`.
/// 6. On cancellation, unregister from meta and shut down the stream manager.
///
/// * `listen_addr` - socket address the gRPC server binds to.
/// * `advertise_addr` - address registered with the meta service for peers to reach this node.
/// * `opts` - command-line options for the compute node.
/// * `shutdown` - cancellation token that triggers graceful shutdown.
pub async fn compute_node_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: ComputeNodeOpts,
    shutdown: CancellationToken,
) {
    // Load configuration from the file at `opts.config_path`, with CLI overrides.
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compute node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    let stream_config = Arc::new(config.streaming.clone());
    let batch_config = Arc::new(config.batch.clone());

    // Initialize the global LRU sequencer parameters used by memory-controlled caches.
    init_global_sequencer_args(
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_step,
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_lag,
    );

    // Register this worker with the meta service and obtain cluster system parameters.
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::ComputeNode,
        &advertise_addr,
        Property {
            parallelism: opts.parallelism as u32,
            is_streaming: opts.role.for_streaming(),
            is_serving: opts.role.for_serving(),
            is_unschedulable: false,
            internal_rpc_host_addr: "".to_owned(),
            resource_group: Some(opts.resource_group.clone()),
        },
        &config.meta,
    )
    .await;

    let state_store_url = system_params.state_store();

    // Whether a compactor should run inside this process (local state stores,
    // or when remote compaction is disabled in config).
    let embedded_compactor_enabled =
        embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

    // Split the memory budget: reserved first, then storage, compute gets the rest.
    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
    let storage_memory_config = storage_memory_config(
        non_reserved_memory_bytes,
        embedded_compactor_enabled,
        &config.storage,
        !opts.role.for_streaming(),
    );

    let storage_memory_bytes = total_storage_memory_limit_bytes(&storage_memory_config);
    let compute_memory_bytes = validate_compute_node_memory_config(
        opts.total_memory_bytes,
        reserved_memory_bytes,
        storage_memory_bytes,
    );
    // Log the final memory split for operators.
    print_memory_config(
        opts.total_memory_bytes,
        compute_memory_bytes,
        storage_memory_bytes,
        &storage_memory_config,
        embedded_compactor_enabled,
        reserved_memory_bytes,
    );

    let storage_opts = Arc::new(StorageOpts::from((
        &config,
        &system_params,
        &storage_memory_config,
    )));

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    // Background tasks (join handle + shutdown sender pairs) spawned during startup.
    let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
    // Clone the process-global metric registries into local Arc handles.
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
    let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
    let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
    let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());
    let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
    let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
        config.server.metrics_level,
    ));
    let object_store_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let storage_metrics = Arc::new(global_storage_metrics(config.server.metrics_level));
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
    // Wrap the meta client so Hummock RPCs are counted in `hummock_metrics`.
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    // Await-tree (async stack trace) config; `None` disables stack-trace collection.
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };

    let state_store = StateStoreImpl::new(
        state_store_url,
        storage_opts.clone(),
        hummock_meta_client.clone(),
        state_store_metrics.clone(),
        object_store_metrics,
        storage_metrics.clone(),
        compactor_metrics.clone(),
        await_tree_config.clone(),
        system_params.use_new_object_prefix_strategy(),
    )
    .await
    .unwrap();

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    // Start observing system-parameter changes pushed from the meta service.
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
    let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
    observer_manager.start().await;

    // Hummock-specific setup: embedded compactor, cache monitoring, backup reader.
    if let Some(storage) = state_store.as_hummock() {
        if embedded_compactor_enabled {
            tracing::info!("start embedded compactor");
            // Limiter is set to half of the configured compactor memory budget.
            let memory_limiter = Arc::new(MemoryLimiter::new(
                storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
            ));

            let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
            let compactor_context = CompactorContext {
                storage_opts,
                sstable_store: storage.sstable_store(),
                compactor_metrics: compactor_metrics.clone(),
                is_share_buffer_compact: false,
                compaction_executor,
                memory_limiter,

                task_progress_manager: Default::default(),
                await_tree_reg: await_tree_config
                    .clone()
                    .map(new_compaction_await_tree_reg_ref),
            };

            let (handle, shutdown_sender) = start_compactor(
                compactor_context,
                hummock_meta_client.clone(),
                storage.sstable_object_id_manager().clone(),
                storage.compaction_catalog_manager_ref().clone(),
            );
            sub_tasks.push((handle, shutdown_sender));
        }
        // Expose storage-cache memory usage to the metrics system.
        let flush_limiter = storage.get_memory_limiter();
        let memory_collector = Arc::new(HummockMemoryCollector::new(
            storage.sstable_store(),
            flush_limiter,
            storage_memory_config,
        ));
        monitor_cache(memory_collector);
        // Keep the backup reader in sync with system-parameter updates.
        let backup_reader = storage.backup_reader();
        let system_params_mgr = system_params_manager.clone();
        tokio::spawn(async move {
            backup_reader
                .watch_config_change(system_params_mgr.watch_params())
                .await;
        });
    }

    // Periodic heartbeat so the meta service knows this worker is alive.
    sub_tasks.push(MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    ));

    let batch_mgr = Arc::new(BatchManager::new(
        config.batch.clone(),
        batch_manager_metrics,
        batch_mem_limit(compute_memory_bytes, opts.role.for_serving()),
    ));

    // Memory-manager target: explicit CLI override, or compute + storage budget.
    let target_memory = if let Some(v) = opts.memory_manager_target_bytes {
        v
    } else {
        compute_memory_bytes + storage_memory_bytes
    };

    let memory_mgr = MemoryManager::new(MemoryManagerConfig {
        target_memory,
        threshold_aggressive: config
            .streaming
            .developer
            .memory_controller_threshold_aggressive,
        threshold_graceful: config
            .streaming
            .developer
            .memory_controller_threshold_graceful,
        threshold_stable: config
            .streaming
            .developer
            .memory_controller_threshold_stable,
        eviction_factor_stable: config
            .streaming
            .developer
            .memory_controller_eviction_factor_stable,
        eviction_factor_graceful: config
            .streaming
            .developer
            .memory_controller_eviction_factor_graceful,
        eviction_factor_aggressive: config
            .streaming
            .developer
            .memory_controller_eviction_factor_aggressive,
        metrics: streaming_metrics.clone(),
    });

    // Run the memory-control loop in the background at the configured interval.
    tokio::spawn(
        memory_mgr.clone().run(Duration::from_millis(
            config
                .streaming
                .developer
                .memory_controller_update_interval_ms as _,
        )),
    );

    let heap_profiler = HeapProfiler::new(
        opts.total_memory_bytes,
        config.server.heap_profiling.clone(),
    );
    heap_profiler.start();

    let dml_mgr = Arc::new(DmlManager::new(
        worker_id,
        config.streaming.developer.dml_channel_initial_permits,
    ));

    // Separate client pools for batch and streaming exchange connections.
    let batch_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let batch_env = BatchEnvironment::new(
        batch_mgr.clone(),
        advertise_addr.clone(),
        batch_config,
        worker_id,
        state_store.clone(),
        batch_executor_metrics.clone(),
        batch_client_pool,
        dml_mgr.clone(),
        source_metrics.clone(),
        batch_spill_metrics.clone(),
        iceberg_scan_metrics.clone(),
        config.server.metrics_level,
    );

    let stream_client_pool = Arc::new(ComputeClientPool::new(
        config.streaming_exchange_connection_pool_size(),
        config.streaming.developer.compute_client_config.clone(),
    ));
    let stream_env = StreamEnvironment::new(
        advertise_addr.clone(),
        stream_config,
        worker_id,
        state_store.clone(),
        dml_mgr,
        system_params_manager.clone(),
        source_metrics,
        meta_client.clone(),
        stream_client_pool,
    );

    let stream_mgr = LocalStreamManager::new(
        stream_env.clone(),
        streaming_metrics.clone(),
        await_tree_config.clone(),
        memory_mgr.get_watermark_sequence(),
    );

    // Construct the gRPC service implementations.
    let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
    let exchange_srv =
        ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics);
    let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
    // Cache handles are only available when the state store is Hummock.
    let (meta_cache, block_cache) = if let Some(hummock) = state_store.as_hummock() {
        (
            Some(hummock.sstable_store().meta_cache().clone()),
            Some(hummock.sstable_store().block_cache().clone()),
        )
    } else {
        (None, None)
    };
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
        meta_cache.clone(),
        block_cache.clone(),
    );
    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
    );

    // Telemetry runs only when enabled both in the config and by the environment.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Clean leftover spill files from previous runs (skipped under madsim).
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory().await.unwrap();
    }

    // Assemble and spawn the gRPC server with all compute-node services.
    let server = tonic::transport::Server::builder()
        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
        .initial_stream_window_size(STREAM_WINDOW_SIZE)
        .http2_max_pending_accept_reset_streams(Some(config.server.grpc_max_reset_stream as usize))
        .layer(TracingExtractLayer::new())
        .add_service(TaskServiceServer::new(batch_srv).max_decoding_message_size(usize::MAX))
        .add_service(ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX))
        .add_service({
            let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
            let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
            // The await-tree middleware is not applied under madsim.
            #[cfg(madsim)]
            {
                srv
            }
            #[cfg(not(madsim))]
            {
                AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
            }
        })
        .add_service(MonitorServiceServer::new(monitor_srv))
        .add_service(ConfigServiceServer::new(config_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compute-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot the Prometheus metrics endpoint when metrics are enabled.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All services are up; activate this worker in the meta service.
    meta_client.activate(&advertise_addr).await.unwrap();
    // Block until the caller requests shutdown.
    shutdown.cancelled().await;

    // Graceful teardown: unregister from meta, then stop streaming tasks.
    meta_client.try_unregister().await;
    let _ = stream_mgr.shutdown().await;

}
490
491fn validate_compute_node_memory_config(
497 cn_total_memory_bytes: usize,
498 reserved_memory_bytes: usize,
499 storage_memory_bytes: usize,
500) -> usize {
501 if storage_memory_bytes > cn_total_memory_bytes {
502 tracing::warn!(
503 "The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
504 convert(cn_total_memory_bytes as _),
505 convert(storage_memory_bytes as _)
506 );
507 MIN_COMPUTE_MEMORY_MB << 20
508 } else if storage_memory_bytes + (MIN_COMPUTE_MEMORY_MB << 20) + reserved_memory_bytes
509 >= cn_total_memory_bytes
510 {
511 tracing::warn!(
512 "No enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
513 convert(cn_total_memory_bytes as _),
514 convert(storage_memory_bytes as _)
515 );
516 MIN_COMPUTE_MEMORY_MB << 20
517 } else {
518 cn_total_memory_bytes - storage_memory_bytes - reserved_memory_bytes
519 }
520}
521
522fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig) -> usize {
525 let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
526 + storage_memory_config.meta_cache_capacity_mb
527 + storage_memory_config.shared_buffer_capacity_mb
528 + storage_memory_config.compactor_memory_limit_mb;
529 total_storage_memory_mb << 20
530}
531
/// Decides whether this process should run an in-process (embedded) compactor.
///
/// Returns `true` for purely local Hummock state stores (in-memory or on-disk)
/// or when remote compaction has been explicitly disabled in configuration.
fn embedded_compactor_enabled(state_store_url: &str, disable_remote_compactor: bool) -> bool {
    if disable_remote_compactor {
        return true;
    }
    state_store_url == "hummock+memory" || state_store_url.starts_with("hummock+disk")
}
540
541fn print_memory_config(
543 cn_total_memory_bytes: usize,
544 compute_memory_bytes: usize,
545 storage_memory_bytes: usize,
546 storage_memory_config: &StorageMemoryConfig,
547 embedded_compactor_enabled: bool,
548 reserved_memory_bytes: usize,
549) {
550 let memory_config = format!(
551 "Memory outline:\n\
552 > total_memory: {}\n\
553 > storage_memory: {}\n\
554 > block_cache_capacity: {}\n\
555 > meta_cache_capacity: {}\n\
556 > shared_buffer_capacity: {}\n\
557 > compactor_memory_limit: {}\n\
558 > compute_memory: {}\n\
559 > reserved_memory: {}",
560 convert(cn_total_memory_bytes as _),
561 convert(storage_memory_bytes as _),
562 convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
563 convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
564 convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
565 if embedded_compactor_enabled {
566 convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
567 } else {
568 "Not enabled".to_owned()
569 },
570 convert(compute_memory_bytes as _),
571 convert(reserved_memory_bytes as _),
572 );
573 info!("{}", memory_config);
574}