1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20 AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
37use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
38use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
39use risingwave_storage::compaction_catalog_manager::{
40 CompactionCatalogManager, RemoteTableAccessor,
41};
42use risingwave_storage::hummock::compactor::{
43 CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
44 new_compaction_await_tree_reg_ref,
45};
46use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
47use risingwave_storage::hummock::utils::HummockMemoryCollector;
48use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
49use risingwave_storage::monitor::{
50 CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
51};
52use risingwave_storage::opts::StorageOpts;
53use tokio::sync::mpsc;
54use tracing::info;
55
56use super::compactor_observer::observer_manager::CompactorObserverNode;
57use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
58use crate::telemetry::CompactorTelemetryCreator;
59use crate::{CompactorMode, CompactorOpts};
60
61pub async fn prepare_start_parameters(
62 compactor_opts: &CompactorOpts,
63 config: RwConfig,
64 system_params_reader: SystemParamsReader,
65) -> (
66 Arc<SstableStore>,
67 Arc<MemoryLimiter>,
68 HeapProfiler,
69 Option<CompactionAwaitTreeRegRef>,
70 Arc<StorageOpts>,
71 Arc<CompactorMetrics>,
72) {
73 let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
75 let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
76
77 let state_store_url = system_params_reader.state_store();
78
79 let storage_memory_config = extract_storage_memory_config(&config);
80 let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
81 &config,
82 &system_params_reader,
83 &storage_memory_config,
84 )));
85 let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
86 * config.storage.compactor_memory_available_proportion)
87 as usize;
88 let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
89 let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
90 Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
91 None => non_reserved_memory_bytes,
92 };
93
94 compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
95 panic!(
96 "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
97 compactor_memory_limit_bytes,
98 meta_cache_capacity_bytes
99 );
100 });
101
102 tracing::info!(
103 "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
104 non_reserved_memory_bytes,
105 meta_cache_capacity_bytes,
106 compactor_memory_limit_bytes,
107 storage_opts.sstable_size_mb * (1 << 20),
108 storage_opts.block_size_kb * (1 << 10),
109 );
110
111 {
113 let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
116 + storage_opts.block_size_kb * (1 << 10))
117 as u64;
118
119 assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
120 }
121
122 let object_store = build_remote_object_store(
123 state_store_url
124 .strip_prefix("hummock+")
125 .expect("object store must be hummock for compactor server"),
126 object_metrics,
127 "Hummock",
128 Arc::new(config.storage.object_store.clone()),
129 )
130 .await;
131
132 let object_store = Arc::new(object_store);
133 let sstable_store = Arc::new(
134 SstableStore::for_compactor(
135 object_store,
136 storage_opts.data_directory.clone(),
137 0,
138 meta_cache_capacity_bytes,
139 system_params_reader.use_new_object_prefix_strategy(),
140 )
141 .await
142 .unwrap(),
144 );
145
146 let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
147 let storage_memory_config = extract_storage_memory_config(&config);
148 let memory_collector = Arc::new(HummockMemoryCollector::new(
149 sstable_store.clone(),
150 memory_limiter.clone(),
151 storage_memory_config,
152 ));
153
154 let heap_profiler = HeapProfiler::new(
155 system_memory_available_bytes(),
156 config.server.heap_profiling.clone(),
157 );
158
159 monitor_cache(memory_collector);
160
161 let await_tree_config = match &config.streaming.async_stack_trace {
162 AsyncStackTraceOption::Off => None,
163 c => await_tree::ConfigBuilder::default()
164 .verbose(c.is_verbose().unwrap())
165 .build()
166 .ok(),
167 };
168 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
169
170 (
171 sstable_store,
172 memory_limiter,
173 heap_profiler,
174 await_tree_reg,
175 storage_opts,
176 compactor_metrics,
177 )
178}
179
/// Run a dedicated compactor node until `shutdown` is cancelled.
///
/// Startup sequence: load config, register with the meta service as a
/// `Compactor` worker, prepare the shared storage components, start the
/// observer manager / heartbeat loop / compaction task for the selected
/// `compactor_mode`, then serve the compactor and monitor gRPC services on
/// `listen_addr`. The worker is activated in the meta service only after the
/// gRPC server task has been spawned, and unregisters itself on shutdown.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);
    // Register this node with the meta service; the returned reader carries
    // the cluster-wide system params (e.g. the state store URL).
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Default::default(),
        &config.meta,
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    // Meta-client wrapper that records hummock RPC metrics.
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;

    // Table catalog cache backed by RPCs to the meta node.
    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    // The observer node receives meta notifications for the catalog cache
    // and the local system params manager.
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    heap_profiler.start();

    let _observer_join_handle = observer_manager.start().await;

    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // Background tasks: the meta heartbeat loop plus the compaction loop for
    // the selected mode. Shared modes are handled by `shared_compactor_serve`
    // and must never reach this function.
    // NOTE(review): these join handles are dropped when this function returns
    // without being awaited — confirm the tasks are cancellation-safe.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    // Telemetry runs only when enabled both in config and in the environment.
    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // gRPC server exposing the compactor and monitor services; it shuts down
    // when the cancellation token fires.
    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Activate the worker only after the server task has been spawned.
    // Panics if activation fails — the node cannot serve without it.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Block until shutdown is requested, then deregister from the meta node.
    shutdown.cancelled().await;
    meta_client.try_unregister().await;
}
330
/// Run a compactor node in shared mode until `shutdown` is cancelled.
///
/// Unlike [`compactor_serve`], this variant does not register with the meta
/// service: system params are fetched through a gRPC compactor proxy, and
/// compaction tasks arrive over an in-process channel fed by the
/// `CompactorServiceImpl` gRPC endpoint.
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Fetch system params via the proxy; a failure here is fatal for the pod.
    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("Fail to get system params, the compactor pod cannot be started.");
    let system_params = system_params_response.into_inner().params.unwrap();

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
    // Tasks received by the gRPC service are forwarded through this channel
    // to the shared compactor loop below.
    let (sender, receiver) = mpsc::unbounded_channel();
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());

    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // Compaction loop consuming tasks from the channel; the handle is
    // intentionally detached (dropped on return).
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    // gRPC server exposing the compactor and monitor services; it shuts down
    // when the cancellation token fires.
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    let _server_handle = tokio::spawn(server);

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Block until shutdown; no meta deregistration is needed in shared mode.
    shutdown.cancelled().await;

}