1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20 AsyncStackTraceOption, CompactorMode, MetricLevel, RwConfig, extract_storage_memory_config,
21 load_config,
22};
23use risingwave_common::monitor::{RouterExt, TcpConfig};
24use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
25use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
26use risingwave_common::telemetry::manager::TelemetryManager;
27use risingwave_common::telemetry::telemetry_env_enabled;
28use risingwave_common::util::addr::HostAddr;
29use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
30use risingwave_common::util::tokio_util::sync::CancellationToken;
31use risingwave_common::{GIT_SHA, RW_VERSION};
32use risingwave_common_heap_profiling::HeapProfiler;
33use risingwave_common_service::{MetricsManager, ObserverManager};
34use risingwave_object_store::object::build_remote_object_store;
35use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
36use risingwave_pb::common::WorkerType;
37use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
38use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
39use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
40use risingwave_storage::compaction_catalog_manager::{
41 CompactionCatalogManager, RemoteTableAccessor,
42};
43use risingwave_storage::hummock::compactor::{
44 CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
45 new_compaction_await_tree_reg_ref,
46};
47use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
48use risingwave_storage::hummock::utils::HummockMemoryCollector;
49use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
50use risingwave_storage::monitor::{
51 CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
52};
53use risingwave_storage::opts::StorageOpts;
54use tokio::sync::mpsc;
55use tracing::info;
56
57use super::compactor_observer::observer_manager::CompactorObserverNode;
58use crate::CompactorOpts;
59use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
60use crate::telemetry::CompactorTelemetryCreator;
61
62pub async fn prepare_start_parameters(
63 compactor_opts: &CompactorOpts,
64 config: RwConfig,
65 system_params_reader: SystemParamsReader,
66) -> (
67 Arc<SstableStore>,
68 Arc<MemoryLimiter>,
69 HeapProfiler,
70 Option<CompactionAwaitTreeRegRef>,
71 Arc<StorageOpts>,
72 Arc<CompactorMetrics>,
73) {
74 let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
76 let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
77
78 let state_store_url = system_params_reader.state_store();
79
80 let storage_memory_config = extract_storage_memory_config(&config);
81 let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
82 &config,
83 &system_params_reader,
84 &storage_memory_config,
85 )));
86 let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
87 * config.storage.compactor_memory_available_proportion)
88 as usize;
89 let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
90 let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
91 Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
92 None => non_reserved_memory_bytes,
93 };
94
95 compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
96 panic!(
97 "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
98 compactor_memory_limit_bytes,
99 meta_cache_capacity_bytes
100 );
101 });
102
103 tracing::info!(
104 "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
105 non_reserved_memory_bytes,
106 meta_cache_capacity_bytes,
107 compactor_memory_limit_bytes,
108 storage_opts.sstable_size_mb * (1 << 20),
109 storage_opts.block_size_kb * (1 << 10),
110 );
111
112 {
114 let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
117 + storage_opts.block_size_kb * (1 << 10))
118 as u64;
119
120 assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
121 }
122
123 let object_store = build_remote_object_store(
124 state_store_url
125 .strip_prefix("hummock+")
126 .expect("object store must be hummock for compactor server"),
127 object_metrics,
128 "Hummock",
129 Arc::new(config.storage.object_store.clone()),
130 )
131 .await;
132
133 let object_store = Arc::new(object_store);
134 let sstable_store = Arc::new(
135 SstableStore::for_compactor(
136 object_store,
137 storage_opts.data_directory.clone(),
138 0,
139 meta_cache_capacity_bytes,
140 system_params_reader.use_new_object_prefix_strategy(),
141 )
142 .await
143 .unwrap(),
145 );
146
147 let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
148 let storage_memory_config = extract_storage_memory_config(&config);
149 let memory_collector = Arc::new(HummockMemoryCollector::new(
150 sstable_store.clone(),
151 memory_limiter.clone(),
152 storage_memory_config,
153 ));
154
155 let heap_profiler = HeapProfiler::new(
156 system_memory_available_bytes(),
157 config.server.heap_profiling.clone(),
158 );
159
160 monitor_cache(memory_collector);
161
162 let await_tree_config = match &config.streaming.async_stack_trace {
163 AsyncStackTraceOption::Off => None,
164 c => await_tree::ConfigBuilder::default()
165 .verbose(c.is_verbose().unwrap())
166 .build()
167 .ok(),
168 };
169 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
170
171 (
172 sstable_store,
173 memory_limiter,
174 heap_profiler,
175 await_tree_reg,
176 storage_opts,
177 compactor_metrics,
178 )
179}
180
/// Entry point of a dedicated (or dedicated-iceberg) compactor node.
///
/// Bootstraps the node in order: loads config, registers with the meta
/// service, prepares storage components, starts the observer / heartbeat /
/// compactor background tasks, spawns the gRPC server, activates the worker
/// at the meta service, then waits for `shutdown` and unregisters.
///
/// `compactor_mode` must be `Dedicated` or `DedicatedIceberg`; the `Shared*`
/// variants are handled by `shared_compactor_serve` and hit `unreachable!()`
/// here.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Register this node with the meta service; registration also returns
    // the current system params used to build the storage components.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Default::default(),
        &config.meta,
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    // Wrap the meta client so hummock RPCs are counted in the metrics.
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;

    // Table catalogs are fetched on demand from meta via the remote accessor.
    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    // Observer keeps the catalog manager and local system params in sync
    // with notifications pushed from the meta service.
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    heap_profiler.start();

    let _observer_join_handle = observer_manager.start().await;

    // Allocates object ids for new SSTs, batching remote fetches from meta.
    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // Background sub-tasks: the meta heartbeat loop plus the compactor event
    // loop matching the requested mode.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    // Telemetry runs only when enabled both in config and via environment.
    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // gRPC server exposing the compactor and monitor services; shuts down
    // when the cancellation token fires.
    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Mark the worker as active only after the server is up so meta never
    // routes work to a node that cannot serve it yet.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Block until shutdown is requested, then unregister from meta
    // (best-effort; `try_unregister` swallows failures).
    shutdown.cancelled().await;
    meta_client.try_unregister().await;
}
332
/// Entry point of a shared compactor node.
///
/// Unlike `compactor_serve`, this node does not register with the meta
/// service directly: it talks to a compactor proxy over gRPC
/// (`opts.proxy_rpc_endpoint`) to obtain system params and to report
/// results, and it receives compaction tasks through the channel fed by
/// `CompactorServiceImpl`.
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // System params come from the proxy instead of meta registration; the
    // node cannot start without them.
    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("Fail to get system params, the compactor pod cannot be started.");
    let system_params = system_params_response.into_inner().params.unwrap();

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
    // The gRPC service pushes incoming task requests into `sender`; the
    // shared compactor loop consumes them from `receiver`.
    let (sender, receiver) = mpsc::unbounded_channel();
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());

    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // Event loop executing tasks from `receiver` and reporting through the
    // proxy client; runs until the process shuts down.
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    // gRPC server exposing the compactor and monitor services; shuts down
    // when the cancellation token fires.
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    let _server_handle = tokio::spawn(server);

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // No meta registration happened in shared mode, so shutdown only needs
    // to await cancellation — there is nothing to unregister.
    shutdown.cancelled().await;
}