use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_batch::monitor::{
    GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
};
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::spill::spill_op::SpillOp;
use risingwave_batch::task::{BatchEnvironment, BatchManager};
use risingwave_common::config::{
    AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, MetricLevel, STREAM_WINDOW_SIZE,
    StorageMemoryConfig, load_config,
};
use risingwave_common::license::LicenseManager;
use risingwave_common::lru::init_global_sequencer_args;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::pretty_bytes::convert;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{DATA_DIRECTORY, GIT_SHA, RW_VERSION, STATE_STORE_URL};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ComputeClientPool, MetaClient};
use risingwave_storage::StateStoreImpl;
use risingwave_storage::hummock::MemoryLimiter;
use risingwave_storage::hummock::compactor::{
    CompactionExecutor, CompactorContext, new_compaction_await_tree_reg_ref, start_compactor,
};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::utils::HummockMemoryCollector;
use risingwave_storage::monitor::{
    GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, GLOBAL_OBJECT_STORE_METRICS,
    global_hummock_state_store_metrics, global_storage_metrics, monitor_cache,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tower::Layer;

use crate::ComputeNodeOpts;
use crate::memory::config::{
    MIN_COMPUTE_MEMORY_MB, batch_mem_limit, reserve_memory_bytes, storage_memory_config,
};
use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS;
use crate::rpc::service::exchange_service::ExchangeServiceImpl;
use crate::rpc::service::health_service::HealthServiceImpl;
use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::telemetry::ComputeTelemetryCreator;
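
/// Bootstraps the compute node: registers with the meta service, initializes the
/// state store and the batch/streaming runtimes, serves the gRPC services on
/// `listen_addr`, and returns once `shutdown` is cancelled.
///
/// A minimal invocation sketch; the addresses are illustrative and `opts` is assumed
/// to have been parsed elsewhere (e.g. from the command line):
///
/// ```ignore
/// let shutdown = CancellationToken::new();
/// compute_node_serve(
///     "0.0.0.0:5688".parse().unwrap(),
///     "192.168.0.1:5688".parse().unwrap(),
///     Arc::new(opts),
///     shutdown.clone(),
/// )
/// .await;
/// ```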
pub async fn compute_node_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: Arc<ComputeNodeOpts>,
    shutdown: CancellationToken,
) {
    let config = Arc::new(load_config(&opts.config_path, &*opts));

    info!("Starting compute node");
    info!("> config: {:?}", &*config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    let stream_config = Arc::new(config.streaming.clone());
    let batch_config = Arc::new(config.batch.clone());

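    // Initialize the global LRU sequencer args used by the streaming memory controller
    // (thread-local sequence step and lag).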
    init_global_sequencer_args(
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_step,
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_lag,
    );

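    // Register this worker with the meta service and fetch the initial system parameters.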
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::ComputeNode,
        &advertise_addr,
        Property {
            parallelism: opts.parallelism as u32,
            is_streaming: opts.role.for_streaming(),
            is_serving: opts.role.for_serving(),
            is_unschedulable: false,
            internal_rpc_host_addr: "".to_owned(),
            resource_group: Some(opts.resource_group.clone()),
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
    sub_tasks.push(MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    ));

    let state_store_url = system_params.state_store();
    let data_directory = system_params.data_directory();

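    // Decide whether to run the compactor inside this process, then split the total
    // memory budget into reserved, storage, and compute portions.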
    let embedded_compactor_enabled =
        embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
    let storage_memory_config = storage_memory_config(
        non_reserved_memory_bytes,
        embedded_compactor_enabled,
        &config.storage,
        !opts.role.for_streaming(),
    );

    let storage_memory_bytes = total_storage_memory_limit_bytes(&storage_memory_config);
    let compute_memory_bytes = validate_compute_node_memory_config(
        opts.total_memory_bytes,
        reserved_memory_bytes,
        storage_memory_bytes,
    );
    print_memory_config(
        opts.total_memory_bytes,
        compute_memory_bytes,
        storage_memory_bytes,
        &storage_memory_config,
        embedded_compactor_enabled,
        reserved_memory_bytes,
    );

    let storage_opts = Arc::new(StorageOpts::from((
        &*config,
        &system_params,
        &storage_memory_config,
    )));

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
    let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
    let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
    let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());
    let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
    let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
        config.server.metrics_level,
    ));
    let object_store_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let storage_metrics = Arc::new(global_storage_metrics(config.server.metrics_level));
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };

    if let Err(existing_url) = STATE_STORE_URL.set(state_store_url.to_owned()) {
        assert_eq!(
            existing_url, state_store_url,
            "STATE_STORE_URL already set with different value"
        );
    }

    if let Err(existing_dir) = DATA_DIRECTORY.set(data_directory.to_owned()) {
        assert_eq!(
            existing_dir, data_directory,
            "DATA_DIRECTORY already set with different value"
        );
    }

    LicenseManager::get().refresh(system_params.license_key());
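    // Initialize the state store from the configured backend; `Box::pin` keeps the
    // large initialization future off the stack.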
    let state_store = Box::pin(StateStoreImpl::new(
        state_store_url,
        storage_opts.clone(),
        hummock_meta_client.clone(),
        state_store_metrics.clone(),
        object_store_metrics,
        storage_metrics.clone(),
        compactor_metrics.clone(),
        await_tree_config.clone(),
        system_params.use_new_object_prefix_strategy(),
    ))
    .await
    .unwrap();

    LocalSecretManager::init(
        opts.temp_secret_file_dir.clone(),
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    let batch_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
    let compute_observer_node =
        ComputeObserverNode::new(system_params_manager.clone(), batch_client_pool.clone());
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
    observer_manager.start().await;

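    // Hummock-specific setup: optionally run the embedded compactor, monitor cache
    // memory usage, and watch for backup-related config changes.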
    if let Some(storage) = state_store.as_hummock() {
        if embedded_compactor_enabled {
            tracing::info!("start embedded compactor");
            // Give the embedded compactor half of the configured compactor memory limit.
            let memory_limiter = Arc::new(MemoryLimiter::new(
                storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
            ));

            let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
            let compactor_context = CompactorContext {
                storage_opts,
                sstable_store: storage.sstable_store(),
                compactor_metrics: compactor_metrics.clone(),
                is_share_buffer_compact: false,
                compaction_executor,
                memory_limiter,

                task_progress_manager: Default::default(),
                await_tree_reg: await_tree_config
                    .clone()
                    .map(new_compaction_await_tree_reg_ref),
            };

            let (handle, shutdown_sender) = start_compactor(
                compactor_context,
                hummock_meta_client.clone(),
                storage.object_id_manager().clone(),
                storage.compaction_catalog_manager_ref(),
            );
            sub_tasks.push((handle, shutdown_sender));
        }
        let flush_limiter = storage.get_memory_limiter();
        let memory_collector = Arc::new(HummockMemoryCollector::new(
            storage.sstable_store(),
            flush_limiter,
            storage_memory_config,
        ));
        monitor_cache(memory_collector);
        let backup_reader = storage.backup_reader();
        let system_params_mgr = system_params_manager.clone();
        tokio::spawn(async move {
            backup_reader
                .watch_config_change(system_params_mgr.watch_params())
                .await;
        });
    }

    let batch_mgr = Arc::new(BatchManager::new(
        config.batch.clone(),
        batch_manager_metrics,
        batch_mem_limit(compute_memory_bytes, opts.role.for_serving()),
    ));

    let target_memory = if let Some(v) = opts.memory_manager_target_bytes {
        v
    } else {
        compute_memory_bytes + storage_memory_bytes
    };

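    // Spawn the memory manager: it tracks usage against the target memory budget and
    // drives LRU eviction through the stable/graceful/aggressive thresholds configured
    // below.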
    let memory_mgr = MemoryManager::new(MemoryManagerConfig {
        target_memory,
        threshold_aggressive: config
            .streaming
            .developer
            .memory_controller_threshold_aggressive,
        threshold_graceful: config
            .streaming
            .developer
            .memory_controller_threshold_graceful,
        threshold_stable: config
            .streaming
            .developer
            .memory_controller_threshold_stable,
        eviction_factor_stable: config
            .streaming
            .developer
            .memory_controller_eviction_factor_stable,
        eviction_factor_graceful: config
            .streaming
            .developer
            .memory_controller_eviction_factor_graceful,
        eviction_factor_aggressive: config
            .streaming
            .developer
            .memory_controller_eviction_factor_aggressive,
        metrics: streaming_metrics.clone(),
    });

    // Run the memory monitor loop in the background.
    tokio::spawn(
        memory_mgr.clone().run(Duration::from_millis(
            config
                .streaming
                .developer
                .memory_controller_update_interval_ms as _,
        )),
    );

    // Run a background heap profiler.
    let heap_profiler = HeapProfiler::new(
        opts.total_memory_bytes,
        config.server.heap_profiling.clone(),
    );
    heap_profiler.start();

    let dml_mgr = Arc::new(DmlManager::new(
        worker_id,
        config.streaming.developer.dml_channel_initial_permits,
    ));

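    // Build the batch execution environment shared by all batch tasks on this node.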
    let batch_env = BatchEnvironment::new(
        batch_mgr.clone(),
        advertise_addr.clone(),
        batch_config,
        worker_id,
        state_store.clone(),
        batch_executor_metrics.clone(),
        batch_client_pool,
        dml_mgr.clone(),
        source_metrics.clone(),
        batch_spill_metrics.clone(),
        iceberg_scan_metrics.clone(),
        config.server.metrics_level,
    );

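    // Build the streaming environment and the local stream manager that runs the
    // streaming actors on this node.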
    let stream_client_pool = Arc::new(ComputeClientPool::new(
        config.streaming_exchange_connection_pool_size(),
        config.streaming.developer.compute_client_config.clone(),
    ));
    let stream_env = StreamEnvironment::new(
        advertise_addr.clone(),
        stream_config,
        worker_id,
        state_store.clone(),
        dml_mgr,
        system_params_manager.clone(),
        source_metrics,
        meta_client.clone(),
        stream_client_pool,
    );

    let stream_mgr = LocalStreamManager::new(
        stream_env.clone(),
        streaming_metrics.clone(),
        await_tree_config.clone(),
        memory_mgr.get_watermark_sequence(),
    );

    // Construct the gRPC service implementations.
    let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
    let exchange_srv =
        ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics);
    let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
    let (meta_cache, block_cache) = if let Some(hummock) = state_store.as_hummock() {
        (
            Some(hummock.sstable_store().meta_cache().clone()),
            Some(hummock.sstable_store().block_cache().clone()),
        )
    } else {
        (None, None)
    };
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
        meta_cache.clone(),
        block_cache.clone(),
    );
    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
    );

    // Telemetry reporting runs only if both the server config and the environment allow it.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Clean up any leftover spill files from previous runs before serving batch queries.
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory().await.unwrap();
    }

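    // Assemble and spawn the tonic gRPC server hosting all compute-node services.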
    let server = tonic::transport::Server::builder()
        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
        .initial_stream_window_size(STREAM_WINDOW_SIZE)
        .http2_max_pending_accept_reset_streams(Some(config.server.grpc_max_reset_stream as usize))
        .layer(TracingExtractLayer::new())
        .add_service(TaskServiceServer::new(batch_srv).max_decoding_message_size(usize::MAX))
        .add_service(ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX))
        .add_service({
            let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
            let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
            #[cfg(madsim)]
            {
                srv
            }
            #[cfg(not(madsim))]
            {
                AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
            }
        })
        .add_service(MonitorServiceServer::new(monitor_srv))
        .add_service(ConfigServiceServer::new(config_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compute-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot the metrics service only when metrics collection is enabled.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All services are up; tell the meta service this node is ready to serve.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // Unregister from the meta service before tearing down local streaming tasks.
    meta_client.try_unregister().await;
    let _ = stream_mgr.shutdown().await;
}

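/// Computes the memory budget available to the compute (query) part, given the node
/// total, the reserved portion, and the storage portion.
///
/// Falls back to `MIN_COMPUTE_MEMORY_MB` (in bytes) when the storage memory alone
/// exceeds the node total, or when storage plus the minimum compute memory plus the
/// reserved memory would not fit; otherwise returns `total - storage - reserved`.
///
/// Illustrative numbers (not from any real deployment): on a 16 GiB node with 4 GiB
/// of storage memory and 3 GiB reserved, the compute budget is 16 - 4 - 3 = 9 GiB.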
fn validate_compute_node_memory_config(
    cn_total_memory_bytes: usize,
    reserved_memory_bytes: usize,
    storage_memory_bytes: usize,
) -> usize {
    if storage_memory_bytes > cn_total_memory_bytes {
        tracing::warn!(
            "The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
            convert(cn_total_memory_bytes as _),
            convert(storage_memory_bytes as _)
        );
        MIN_COMPUTE_MEMORY_MB << 20
    } else if storage_memory_bytes + (MIN_COMPUTE_MEMORY_MB << 20) + reserved_memory_bytes
        >= cn_total_memory_bytes
    {
        tracing::warn!(
            "Not enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
            convert(cn_total_memory_bytes as _),
            convert(storage_memory_bytes as _)
        );
        MIN_COMPUTE_MEMORY_MB << 20
    } else {
        cn_total_memory_bytes - storage_memory_bytes - reserved_memory_bytes
    }
}

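/// Sums the storage memory budgets (block cache, meta cache, shared buffer, and
/// compactor limit, all in MiB) and converts the total to bytes (`<< 20`).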
fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig) -> usize {
    let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
        + storage_memory_config.meta_cache_capacity_mb
        + storage_memory_config.shared_buffer_capacity_mb
        + storage_memory_config.compactor_memory_limit_mb;
    total_storage_memory_mb << 20
}

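/// Returns whether the compactor should run inside the compute node: always for the
/// in-memory and local-disk Hummock backends, or when remote compaction is disabled
/// in the storage config.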
fn embedded_compactor_enabled(state_store_url: &str, disable_remote_compactor: bool) -> bool {
    state_store_url.starts_with("hummock+memory")
        || state_store_url.starts_with("hummock+disk")
        || disable_remote_compactor
}

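/// Logs a human-readable outline of how the node's total memory is split across
/// storage (and its caches), compute, and the reserved portion.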
fn print_memory_config(
    cn_total_memory_bytes: usize,
    compute_memory_bytes: usize,
    storage_memory_bytes: usize,
    storage_memory_config: &StorageMemoryConfig,
    embedded_compactor_enabled: bool,
    reserved_memory_bytes: usize,
) {
    let memory_config = format!(
        "Memory outline:\n\
        > total_memory: {}\n\
        > storage_memory: {}\n\
        > block_cache_capacity: {}\n\
        > meta_cache_capacity: {}\n\
        > shared_buffer_capacity: {}\n\
        > compactor_memory_limit: {}\n\
        > compute_memory: {}\n\
        > reserved_memory: {}",
        convert(cn_total_memory_bytes as _),
        convert(storage_memory_bytes as _),
        convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
        convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
        convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
        if embedded_compactor_enabled {
            convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
        } else {
            "Not enabled".to_owned()
        },
        convert(compute_memory_bytes as _),
        convert(reserved_memory_bytes as _),
    );
    info!("{}", memory_config);
}