1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20 AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::common::worker_node::Property;
37use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
38use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
39use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
40use risingwave_storage::compaction_catalog_manager::{
41 CompactionCatalogManager, RemoteTableAccessor,
42};
43use risingwave_storage::hummock::compactor::{
44 CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
45 new_compaction_await_tree_reg_ref,
46};
47use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
48use risingwave_storage::hummock::utils::HummockMemoryCollector;
49use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
50use risingwave_storage::monitor::{
51 CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
52};
53use risingwave_storage::opts::StorageOpts;
54use tokio::sync::mpsc;
55use tracing::info;
56
57use super::compactor_observer::observer_manager::CompactorObserverNode;
58use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
59use crate::telemetry::CompactorTelemetryCreator;
60use crate::{
61 CompactorMode, CompactorOpts, default_rpc_max_decoding_message_size_bytes,
62 default_rpc_max_encoding_message_size_bytes,
63};
64
65pub async fn prepare_start_parameters(
66 compactor_opts: &CompactorOpts,
67 config: RwConfig,
68 system_params_reader: SystemParamsReader,
69) -> (
70 Arc<SstableStore>,
71 Arc<MemoryLimiter>,
72 HeapProfiler,
73 Option<CompactionAwaitTreeRegRef>,
74 Arc<StorageOpts>,
75 Arc<CompactorMetrics>,
76) {
77 let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
79 let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
80
81 let state_store_url = system_params_reader.state_store();
82
83 let storage_memory_config = extract_storage_memory_config(&config);
84 let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
85 &config,
86 &system_params_reader,
87 &storage_memory_config,
88 )));
89 let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
90 * config.storage.compactor_memory_available_proportion)
91 as usize;
92 let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
93 let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
94 Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
95 None => non_reserved_memory_bytes,
96 };
97
98 compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
99 panic!(
100 "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
101 compactor_memory_limit_bytes,
102 meta_cache_capacity_bytes
103 );
104 });
105
106 tracing::info!(
107 "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
108 non_reserved_memory_bytes,
109 meta_cache_capacity_bytes,
110 compactor_memory_limit_bytes,
111 storage_opts.sstable_size_mb * (1 << 20),
112 storage_opts.block_size_kb * (1 << 10),
113 );
114
115 {
117 let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
120 + storage_opts.block_size_kb * (1 << 10))
121 as u64;
122
123 assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
124 }
125
126 let object_store = build_remote_object_store(
127 state_store_url
128 .strip_prefix("hummock+")
129 .expect("object store must be hummock for compactor server"),
130 object_metrics,
131 "Hummock",
132 Arc::new(config.storage.object_store.clone()),
133 )
134 .await;
135
136 let object_store = Arc::new(object_store);
137 let sstable_store = Arc::new(
138 SstableStore::for_compactor(
139 object_store,
140 storage_opts.data_directory.clone(),
141 0,
142 meta_cache_capacity_bytes,
143 system_params_reader.use_new_object_prefix_strategy(),
144 )
145 .await
146 .unwrap(),
148 );
149
150 let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
151 let storage_memory_config = extract_storage_memory_config(&config);
152 let memory_collector = Arc::new(HummockMemoryCollector::new(
153 sstable_store.clone(),
154 memory_limiter.clone(),
155 storage_memory_config,
156 ));
157
158 let heap_profiler = HeapProfiler::new(
159 system_memory_available_bytes(),
160 config.server.heap_profiling.clone(),
161 );
162
163 monitor_cache(memory_collector);
164
165 let await_tree_config = match &config.streaming.async_stack_trace {
166 AsyncStackTraceOption::Off => None,
167 c => await_tree::ConfigBuilder::default()
168 .verbose(c.is_verbose().unwrap())
169 .build()
170 .ok(),
171 };
172 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
173
174 (
175 sstable_store,
176 memory_limiter,
177 heap_profiler,
178 await_tree_reg,
179 storage_opts,
180 compactor_metrics,
181 )
182}
183
184pub async fn compactor_serve(
188 listen_addr: SocketAddr,
189 advertise_addr: HostAddr,
190 opts: CompactorOpts,
191 shutdown: CancellationToken,
192 compactor_mode: CompactorMode,
193) {
194 let config = load_config(&opts.config_path, &opts);
195 info!("Starting compactor node",);
196 info!("> config: {:?}", config);
197 info!(
198 "> debug assertions: {}",
199 if cfg!(debug_assertions) { "on" } else { "off" }
200 );
201 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
202
203 let is_iceberg_compactor = matches!(
204 compactor_mode,
205 CompactorMode::DedicatedIceberg | CompactorMode::SharedIceberg
206 );
207
208 let compaction_executor = Arc::new(CompactionExecutor::new(
209 opts.compaction_worker_threads_number,
210 ));
211
212 let max_task_parallelism: u32 = (compaction_executor.worker_num() as f32
213 * config.storage.compactor_max_task_multiplier)
214 .ceil() as u32;
215
216 let (meta_client, system_params_reader) = MetaClient::register_new(
218 opts.meta_address.clone(),
219 WorkerType::Compactor,
220 &advertise_addr,
221 Property {
222 is_iceberg_compactor,
223 parallelism: max_task_parallelism,
224 ..Default::default()
225 },
226 Arc::new(config.meta.clone()),
227 )
228 .await;
229
230 info!("Assigned compactor id {}", meta_client.worker_id());
231
232 let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
233
234 let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
235 meta_client.clone(),
236 hummock_metrics.clone(),
237 ));
238
239 let (
240 sstable_store,
241 memory_limiter,
242 heap_profiler,
243 await_tree_reg,
244 storage_opts,
245 compactor_metrics,
246 ) = Box::pin(prepare_start_parameters(
247 &opts,
248 config.clone(),
249 system_params_reader.clone(),
250 ))
251 .await;
252
253 let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
254 RemoteTableAccessor::new(meta_client.clone()),
255 )));
256
257 let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
258 let compactor_observer_node = CompactorObserverNode::new(
259 compaction_catalog_manager_ref.clone(),
260 system_params_manager.clone(),
261 );
262 let observer_manager =
263 ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;
264
265 heap_profiler.start();
267
268 let _observer_join_handle = observer_manager.start().await;
271
272 let object_id_manager = Arc::new(ObjectIdManager::new(
273 hummock_meta_client.clone(),
274 storage_opts.sstable_id_remote_fetch_number,
275 ));
276
277 let compactor_context = CompactorContext {
278 storage_opts,
279 sstable_store: sstable_store.clone(),
280 compactor_metrics,
281 is_share_buffer_compact: false,
282 compaction_executor,
283 memory_limiter,
284 task_progress_manager: Default::default(),
285 await_tree_reg: await_tree_reg.clone(),
286 };
287
288 let mut sub_tasks = vec![
290 MetaClient::start_heartbeat_loop(
291 meta_client.clone(),
292 Duration::from_millis(config.server.heartbeat_interval_ms as u64),
293 ),
294 match compactor_mode {
295 CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
296 compactor_context.clone(),
297 hummock_meta_client.clone(),
298 object_id_manager.clone(),
299 compaction_catalog_manager_ref,
300 ),
301 CompactorMode::Shared => unreachable!(),
302 CompactorMode::DedicatedIceberg => {
303 risingwave_storage::hummock::compactor::start_iceberg_compactor(
304 compactor_context.clone(),
305 hummock_meta_client.clone(),
306 )
307 }
308 CompactorMode::SharedIceberg => unreachable!(),
309 },
310 ];
311
312 let telemetry_manager = TelemetryManager::new(
313 Arc::new(meta_client.clone()),
314 Arc::new(CompactorTelemetryCreator::new()),
315 );
316 if config.server.telemetry_enabled && telemetry_env_enabled() {
319 sub_tasks.push(telemetry_manager.start().await);
320 } else {
321 tracing::info!("Telemetry didn't start due to config");
322 }
323
324 let compactor_srv = CompactorServiceImpl::default();
325 let monitor_srv = MonitorServiceImpl::new(await_tree_reg, config.server.clone());
326 let server = tonic::transport::Server::builder()
327 .add_service(CompactorServiceServer::new(compactor_srv))
328 .add_service(MonitorServiceServer::new(monitor_srv))
329 .monitored_serve_with_shutdown(
330 listen_addr,
331 "grpc-compactor-node-service",
332 TcpConfig {
333 tcp_nodelay: true,
334 keepalive_duration: None,
335 },
336 shutdown.clone().cancelled_owned(),
337 );
338 let _server_handle = tokio::spawn(server);
339
340 if config.server.metrics_level > MetricLevel::Disabled {
342 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
343 }
344
345 meta_client.activate(&advertise_addr).await.unwrap();
347
348 shutdown.cancelled().await;
350 meta_client.try_unregister().await;
352}
353
354pub async fn shared_compactor_serve(
358 listen_addr: SocketAddr,
359 opts: CompactorOpts,
360 shutdown: CancellationToken,
361) {
362 let config = load_config(&opts.config_path, &opts);
363 info!("Starting shared compactor node",);
364 info!("> config: {:?}", config);
365 info!(
366 "> debug assertions: {}",
367 if cfg!(debug_assertions) { "on" } else { "off" }
368 );
369 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
370
371 let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
372 let system_params_response = grpc_proxy_client
373 .get_system_params()
374 .await
375 .expect("Fail to get system params, the compactor pod cannot be started.");
376 let system_params = system_params_response.into_inner().params.unwrap();
377
378 let (
379 sstable_store,
380 memory_limiter,
381 heap_profiler,
382 await_tree_reg,
383 storage_opts,
384 compactor_metrics,
385 ) = Box::pin(prepare_start_parameters(
386 &opts,
387 config.clone(),
388 system_params.into(),
389 ))
390 .await;
391 let (sender, receiver) = mpsc::unbounded_channel();
392 let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);
393
394 let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone(), config.server.clone());
395
396 heap_profiler.start();
398
399 let compaction_executor = Arc::new(CompactionExecutor::new(
400 opts.compaction_worker_threads_number,
401 ));
402 let compactor_context = CompactorContext {
403 storage_opts,
404 sstable_store,
405 compactor_metrics,
406 is_share_buffer_compact: false,
407 compaction_executor,
408 memory_limiter,
409 task_progress_manager: Default::default(),
410 await_tree_reg,
411 };
412
413 let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
416 grpc_proxy_client,
417 receiver,
418 compactor_context,
419 );
420
421 let rpc_max_encoding_message_size_bytes = opts
422 .rpc_max_encoding_message_size_bytes
423 .unwrap_or(default_rpc_max_encoding_message_size_bytes());
424
425 let rpc_max_decoding_message_size_bytes = opts
426 .rpc_max_decoding_message_size_bytes
427 .unwrap_or(default_rpc_max_decoding_message_size_bytes());
428
429 let server = tonic::transport::Server::builder()
430 .add_service(
431 CompactorServiceServer::new(compactor_srv)
432 .max_decoding_message_size(rpc_max_decoding_message_size_bytes)
433 .max_encoding_message_size(rpc_max_encoding_message_size_bytes),
434 )
435 .add_service(MonitorServiceServer::new(monitor_srv))
436 .monitored_serve_with_shutdown(
437 listen_addr,
438 "grpc-compactor-node-service",
439 TcpConfig {
440 tcp_nodelay: true,
441 keepalive_duration: None,
442 },
443 shutdown.clone().cancelled_owned(),
444 );
445
446 let _server_handle = tokio::spawn(server);
447
448 if config.server.metrics_level > MetricLevel::Disabled {
450 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
451 }
452
453 shutdown.cancelled().await;
455
456 }