1use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20 AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
37use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
38use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
39use risingwave_storage::compaction_catalog_manager::{
40 CompactionCatalogManager, RemoteTableAccessor,
41};
42use risingwave_storage::hummock::compactor::{
43 CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
44 new_compaction_await_tree_reg_ref,
45};
46use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
47use risingwave_storage::hummock::utils::HummockMemoryCollector;
48use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
49use risingwave_storage::monitor::{
50 CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
51};
52use risingwave_storage::opts::StorageOpts;
53use tokio::sync::mpsc;
54use tracing::info;
55
56use super::compactor_observer::observer_manager::CompactorObserverNode;
57use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
58use crate::telemetry::CompactorTelemetryCreator;
59use crate::{
60 CompactorMode, CompactorOpts, default_rpc_max_decoding_message_size_bytes,
61 default_rpc_max_encoding_message_size_bytes,
62};
63
64pub async fn prepare_start_parameters(
65 compactor_opts: &CompactorOpts,
66 config: RwConfig,
67 system_params_reader: SystemParamsReader,
68) -> (
69 Arc<SstableStore>,
70 Arc<MemoryLimiter>,
71 HeapProfiler,
72 Option<CompactionAwaitTreeRegRef>,
73 Arc<StorageOpts>,
74 Arc<CompactorMetrics>,
75) {
76 let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
78 let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
79
80 let state_store_url = system_params_reader.state_store();
81
82 let storage_memory_config = extract_storage_memory_config(&config);
83 let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
84 &config,
85 &system_params_reader,
86 &storage_memory_config,
87 )));
88 let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
89 * config.storage.compactor_memory_available_proportion)
90 as usize;
91 let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
92 let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
93 Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
94 None => non_reserved_memory_bytes,
95 };
96
97 compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
98 panic!(
99 "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
100 compactor_memory_limit_bytes,
101 meta_cache_capacity_bytes
102 );
103 });
104
105 tracing::info!(
106 "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
107 non_reserved_memory_bytes,
108 meta_cache_capacity_bytes,
109 compactor_memory_limit_bytes,
110 storage_opts.sstable_size_mb * (1 << 20),
111 storage_opts.block_size_kb * (1 << 10),
112 );
113
114 {
116 let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
119 + storage_opts.block_size_kb * (1 << 10))
120 as u64;
121
122 assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
123 }
124
125 let object_store = build_remote_object_store(
126 state_store_url
127 .strip_prefix("hummock+")
128 .expect("object store must be hummock for compactor server"),
129 object_metrics,
130 "Hummock",
131 Arc::new(config.storage.object_store.clone()),
132 )
133 .await;
134
135 let object_store = Arc::new(object_store);
136 let sstable_store = Arc::new(
137 SstableStore::for_compactor(
138 object_store,
139 storage_opts.data_directory.clone(),
140 0,
141 meta_cache_capacity_bytes,
142 system_params_reader.use_new_object_prefix_strategy(),
143 )
144 .await
145 .unwrap(),
147 );
148
149 let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
150 let storage_memory_config = extract_storage_memory_config(&config);
151 let memory_collector = Arc::new(HummockMemoryCollector::new(
152 sstable_store.clone(),
153 memory_limiter.clone(),
154 storage_memory_config,
155 ));
156
157 let heap_profiler = HeapProfiler::new(
158 system_memory_available_bytes(),
159 config.server.heap_profiling.clone(),
160 );
161
162 monitor_cache(memory_collector);
163
164 let await_tree_config = match &config.streaming.async_stack_trace {
165 AsyncStackTraceOption::Off => None,
166 c => await_tree::ConfigBuilder::default()
167 .verbose(c.is_verbose().unwrap())
168 .build()
169 .ok(),
170 };
171 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
172
173 (
174 sstable_store,
175 memory_limiter,
176 heap_profiler,
177 await_tree_reg,
178 storage_opts,
179 compactor_metrics,
180 )
181}
182
183pub async fn compactor_serve(
187 listen_addr: SocketAddr,
188 advertise_addr: HostAddr,
189 opts: CompactorOpts,
190 shutdown: CancellationToken,
191 compactor_mode: CompactorMode,
192) {
193 let config = load_config(&opts.config_path, &opts);
194 info!("Starting compactor node",);
195 info!("> config: {:?}", config);
196 info!(
197 "> debug assertions: {}",
198 if cfg!(debug_assertions) { "on" } else { "off" }
199 );
200 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
201 let (meta_client, system_params_reader) = MetaClient::register_new(
203 opts.meta_address.clone(),
204 WorkerType::Compactor,
205 &advertise_addr,
206 Default::default(),
207 Arc::new(config.meta.clone()),
208 )
209 .await;
210
211 info!("Assigned compactor id {}", meta_client.worker_id());
212
213 let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
214
215 let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
216 meta_client.clone(),
217 hummock_metrics.clone(),
218 ));
219
220 let (
221 sstable_store,
222 memory_limiter,
223 heap_profiler,
224 await_tree_reg,
225 storage_opts,
226 compactor_metrics,
227 ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;
228
229 let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
230 RemoteTableAccessor::new(meta_client.clone()),
231 )));
232
233 let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
234 let compactor_observer_node = CompactorObserverNode::new(
235 compaction_catalog_manager_ref.clone(),
236 system_params_manager.clone(),
237 );
238 let observer_manager =
239 ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;
240
241 heap_profiler.start();
243
244 let _observer_join_handle = observer_manager.start().await;
247
248 let object_id_manager = Arc::new(ObjectIdManager::new(
249 hummock_meta_client.clone(),
250 storage_opts.sstable_id_remote_fetch_number,
251 ));
252
253 let compaction_executor = Arc::new(CompactionExecutor::new(
254 opts.compaction_worker_threads_number,
255 ));
256
257 let compactor_context = CompactorContext {
258 storage_opts,
259 sstable_store: sstable_store.clone(),
260 compactor_metrics,
261 is_share_buffer_compact: false,
262 compaction_executor,
263 memory_limiter,
264 task_progress_manager: Default::default(),
265 await_tree_reg: await_tree_reg.clone(),
266 };
267
268 let mut sub_tasks = vec![
270 MetaClient::start_heartbeat_loop(
271 meta_client.clone(),
272 Duration::from_millis(config.server.heartbeat_interval_ms as u64),
273 ),
274 match compactor_mode {
275 CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
276 compactor_context.clone(),
277 hummock_meta_client.clone(),
278 object_id_manager.clone(),
279 compaction_catalog_manager_ref,
280 ),
281 CompactorMode::Shared => unreachable!(),
282 CompactorMode::DedicatedIceberg => {
283 risingwave_storage::hummock::compactor::start_iceberg_compactor(
284 compactor_context.clone(),
285 hummock_meta_client.clone(),
286 )
287 }
288 CompactorMode::SharedIceberg => unreachable!(),
289 },
290 ];
291
292 let telemetry_manager = TelemetryManager::new(
293 Arc::new(meta_client.clone()),
294 Arc::new(CompactorTelemetryCreator::new()),
295 );
296 if config.server.telemetry_enabled && telemetry_env_enabled() {
299 sub_tasks.push(telemetry_manager.start().await);
300 } else {
301 tracing::info!("Telemetry didn't start due to config");
302 }
303
304 let compactor_srv = CompactorServiceImpl::default();
305 let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
306 let server = tonic::transport::Server::builder()
307 .add_service(CompactorServiceServer::new(compactor_srv))
308 .add_service(MonitorServiceServer::new(monitor_srv))
309 .monitored_serve_with_shutdown(
310 listen_addr,
311 "grpc-compactor-node-service",
312 TcpConfig {
313 tcp_nodelay: true,
314 keepalive_duration: None,
315 },
316 shutdown.clone().cancelled_owned(),
317 );
318 let _server_handle = tokio::spawn(server);
319
320 if config.server.metrics_level > MetricLevel::Disabled {
322 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
323 }
324
325 meta_client.activate(&advertise_addr).await.unwrap();
327
328 shutdown.cancelled().await;
330 meta_client.try_unregister().await;
332}
333
334pub async fn shared_compactor_serve(
338 listen_addr: SocketAddr,
339 opts: CompactorOpts,
340 shutdown: CancellationToken,
341) {
342 let config = load_config(&opts.config_path, &opts);
343 info!("Starting shared compactor node",);
344 info!("> config: {:?}", config);
345 info!(
346 "> debug assertions: {}",
347 if cfg!(debug_assertions) { "on" } else { "off" }
348 );
349 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
350
351 let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
352 let system_params_response = grpc_proxy_client
353 .get_system_params()
354 .await
355 .expect("Fail to get system params, the compactor pod cannot be started.");
356 let system_params = system_params_response.into_inner().params.unwrap();
357
358 let (
359 sstable_store,
360 memory_limiter,
361 heap_profiler,
362 await_tree_reg,
363 storage_opts,
364 compactor_metrics,
365 ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
366 let (sender, receiver) = mpsc::unbounded_channel();
367 let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);
368
369 let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());
370
371 heap_profiler.start();
373
374 let compaction_executor = Arc::new(CompactionExecutor::new(
375 opts.compaction_worker_threads_number,
376 ));
377 let compactor_context = CompactorContext {
378 storage_opts,
379 sstable_store,
380 compactor_metrics,
381 is_share_buffer_compact: false,
382 compaction_executor,
383 memory_limiter,
384 task_progress_manager: Default::default(),
385 await_tree_reg,
386 };
387
388 let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
391 grpc_proxy_client,
392 receiver,
393 compactor_context,
394 );
395
396 let rpc_max_encoding_message_size_bytes = opts
397 .rpc_max_encoding_message_size_bytes
398 .unwrap_or(default_rpc_max_encoding_message_size_bytes());
399
400 let rpc_max_decoding_message_size_bytes = opts
401 .rpc_max_decoding_message_size_bytes
402 .unwrap_or(default_rpc_max_decoding_message_size_bytes());
403
404 let server = tonic::transport::Server::builder()
405 .add_service(
406 CompactorServiceServer::new(compactor_srv)
407 .max_decoding_message_size(rpc_max_decoding_message_size_bytes)
408 .max_encoding_message_size(rpc_max_encoding_message_size_bytes),
409 )
410 .add_service(MonitorServiceServer::new(monitor_srv))
411 .monitored_serve_with_shutdown(
412 listen_addr,
413 "grpc-compactor-node-service",
414 TcpConfig {
415 tcp_nodelay: true,
416 keepalive_duration: None,
417 },
418 shutdown.clone().cancelled_owned(),
419 );
420
421 let _server_handle = tokio::spawn(server);
422
423 if config.server.metrics_level > MetricLevel::Disabled {
425 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
426 }
427
428 shutdown.cancelled().await;
430
431 }