use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_batch::monitor::{
    GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
};
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::spill::spill_op::SpillOp;
use risingwave_batch::task::{BatchEnvironment, BatchManager};
use risingwave_common::config::{
    AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, MetricLevel, STREAM_WINDOW_SIZE,
    StorageMemoryConfig, load_config,
};
use risingwave_common::license::LicenseManager;
use risingwave_common::lru::init_global_sequencer_args;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::pretty_bytes::convert;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ComputeClientPool, MetaClient};
use risingwave_storage::StateStoreImpl;
use risingwave_storage::hummock::MemoryLimiter;
use risingwave_storage::hummock::compactor::{
    CompactionExecutor, CompactorContext, new_compaction_await_tree_reg_ref, start_compactor,
};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::utils::HummockMemoryCollector;
use risingwave_storage::monitor::{
    GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, GLOBAL_OBJECT_STORE_METRICS,
    global_hummock_state_store_metrics, global_storage_metrics, monitor_cache,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tower::Layer;

use crate::ComputeNodeOpts;
use crate::memory::config::{
    MIN_COMPUTE_MEMORY_MB, batch_mem_limit, reserve_memory_bytes, storage_memory_config,
};
use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS;
use crate::rpc::service::exchange_service::ExchangeServiceImpl;
use crate::rpc::service::health_service::HealthServiceImpl;
use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::telemetry::ComputeTelemetryCreator;

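/// Bootstraps and runs a compute node: loads the config, registers the worker with the meta
/// service, initializes the state store and the batch/streaming runtimes, serves the gRPC
/// services on `listen_addr`, and blocks until `shutdown` is cancelled.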
pub async fn compute_node_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: ComputeNodeOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);

    info!("Starting compute node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    let stream_config = Arc::new(config.streaming.clone());
    let batch_config = Arc::new(config.batch.clone());

    init_global_sequencer_args(
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_step,
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_lag,
    );

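    // Register this worker with the meta service and fetch the initial system parameters.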
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::ComputeNode,
        &advertise_addr,
        Property {
            parallelism: opts.parallelism as u32,
            is_streaming: opts.role.for_streaming(),
            is_serving: opts.role.for_serving(),
            is_unschedulable: false,
            internal_rpc_host_addr: "".to_owned(),
            resource_group: Some(opts.resource_group.clone()),
        },
        &config.meta,
    )
    .await;

    let state_store_url = system_params.state_store();

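    // Decide whether to run the compactor inside this process; see `embedded_compactor_enabled`
    // below for the exact conditions.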
    let embedded_compactor_enabled =
        embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

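    // Split the node's total memory into reserved, storage, and compute portions, and warn if
    // the resulting compute budget is too small.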
    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
    let storage_memory_config = storage_memory_config(
        non_reserved_memory_bytes,
        embedded_compactor_enabled,
        &config.storage,
        !opts.role.for_streaming(),
    );

    let storage_memory_bytes = total_storage_memory_limit_bytes(&storage_memory_config);
    let compute_memory_bytes = validate_compute_node_memory_config(
        opts.total_memory_bytes,
        reserved_memory_bytes,
        storage_memory_bytes,
    );
    print_memory_config(
        opts.total_memory_bytes,
        compute_memory_bytes,
        storage_memory_bytes,
        &storage_memory_config,
        embedded_compactor_enabled,
        reserved_memory_bytes,
    );

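    // Derive the storage options from the node config, system parameters, and memory budget.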
    let storage_opts = Arc::new(StorageOpts::from((
        &config,
        &system_params,
        &storage_memory_config,
    )));

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
    let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
    let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
    let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());
    let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
    let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
        config.server.metrics_level,
    ));
    let object_store_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let storage_metrics = Arc::new(global_storage_metrics(config.server.metrics_level));
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

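    // Build the await-tree config used for async stack traces, unless tracing is turned off.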
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };

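    // Refresh the license and initialize the state store with the metrics registered above.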
    LicenseManager::get().refresh(system_params.license_key());
    let state_store = StateStoreImpl::new(
        state_store_url,
        storage_opts.clone(),
        hummock_meta_client.clone(),
        state_store_metrics.clone(),
        object_store_metrics,
        storage_metrics.clone(),
        compactor_metrics.clone(),
        await_tree_config.clone(),
        system_params.use_new_object_prefix_strategy(),
    )
    .await
    .unwrap();

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

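    // Create the batch RPC client pool and start the observer that keeps local system
    // parameters in sync with the meta service.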
    let batch_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
    let compute_observer_node =
        ComputeObserverNode::new(system_params_manager.clone(), batch_client_pool.clone());
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
    observer_manager.start().await;

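    // For Hummock state stores: optionally start the embedded compactor, hook up cache memory
    // monitoring, and watch for backup config changes.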
    if let Some(storage) = state_store.as_hummock() {
        if embedded_compactor_enabled {
            tracing::info!("start embedded compactor");
            let memory_limiter = Arc::new(MemoryLimiter::new(
                storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
            ));

            let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
            let compactor_context = CompactorContext {
                storage_opts,
                sstable_store: storage.sstable_store(),
                compactor_metrics: compactor_metrics.clone(),
                is_share_buffer_compact: false,
                compaction_executor,
                memory_limiter,

                task_progress_manager: Default::default(),
                await_tree_reg: await_tree_config
                    .clone()
                    .map(new_compaction_await_tree_reg_ref),
            };

            let (handle, shutdown_sender) = start_compactor(
                compactor_context,
                hummock_meta_client.clone(),
                storage.object_id_manager().clone(),
                storage.compaction_catalog_manager_ref().clone(),
            );
            sub_tasks.push((handle, shutdown_sender));
        }
        let flush_limiter = storage.get_memory_limiter();
        let memory_collector = Arc::new(HummockMemoryCollector::new(
            storage.sstable_store(),
            flush_limiter,
            storage_memory_config,
        ));
        monitor_cache(memory_collector);
        let backup_reader = storage.backup_reader();
        let system_params_mgr = system_params_manager.clone();
        tokio::spawn(async move {
            backup_reader
                .watch_config_change(system_params_mgr.watch_params())
                .await;
        });
    }

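    // Keep the registration alive with periodic heartbeats to the meta service.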
    sub_tasks.push(MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    ));

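    // The batch manager gets its own slice of the compute memory budget.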
    let batch_mgr = Arc::new(BatchManager::new(
        config.batch.clone(),
        batch_manager_metrics,
        batch_mem_limit(compute_memory_bytes, opts.role.for_serving()),
    ));

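    // The memory manager periodically adjusts cache eviction based on the configured thresholds.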
    let target_memory = if let Some(v) = opts.memory_manager_target_bytes {
        v
    } else {
        compute_memory_bytes + storage_memory_bytes
    };

    let memory_mgr = MemoryManager::new(MemoryManagerConfig {
        target_memory,
        threshold_aggressive: config
            .streaming
            .developer
            .memory_controller_threshold_aggressive,
        threshold_graceful: config
            .streaming
            .developer
            .memory_controller_threshold_graceful,
        threshold_stable: config
            .streaming
            .developer
            .memory_controller_threshold_stable,
        eviction_factor_stable: config
            .streaming
            .developer
            .memory_controller_eviction_factor_stable,
        eviction_factor_graceful: config
            .streaming
            .developer
            .memory_controller_eviction_factor_graceful,
        eviction_factor_aggressive: config
            .streaming
            .developer
            .memory_controller_eviction_factor_aggressive,
        metrics: streaming_metrics.clone(),
    });

    tokio::spawn(
        memory_mgr.clone().run(Duration::from_millis(
            config
                .streaming
                .developer
                .memory_controller_update_interval_ms as _,
        )),
    );

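    // Spawn the heap profiler according to the server's heap-profiling config.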
    let heap_profiler = HeapProfiler::new(
        opts.total_memory_bytes,
        config.server.heap_profiling.clone(),
    );
    heap_profiler.start();

    let dml_mgr = Arc::new(DmlManager::new(
        worker_id,
        config.streaming.developer.dml_channel_initial_permits,
    ));

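    // Build the batch and streaming environments shared by their respective task managers.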
    let batch_env = BatchEnvironment::new(
        batch_mgr.clone(),
        advertise_addr.clone(),
        batch_config,
        worker_id,
        state_store.clone(),
        batch_executor_metrics.clone(),
        batch_client_pool,
        dml_mgr.clone(),
        source_metrics.clone(),
        batch_spill_metrics.clone(),
        iceberg_scan_metrics.clone(),
        config.server.metrics_level,
    );

    let stream_client_pool = Arc::new(ComputeClientPool::new(
        config.streaming_exchange_connection_pool_size(),
        config.streaming.developer.compute_client_config.clone(),
    ));
    let stream_env = StreamEnvironment::new(
        advertise_addr.clone(),
        stream_config,
        worker_id,
        state_store.clone(),
        dml_mgr,
        system_params_manager.clone(),
        source_metrics,
        meta_client.clone(),
        stream_client_pool,
    );

    let stream_mgr = LocalStreamManager::new(
        stream_env.clone(),
        streaming_metrics.clone(),
        await_tree_config.clone(),
        memory_mgr.get_watermark_sequence(),
    );

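    // Construct the gRPC service implementations that will be mounted on the server below.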
    let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
    let exchange_srv =
        ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics);
    let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
    let (meta_cache, block_cache) = if let Some(hummock) = state_store.as_hummock() {
        (
            Some(hummock.sstable_store().meta_cache().clone()),
            Some(hummock.sstable_store().block_cache().clone()),
        )
    } else {
        (None, None)
    };
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
        meta_cache.clone(),
        block_cache.clone(),
    );
    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
    );

    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!(
            "Telemetry is not started because it is disabled by the config or the environment"
        );
    }

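    // Clean up spill files left over from previous runs when batch spilling is enabled.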
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory().await.unwrap();
    }

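    // Assemble the gRPC server with all services and serve until shutdown is requested.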
    let server = tonic::transport::Server::builder()
        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
        .initial_stream_window_size(STREAM_WINDOW_SIZE)
        .http2_max_pending_accept_reset_streams(Some(config.server.grpc_max_reset_stream as usize))
        .layer(TracingExtractLayer::new())
        .add_service(TaskServiceServer::new(batch_srv).max_decoding_message_size(usize::MAX))
        .add_service(ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX))
        .add_service({
            let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
            let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
            #[cfg(madsim)]
            {
                srv
            }
            #[cfg(not(madsim))]
            {
                AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
            }
        })
        .add_service(MonitorServiceServer::new(monitor_srv))
        .add_service(ConfigServiceServer::new(config_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compute-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

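    // All services are up: mark the node as active in the meta service, then wait for shutdown
    // and unregister gracefully.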
    meta_client.activate(&advertise_addr).await.unwrap();

    shutdown.cancelled().await;

    meta_client.try_unregister().await;

    let _ = stream_mgr.shutdown().await;
}

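/// Validates that the memory settings leave room for computing and returns the number of bytes
/// available to compute tasks; falls back to the minimum compute memory when they do not.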
fn validate_compute_node_memory_config(
    cn_total_memory_bytes: usize,
    reserved_memory_bytes: usize,
    storage_memory_bytes: usize,
) -> usize {
    if storage_memory_bytes > cn_total_memory_bytes {
        tracing::warn!(
            "The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
            convert(cn_total_memory_bytes as _),
            convert(storage_memory_bytes as _)
        );
        MIN_COMPUTE_MEMORY_MB << 20
    } else if storage_memory_bytes + (MIN_COMPUTE_MEMORY_MB << 20) + reserved_memory_bytes
        >= cn_total_memory_bytes
    {
        tracing::warn!(
            "Not enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
            convert(cn_total_memory_bytes as _),
            convert(storage_memory_bytes as _)
        );
        MIN_COMPUTE_MEMORY_MB << 20
    } else {
        cn_total_memory_bytes - storage_memory_bytes - reserved_memory_bytes
    }
}

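/// Sums the storage-related caches and buffers (in MiB) and converts the total to bytes.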
fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig) -> usize {
    let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
        + storage_memory_config.meta_cache_capacity_mb
        + storage_memory_config.shared_buffer_capacity_mb
        + storage_memory_config.compactor_memory_limit_mb;
    total_storage_memory_mb << 20
}

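/// The embedded compactor is used for in-memory or local-disk state stores, or when remote
/// compaction is explicitly disabled.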
fn embedded_compactor_enabled(state_store_url: &str, disable_remote_compactor: bool) -> bool {
    state_store_url.starts_with("hummock+memory")
        || state_store_url.starts_with("hummock+disk")
        || disable_remote_compactor
}

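/// Logs a human-readable outline of how the compute node's memory is partitioned.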
fn print_memory_config(
    cn_total_memory_bytes: usize,
    compute_memory_bytes: usize,
    storage_memory_bytes: usize,
    storage_memory_config: &StorageMemoryConfig,
    embedded_compactor_enabled: bool,
    reserved_memory_bytes: usize,
) {
    let memory_config = format!(
        "Memory outline:\n\
        > total_memory: {}\n\
        > storage_memory: {}\n\
        > block_cache_capacity: {}\n\
        > meta_cache_capacity: {}\n\
        > shared_buffer_capacity: {}\n\
        > compactor_memory_limit: {}\n\
        > compute_memory: {}\n\
        > reserved_memory: {}",
        convert(cn_total_memory_bytes as _),
        convert(storage_memory_bytes as _),
        convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
        convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
        convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
        if embedded_compactor_enabled {
            convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
        } else {
            "Not enabled".to_owned()
        },
        convert(compute_memory_bytes as _),
        convert(reserved_memory_bytes as _),
    );
    info!("{}", memory_config);
}