1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_batch::monitor::{
20 GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
21};
22use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
23use risingwave_batch::spill::spill_op::SpillOp;
24use risingwave_batch::task::{BatchEnvironment, BatchManager};
25use risingwave_common::config::{
26 AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, MetricLevel, STREAM_WINDOW_SIZE,
27 StorageMemoryConfig, load_config,
28};
29use risingwave_common::license::LicenseManager;
30use risingwave_common::lru::init_global_sequencer_args;
31use risingwave_common::monitor::{RouterExt, TcpConfig};
32use risingwave_common::secret::LocalSecretManager;
33use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::telemetry::manager::TelemetryManager;
36use risingwave_common::telemetry::telemetry_env_enabled;
37use risingwave_common::util::addr::HostAddr;
38use risingwave_common::util::pretty_bytes::convert;
39use risingwave_common::util::tokio_util::sync::CancellationToken;
40use risingwave_common::{DATA_DIRECTORY, GIT_SHA, RW_VERSION, STATE_STORE_URL};
41use risingwave_common_heap_profiling::HeapProfiler;
42use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
43use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
44use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
45use risingwave_dml::dml_manager::DmlManager;
46use risingwave_pb::common::WorkerType;
47use risingwave_pb::common::worker_node::Property;
48use risingwave_pb::compute::config_service_server::ConfigServiceServer;
49use risingwave_pb::health::health_server::HealthServer;
50use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
51use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
52use risingwave_pb::task_service::batch_exchange_service_server::BatchExchangeServiceServer;
53use risingwave_pb::task_service::stream_exchange_service_server::StreamExchangeServiceServer;
54use risingwave_pb::task_service::task_service_server::TaskServiceServer;
55use risingwave_rpc_client::{ComputeClientPool, MetaClient};
56use risingwave_storage::StateStoreImpl;
57use risingwave_storage::hummock::MemoryLimiter;
58use risingwave_storage::hummock::compactor::{
59 CompactionExecutor, CompactorContext, new_compaction_await_tree_reg_ref, start_compactor,
60};
61use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
62use risingwave_storage::hummock::utils::HummockMemoryCollector;
63use risingwave_storage::monitor::{
64 GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, GLOBAL_OBJECT_STORE_METRICS,
65 global_hummock_state_store_metrics, global_storage_metrics, monitor_cache,
66};
67use risingwave_storage::opts::StorageOpts;
68use risingwave_stream::executor::monitor::global_streaming_metrics;
69use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
70use tokio::sync::oneshot::Sender;
71use tokio::task::JoinHandle;
72use tower::Layer;
73
74use crate::ComputeNodeOpts;
75use crate::memory::config::{
76 MIN_COMPUTE_MEMORY_MB, batch_mem_limit, reserve_memory_bytes, storage_memory_config,
77};
78use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
79use crate::observer::observer_manager::ComputeObserverNode;
80use crate::rpc::service::batch_exchange_service::BatchExchangeServiceImpl;
81use crate::rpc::service::config_service::ConfigServiceImpl;
82use crate::rpc::service::health_service::HealthServiceImpl;
83use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
84use crate::rpc::service::stream_exchange_service::{
85 GLOBAL_STREAM_EXCHANGE_SERVICE_METRICS, StreamExchangeServiceImpl,
86};
87use crate::rpc::service::stream_service::StreamServiceImpl;
88use crate::telemetry::ComputeTelemetryCreator;
/// Boots a compute node and serves its gRPC endpoints until `shutdown` is cancelled.
///
/// The startup sequence below is order-sensitive:
/// 1. Load config and register with the meta service (yields worker id and system params).
/// 2. Partition node memory between compute and storage, then build the state store.
/// 3. Construct managers (batch, stream, DML, memory) and background sub-tasks
///    (heartbeat, telemetry, optional embedded compactor).
/// 4. Start the tonic gRPC server, activate this worker at meta, and park on `shutdown`.
///
/// On shutdown it unregisters from meta and drains the local stream manager.
pub async fn compute_node_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: Arc<ComputeNodeOpts>,
    shutdown: CancellationToken,
) {
    // Load the node config; values from `opts` overlay the config file.
    let config = Arc::new(load_config(&opts.config_path, &*opts));
    info!("Starting compute node",);
    info!("> config: {:?}", &*config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    let stream_config = Arc::new(config.streaming.clone());
    let batch_config = Arc::new(config.batch.clone());

    // Initialize the global LRU sequencer parameters. NOTE(review): presumably this must
    // run before any LRU-backed cache is created — confirm against the sequencer's docs.
    init_global_sequencer_args(
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_step,
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_lag,
    );

    // Register this node with the meta service. Done early because the returned
    // `system_params` (state store URL, data directory, license key, ...) drive
    // the storage setup below.
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::ComputeNode,
        &advertise_addr,
        Property {
            parallelism: opts.parallelism as u32,
            is_streaming: opts.role.for_streaming(),
            is_serving: opts.role.for_serving(),
            is_unschedulable: false,
            internal_rpc_host_addr: "".to_owned(),
            resource_group: Some(opts.resource_group.clone()),
            is_iceberg_compactor: false,
        },
        Arc::new(config.meta.clone()),
    )
    .await;
    // Background sub-tasks with their shutdown senders, collected so they can be
    // torn down together. First entry: the meta heartbeat loop.
    let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
    sub_tasks.push(MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    ));

    let state_store_url = system_params.state_store();
    let data_directory = system_params.data_directory();

    // The embedded compactor runs in-process for memory/disk-backed hummock stores,
    // or when the remote compactor is explicitly disabled.
    let embedded_compactor_enabled =
        embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

    // Split node memory: a reserved slice for the system, then storage vs. compute.
    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
    let storage_memory_config = storage_memory_config(
        non_reserved_memory_bytes,
        embedded_compactor_enabled,
        &config.storage,
        !opts.role.for_streaming(),
    );

    let storage_memory_bytes = total_storage_memory_limit_bytes(&storage_memory_config);
    // May fall back to a minimum compute budget (with a warning) if the node is undersized.
    let compute_memory_bytes = validate_compute_node_memory_config(
        opts.total_memory_bytes,
        reserved_memory_bytes,
        storage_memory_bytes,
    );
    print_memory_config(
        opts.total_memory_bytes,
        compute_memory_bytes,
        storage_memory_bytes,
        &storage_memory_config,
        embedded_compactor_enabled,
        reserved_memory_bytes,
    );

    let storage_opts = Arc::new(StorageOpts::from((
        &*config,
        &system_params,
        &storage_memory_config,
    )));

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    // Instantiate metric registries (mostly clones of lazily-initialized globals).
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
    let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
    let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
    let stream_exchange_srv_metrics = Arc::new(GLOBAL_STREAM_EXCHANGE_SERVICE_METRICS.clone());
    let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
    let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
        config.server.metrics_level,
    ));
    let object_store_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let storage_metrics = Arc::new(global_storage_metrics(config.server.metrics_level));
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    // Await-tree (async stack trace) config; `None` disables tracing entirely.
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };
    // Publish the state-store URL / data directory into process-wide cells.
    // If already set (e.g. restarted in-process in tests), they must match.
    if let Err(existing_url) = STATE_STORE_URL.set(state_store_url.to_owned()) {
        assert_eq!(
            existing_url, state_store_url,
            "STATE_STORE_URL already set with different value"
        );
    }

    if let Err(existing_dir) = DATA_DIRECTORY.set(data_directory.to_owned()) {
        assert_eq!(
            existing_dir, data_directory,
            "DATA_DIRECTORY already set with different value"
        );
    }
    // Refresh the license before storage init so gated features resolve correctly.
    LicenseManager::get().refresh(system_params.license_key());
    // `Box::pin` because the `StateStoreImpl::new` future is large.
    let state_store = Box::pin(StateStoreImpl::new(
        state_store_url,
        storage_opts.clone(),
        hummock_meta_client.clone(),
        state_store_metrics.clone(),
        object_store_metrics,
        storage_metrics.clone(),
        compactor_metrics.clone(),
        await_tree_config.clone(),
        system_params.use_new_object_prefix_strategy(),
    ))
    .await
    .unwrap();

    LocalSecretManager::init(
        opts.temp_secret_file_dir.clone(),
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    // Client pool for batch exchange; also handed to the observer below.
    let batch_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
    let compute_observer_node =
        ComputeObserverNode::new(system_params_manager.clone(), batch_client_pool.clone());
    // Subscribe to meta notifications (system params changes etc.).
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
    observer_manager.start().await;

    // Hummock-specific wiring: embedded compactor, cache monitoring, backup reader.
    if let Some(storage) = state_store.as_hummock() {
        if embedded_compactor_enabled {
            tracing::info!("start embedded compactor");
            // Half the compactor budget goes to the in-process memory limiter.
            let memory_limiter = Arc::new(MemoryLimiter::new(
                storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
            ));

            // Single-threaded executor: the embedded compactor is best-effort.
            let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
            let compactor_context = CompactorContext {
                storage_opts,
                sstable_store: storage.sstable_store(),
                compactor_metrics: compactor_metrics.clone(),
                is_share_buffer_compact: false,
                compaction_executor,
                memory_limiter,

                task_progress_manager: Default::default(),
                await_tree_reg: await_tree_config
                    .clone()
                    .map(new_compaction_await_tree_reg_ref),
            };

            let (handle, shutdown_sender) = start_compactor(
                compactor_context,
                hummock_meta_client.clone(),
                storage.object_id_manager().clone(),
                storage.compaction_catalog_manager_ref(),
            );
            sub_tasks.push((handle, shutdown_sender));
        }
        let flush_limiter = storage.get_memory_limiter();
        let memory_collector = Arc::new(HummockMemoryCollector::new(
            storage.sstable_store(),
            flush_limiter,
            storage_memory_config,
        ));
        monitor_cache(memory_collector);
        let backup_reader = storage.backup_reader();
        let system_params_mgr = system_params_manager.clone();
        // Keep the backup reader in sync with system param changes.
        tokio::spawn(async move {
            backup_reader
                .watch_config_change(system_params_mgr.watch_params())
                .await;
        });
    }

    let batch_mgr = Arc::new(BatchManager::new(
        config.batch.clone(),
        batch_manager_metrics,
        batch_mem_limit(compute_memory_bytes, opts.role.for_serving()),
    ));

    // Memory-manager target: explicit override from opts, otherwise compute + storage.
    let target_memory = if let Some(v) = opts.memory_manager_target_bytes {
        v
    } else {
        compute_memory_bytes + storage_memory_bytes
    };

    let memory_mgr = MemoryManager::new(MemoryManagerConfig {
        target_memory,
        threshold_aggressive: config
            .streaming
            .developer
            .memory_controller_threshold_aggressive,
        threshold_graceful: config
            .streaming
            .developer
            .memory_controller_threshold_graceful,
        threshold_stable: config
            .streaming
            .developer
            .memory_controller_threshold_stable,
        eviction_factor_stable: config
            .streaming
            .developer
            .memory_controller_eviction_factor_stable,
        eviction_factor_graceful: config
            .streaming
            .developer
            .memory_controller_eviction_factor_graceful,
        eviction_factor_aggressive: config
            .streaming
            .developer
            .memory_controller_eviction_factor_aggressive,
        metrics: streaming_metrics.clone(),
    });

    // Run the periodic memory controller loop in the background.
    tokio::spawn(
        memory_mgr.clone().run(Duration::from_millis(
            config
                .streaming
                .developer
                .memory_controller_update_interval_ms as _,
        )),
    );

    let heap_profiler = HeapProfiler::new(
        opts.total_memory_bytes,
        config.server.heap_profiling.clone(),
    );
    heap_profiler.start();

    let dml_mgr = Arc::new(DmlManager::new(
        worker_id,
        config.streaming.developer.dml_channel_initial_permits,
    ));

    let batch_env = BatchEnvironment::new(
        batch_mgr.clone(),
        advertise_addr.clone(),
        batch_config,
        worker_id,
        state_store.clone(),
        batch_executor_metrics.clone(),
        batch_client_pool,
        dml_mgr.clone(),
        source_metrics.clone(),
        batch_spill_metrics.clone(),
        iceberg_scan_metrics.clone(),
        config.server.metrics_level,
    );

    // Separate client pool for streaming exchange (sized independently of batch).
    let stream_client_pool = Arc::new(ComputeClientPool::new(
        config.streaming_exchange_connection_pool_size(),
        config.streaming.developer.compute_client_config.clone(),
    ));
    let stream_env = StreamEnvironment::new(
        advertise_addr.clone(),
        stream_config,
        worker_id,
        state_store.clone(),
        dml_mgr,
        system_params_manager.clone(),
        source_metrics,
        meta_client.clone(),
        stream_client_pool,
    );

    let stream_mgr = LocalStreamManager::new(
        stream_env.clone(),
        streaming_metrics.clone(),
        await_tree_config.clone(),
        memory_mgr.get_watermark_sequence(),
    );

    // Construct the gRPC service implementations.
    let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
    let batch_exchange_srv = BatchExchangeServiceImpl::new(batch_mgr.clone());
    let stream_exchange_srv =
        StreamExchangeServiceImpl::new(stream_mgr.clone(), stream_exchange_srv_metrics);
    let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
    // Cache handles are only available for hummock-backed state stores.
    let (meta_cache, block_cache) = if let Some(hummock) = state_store.as_hummock() {
        (
            Some(hummock.sstable_store().meta_cache().clone()),
            Some(hummock.sstable_store().block_cache().clone()),
        )
    } else {
        (None, None)
    };
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
        meta_cache.clone(),
        block_cache.clone(),
    );
    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
    );

    // Telemetry runs only if enabled both in config and by the environment.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Clear stale spill files from a previous run (not supported under madsim).
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory().await.unwrap();
    }

    // Assemble and spawn the gRPC server; it shuts down when `shutdown` is cancelled.
    let server = tonic::transport::Server::builder()
        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
        .initial_stream_window_size(STREAM_WINDOW_SIZE)
        .http2_max_pending_accept_reset_streams(Some(config.server.grpc_max_reset_stream as usize))
        .layer(TracingExtractLayer::new())
        .add_service(TaskServiceServer::new(batch_srv).max_decoding_message_size(usize::MAX))
        .add_service(
            BatchExchangeServiceServer::new(batch_exchange_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service(
            StreamExchangeServiceServer::new(stream_exchange_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service({
            let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
            let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
            // The await-tree middleware is not available under madsim.
            #[cfg(madsim)]
            {
                srv
            }
            #[cfg(not(madsim))]
            {
                AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
            }
        })
        .add_service(MonitorServiceServer::new(monitor_srv))
        .add_service(ConfigServiceServer::new(config_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compute-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Mark this worker as active at meta only after the server is serving.
    meta_client.activate(&advertise_addr).await.unwrap();
    // Park until shutdown is requested.
    shutdown.cancelled().await;

    // Best-effort teardown: unregister from meta, then drain streaming tasks.
    meta_client.try_unregister().await;
    let _ = stream_mgr.shutdown().await;

}
520
521fn validate_compute_node_memory_config(
527 cn_total_memory_bytes: usize,
528 reserved_memory_bytes: usize,
529 storage_memory_bytes: usize,
530) -> usize {
531 if storage_memory_bytes > cn_total_memory_bytes {
532 tracing::warn!(
533 "The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
534 convert(cn_total_memory_bytes as _),
535 convert(storage_memory_bytes as _)
536 );
537 MIN_COMPUTE_MEMORY_MB << 20
538 } else if storage_memory_bytes + (MIN_COMPUTE_MEMORY_MB << 20) + reserved_memory_bytes
539 >= cn_total_memory_bytes
540 {
541 tracing::warn!(
542 "No enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
543 convert(cn_total_memory_bytes as _),
544 convert(storage_memory_bytes as _)
545 );
546 MIN_COMPUTE_MEMORY_MB << 20
547 } else {
548 cn_total_memory_bytes - storage_memory_bytes - reserved_memory_bytes
549 }
550}
551
552fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig) -> usize {
555 let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
556 + storage_memory_config.meta_cache_capacity_mb
557 + storage_memory_config.shared_buffer_capacity_mb
558 + storage_memory_config.compactor_memory_limit_mb;
559 total_storage_memory_mb << 20
560}
561
/// Whether compaction should run inside this compute node: always when the remote
/// compactor is disabled, and for memory-/disk-backed hummock state stores, which
/// have no remote compactor to delegate to.
fn embedded_compactor_enabled(state_store_url: &str, disable_remote_compactor: bool) -> bool {
    if disable_remote_compactor {
        return true;
    }
    ["hummock+memory", "hummock+disk"]
        .iter()
        .any(|prefix| state_store_url.starts_with(prefix))
}
569
570fn print_memory_config(
572 cn_total_memory_bytes: usize,
573 compute_memory_bytes: usize,
574 storage_memory_bytes: usize,
575 storage_memory_config: &StorageMemoryConfig,
576 embedded_compactor_enabled: bool,
577 reserved_memory_bytes: usize,
578) {
579 let memory_config = format!(
580 "Memory outline:\n\
581 > total_memory: {}\n\
582 > storage_memory: {}\n\
583 > block_cache_capacity: {}\n\
584 > meta_cache_capacity: {}\n\
585 > shared_buffer_capacity: {}\n\
586 > compactor_memory_limit: {}\n\
587 > compute_memory: {}\n\
588 > reserved_memory: {}",
589 convert(cn_total_memory_bytes as _),
590 convert(storage_memory_bytes as _),
591 convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
592 convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
593 convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
594 if embedded_compactor_enabled {
595 convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
596 } else {
597 "Not enabled".to_owned()
598 },
599 convert(compute_memory_bytes as _),
600 convert(reserved_memory_bytes as _),
601 );
602 info!("{}", memory_config);
603}