1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20 AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::common::worker_node::Property;
37use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
38use risingwave_pb::configured_monitor_service_server;
39use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
40use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
41use risingwave_storage::compaction_catalog_manager::{
42 CompactionCatalogManager, RemoteTableAccessor,
43};
44use risingwave_storage::hummock::compactor::{
45 CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
46 new_compaction_await_tree_reg_ref,
47};
48use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
49use risingwave_storage::hummock::utils::HummockMemoryCollector;
50use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
51use risingwave_storage::monitor::{
52 CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
53};
54use risingwave_storage::opts::StorageOpts;
55use tokio::sync::mpsc;
56use tracing::info;
57
58use super::compactor_observer::observer_manager::CompactorObserverNode;
59use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
60use crate::telemetry::CompactorTelemetryCreator;
61use crate::{
62 CompactorMode, CompactorOpts, default_rpc_max_decoding_message_size_bytes,
63 default_rpc_max_encoding_message_size_bytes,
64};
65
66pub async fn prepare_start_parameters(
67 compactor_opts: &CompactorOpts,
68 config: RwConfig,
69 system_params_reader: SystemParamsReader,
70) -> (
71 Arc<SstableStore>,
72 Arc<MemoryLimiter>,
73 HeapProfiler,
74 Option<CompactionAwaitTreeRegRef>,
75 Arc<StorageOpts>,
76 Arc<CompactorMetrics>,
77) {
78 let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
80 let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
81
82 let state_store_url = system_params_reader.state_store();
83
84 let storage_memory_config = extract_storage_memory_config(&config);
85 let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
86 &config,
87 &system_params_reader,
88 &storage_memory_config,
89 )));
90 let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
91 * config.storage.compactor_memory_available_proportion)
92 as usize;
93 let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
94 let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
95 Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
96 None => non_reserved_memory_bytes,
97 };
98
99 compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
100 panic!(
101 "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
102 compactor_memory_limit_bytes,
103 meta_cache_capacity_bytes
104 );
105 });
106
107 tracing::info!(
108 "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
109 non_reserved_memory_bytes,
110 meta_cache_capacity_bytes,
111 compactor_memory_limit_bytes,
112 storage_opts.sstable_size_mb * (1 << 20),
113 storage_opts.block_size_kb * (1 << 10),
114 );
115
116 {
118 let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
121 + storage_opts.block_size_kb * (1 << 10))
122 as u64;
123
124 assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
125 }
126
127 let object_store = build_remote_object_store(
128 state_store_url
129 .strip_prefix("hummock+")
130 .expect("object store must be hummock for compactor server"),
131 object_metrics,
132 "Hummock",
133 Arc::new(config.storage.object_store.clone()),
134 )
135 .await;
136
137 let object_store = Arc::new(object_store);
138 let sstable_store = Arc::new(
139 SstableStore::for_compactor(
140 object_store,
141 storage_opts.data_directory.clone(),
142 0,
143 meta_cache_capacity_bytes,
144 system_params_reader.use_new_object_prefix_strategy(),
145 )
146 .await
147 .unwrap(),
149 );
150
151 let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
152 let storage_memory_config = extract_storage_memory_config(&config);
153 let memory_collector = Arc::new(HummockMemoryCollector::new(
154 sstable_store.clone(),
155 memory_limiter.clone(),
156 storage_memory_config,
157 ));
158
159 let heap_profiler = HeapProfiler::new(
160 system_memory_available_bytes(),
161 config.server.heap_profiling.clone(),
162 );
163
164 monitor_cache(memory_collector);
165
166 let await_tree_config = match &config.streaming.async_stack_trace {
167 AsyncStackTraceOption::Off => None,
168 c => await_tree::ConfigBuilder::default()
169 .verbose(c.is_verbose().unwrap())
170 .build()
171 .ok(),
172 };
173 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
174
175 (
176 sstable_store,
177 memory_limiter,
178 heap_profiler,
179 await_tree_reg,
180 storage_opts,
181 compactor_metrics,
182 )
183}
184
/// Runs a dedicated compactor node until `shutdown` is cancelled.
///
/// Steps, in order:
/// 1. Loads the config from `opts.config_path`.
/// 2. Registers with the meta service as a `WorkerType::Compactor` advertising `advertise_addr`.
/// 3. Builds the storage components via [`prepare_start_parameters`].
/// 4. Starts the observer, the heartbeat loop, the compaction loop selected by
///    `compactor_mode`, and (optionally) telemetry.
/// 5. Serves the compactor and monitor gRPC services on `listen_addr`.
/// 6. Activates the node with the meta service.
///
/// After `shutdown` is cancelled it tries to unregister from the meta service.
///
/// # Panics
///
/// - if `compactor_mode` is `Shared` or `SharedIceberg` (this entry point only handles the
///   `Dedicated` and `DedicatedIceberg` modes);
/// - if activating the node with the meta service fails;
/// - if [`prepare_start_parameters`] panics.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Both iceberg modes are reported to meta as iceberg compactors.
    let is_iceberg_compactor = matches!(
        compactor_mode,
        CompactorMode::DedicatedIceberg | CompactorMode::SharedIceberg
    );

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    // Parallelism advertised to meta: executor worker count scaled by
    // `compactor_max_task_multiplier`, rounded up.
    let max_task_parallelism: u32 = (compaction_executor.worker_num() as f32
        * config.storage.compactor_max_task_multiplier)
        .ceil() as u32;

    // Register with meta as a compactor; the call also yields the system parameters
    // used below to build the storage components.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Property {
            is_iceberg_compactor,
            parallelism: max_task_parallelism,
            ..Default::default()
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    // `Box::pin` moves this future onto the heap (presumably to keep the size of this
    // function's own future down).
    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = Box::pin(prepare_start_parameters(
        &opts,
        config.clone(),
        system_params_reader.clone(),
    ))
    .await;

    // The compaction catalog is backed by the meta client through `RemoteTableAccessor`.
    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    // Wire the catalog manager and the local system-params manager into an observer
    // attached to the meta client.
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    heap_profiler.start();

    // The handle is held until this function returns but is never awaited.
    let _observer_join_handle = observer_manager.start().await;

    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // Handles for background work: the meta heartbeat loop and the compaction loop for
    // `compactor_mode`. NOTE(review): nothing here joins these handles; they are dropped
    // when the function returns.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            // Shared modes are not handled by this entry point.
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    // Telemetry starts only when both the config and the environment enable it.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Serve the gRPC services in a spawned task; the server's shutdown signal is
    // the cancellation of `shutdown`.
    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg, config.server.clone());
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(configured_monitor_service_server(
            MonitorServiceServer::new(monitor_srv),
        ))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot the metrics service on `opts.prometheus_listener_addr` unless metrics are disabled.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Everything is started; mark this node active with the meta service.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Block until shutdown is requested, then try to unregister from meta.
    shutdown.cancelled().await;
    meta_client.try_unregister().await;
}
356
357pub async fn shared_compactor_serve(
361 listen_addr: SocketAddr,
362 opts: CompactorOpts,
363 shutdown: CancellationToken,
364) {
365 let config = load_config(&opts.config_path, &opts);
366 info!("Starting shared compactor node",);
367 info!("> config: {:?}", config);
368 info!(
369 "> debug assertions: {}",
370 if cfg!(debug_assertions) { "on" } else { "off" }
371 );
372 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
373
374 let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
375 let system_params_response = grpc_proxy_client
376 .get_system_params()
377 .await
378 .expect("Fail to get system params, the compactor pod cannot be started.");
379 let system_params = system_params_response.into_inner().params.unwrap();
380
381 let (
382 sstable_store,
383 memory_limiter,
384 heap_profiler,
385 await_tree_reg,
386 storage_opts,
387 compactor_metrics,
388 ) = Box::pin(prepare_start_parameters(
389 &opts,
390 config.clone(),
391 system_params.into(),
392 ))
393 .await;
394 let (sender, receiver) = mpsc::unbounded_channel();
395 let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);
396
397 let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone(), config.server.clone());
398
399 heap_profiler.start();
401
402 let compaction_executor = Arc::new(CompactionExecutor::new(
403 opts.compaction_worker_threads_number,
404 ));
405 let compactor_context = CompactorContext {
406 storage_opts,
407 sstable_store,
408 compactor_metrics,
409 is_share_buffer_compact: false,
410 compaction_executor,
411 memory_limiter,
412 task_progress_manager: Default::default(),
413 await_tree_reg,
414 };
415
416 let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
419 grpc_proxy_client,
420 receiver,
421 compactor_context,
422 );
423
424 let rpc_max_encoding_message_size_bytes = opts
425 .rpc_max_encoding_message_size_bytes
426 .unwrap_or(default_rpc_max_encoding_message_size_bytes());
427
428 let rpc_max_decoding_message_size_bytes = opts
429 .rpc_max_decoding_message_size_bytes
430 .unwrap_or(default_rpc_max_decoding_message_size_bytes());
431
432 let server = tonic::transport::Server::builder()
433 .add_service(
434 CompactorServiceServer::new(compactor_srv)
435 .max_decoding_message_size(rpc_max_decoding_message_size_bytes)
436 .max_encoding_message_size(rpc_max_encoding_message_size_bytes),
437 )
438 .add_service(configured_monitor_service_server(
439 MonitorServiceServer::new(monitor_srv),
440 ))
441 .monitored_serve_with_shutdown(
442 listen_addr,
443 "grpc-compactor-node-service",
444 TcpConfig {
445 tcp_nodelay: true,
446 keepalive_duration: None,
447 },
448 shutdown.clone().cancelled_owned(),
449 );
450
451 let _server_handle = tokio::spawn(server);
452
453 if config.server.metrics_level > MetricLevel::Disabled {
455 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
456 }
457
458 shutdown.cancelled().await;
460
461 }