risingwave_compactor/server.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::config::{
    AsyncStackTraceOption, CompactorMode, MetricLevel, RwConfig, extract_storage_memory_config,
    load_config,
};
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
use risingwave_storage::compaction_catalog_manager::{
    CompactionCatalogManager, RemoteTableAccessor,
};
use risingwave_storage::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
    new_compaction_await_tree_reg_ref,
};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::utils::HummockMemoryCollector;
use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
use risingwave_storage::monitor::{
    CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
};
use risingwave_storage::opts::StorageOpts;
use tokio::sync::mpsc;
use tracing::info;

use super::compactor_observer::observer_manager::CompactorObserverNode;
use crate::CompactorOpts;
use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
use crate::telemetry::CompactorTelemetryCreator;

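/// Builds the shared components needed to start a compactor: the SSTable store,
/// memory limiter, heap profiler, optional await-tree registry, storage options,
/// and compactor metrics. Used by both the dedicated and shared serve paths below.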
pub async fn prepare_start_parameters(
    compactor_opts: &CompactorOpts,
    config: RwConfig,
    system_params_reader: SystemParamsReader,
) -> (
    Arc<SstableStore>,
    Arc<MemoryLimiter>,
    HeapProfiler,
    Option<CompactionAwaitTreeRegRef>,
    Arc<StorageOpts>,
    Arc<CompactorMetrics>,
) {
    // Boot compactor
    let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());

    let state_store_url = system_params_reader.state_store();

    let storage_memory_config = extract_storage_memory_config(&config);
    let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
        &config,
        &system_params_reader,
        &storage_memory_config,
    )));
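    // Memory budget: take the configured share of the node's total memory (or an
    // explicit `compactor_memory_limit_mb` override), then carve the meta cache out
    // of it. Illustrative numbers, not defaults: with 4 GiB total, a 0.8 proportion,
    // and a 512 MiB meta cache, the compaction limit is 4 GiB * 0.8 - 512 MiB ≈ 2.7 GiB.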
    let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
        * config.storage.compactor_memory_available_proportion)
        as usize;
    let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
    let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
        Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
        None => non_reserved_memory_bytes,
    };

    compactor_memory_limit_bytes = compactor_memory_limit_bytes
        .checked_sub(meta_cache_capacity_bytes)
        .unwrap_or_else(|| {
            panic!(
                "compactor_memory_limit_bytes {} is too small to hold compactor_meta_cache_memory_bytes {}",
                compactor_memory_limit_bytes, meta_cache_capacity_bytes
            );
        });

    tracing::info!(
        "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
        non_reserved_memory_bytes,
        meta_cache_capacity_bytes,
        compactor_memory_limit_bytes,
        storage_opts.sstable_size_mb * (1 << 20),
        storage_opts.block_size_kb * (1 << 10),
    );

    // Check the memory configuration.
    {
        // This follows similar logic to the SstableBuilder memory detection, to ensure
        // that configuration problems are found as quickly as possible.
        let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
            + storage_opts.block_size_kb * (1 << 10))
            as u64;

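        // Illustrative values (not necessarily the defaults): with sstable_size_mb = 256
        // and block_size_kb = 64, the limit must exceed 2 * (256 MiB + 64 KiB) = 512.125 MiB.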
        assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
    }

    let object_store = build_remote_object_store(
        state_store_url
            .strip_prefix("hummock+")
            .expect("object store must be hummock for compactor server"),
        object_metrics,
        "Hummock",
        Arc::new(config.storage.object_store.clone()),
    )
    .await;

    let object_store = Arc::new(object_store);
    let sstable_store = Arc::new(
        SstableStore::for_compactor(
            object_store,
            storage_opts.data_directory.clone(),
            0,
            meta_cache_capacity_bytes,
            system_params_reader.use_new_object_prefix_strategy(),
        )
        .await
        // FIXME(MrCroxx): Handle this error.
        .unwrap(),
    );

    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
    // Reuse the storage memory config extracted above.
    let memory_collector = Arc::new(HummockMemoryCollector::new(
        sstable_store.clone(),
        memory_limiter.clone(),
        storage_memory_config,
    ));

    let heap_profiler = HeapProfiler::new(
        system_memory_available_bytes(),
        config.server.heap_profiling.clone(),
    );

    monitor_cache(memory_collector);

    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };
    let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
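    // When enabled, the await-tree registry records async stack traces of compaction
    // tasks; it is later exposed for inspection through the monitor service.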

    (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    )
}

/// Fetches and runs compaction tasks.
///
/// Returns when the `shutdown` token is triggered.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Register to the cluster.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Default::default(),
        &config.meta,
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;

    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    // Run a background heap profiler.
    heap_profiler.start();

    // Use half of the limit, because any memory that would be held in the meta cache
    // is allocated through the limiter first.
    let _observer_join_handle = observer_manager.start().await;

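    // The object ID manager fetches fresh SST object IDs from the meta service,
    // `sstable_id_remote_fetch_number` at a time.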
    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

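    // Bundle everything a compaction task needs; the context is cloned into the
    // compactor event loop below.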
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // TODO(shutdown): don't collect sub-tasks, as there's no need to shut them down gracefully.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            // Shared modes are handled by a different entry point and never reach here.
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    // If the TOML config file or an env variable disables telemetry, do not watch for
    // system params changes: if any of the configs disables telemetry, it should never start.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry did not start because it is disabled by config");
    }

    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set, let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Run shutdown logic.
    meta_client.try_unregister().await;
}

/// Fetches and runs compaction tasks in shared mode.
///
/// Returns when the `shutdown` token is triggered.
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

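    // In shared mode the compactor does not register with the meta service; system
    // params are fetched through the compactor proxy instead.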
    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("failed to get system params; the compactor pod cannot be started");
    let system_params = system_params_response.into_inner().params.unwrap();

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
    let (sender, receiver) = mpsc::unbounded_channel();
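    // Dispatch requests arriving over gRPC into the shared compactor: the service
    // pushes them into `sender`, and the event loop consumes them from `receiver`.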
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());

    // Run a background heap profiler.
    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // TODO(shutdown): don't collect sub-tasks, as there's no need to shut them down gracefully.
    // Hold the join handle and tx to keep the compactor running.
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // TODO(shutdown): shall we notify the proxy that we are shutting down?
}