risingwave_compactor/server.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::config::{
    AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
};
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
use risingwave_storage::compaction_catalog_manager::{
    CompactionCatalogManager, RemoteTableAccessor,
};
use risingwave_storage::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
    new_compaction_await_tree_reg_ref,
};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::utils::HummockMemoryCollector;
use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
use risingwave_storage::monitor::{
    CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
};
use risingwave_storage::opts::StorageOpts;
use tokio::sync::mpsc;
use tracing::info;

use super::compactor_observer::observer_manager::CompactorObserverNode;
use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
use crate::telemetry::CompactorTelemetryCreator;
use crate::{CompactorMode, CompactorOpts};

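/// Prepares the shared components needed to boot a compactor: the sstable
/// store, the compaction memory limiter, a heap profiler, an optional
/// await-tree registry, the storage options, and the compactor metrics.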
pub async fn prepare_start_parameters(
    compactor_opts: &CompactorOpts,
    config: RwConfig,
    system_params_reader: SystemParamsReader,
) -> (
    Arc<SstableStore>,
    Arc<MemoryLimiter>,
    HeapProfiler,
    Option<CompactionAwaitTreeRegRef>,
    Arc<StorageOpts>,
    Arc<CompactorMetrics>,
) {
    // Boot compactor
    let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());

    let state_store_url = system_params_reader.state_store();

    let storage_memory_config = extract_storage_memory_config(&config);
    let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
        &config,
        &system_params_reader,
        &storage_memory_config,
    )));
    let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
        * config.storage.compactor_memory_available_proportion)
        as usize;
    let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
    let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
        Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
        None => non_reserved_memory_bytes,
    };

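    // Illustrative arithmetic (example numbers, not defaults): with
    // compactor_total_memory_bytes = 4 GiB and
    // compactor_memory_available_proportion = 0.7, non_reserved_memory_bytes is
    // ~2.8 GiB; when compactor_memory_limit_mb is unset, that value becomes the
    // limit, and the meta cache capacity is subtracted from it below.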
    compactor_memory_limit_bytes = compactor_memory_limit_bytes
        .checked_sub(meta_cache_capacity_bytes)
        .unwrap_or_else(|| {
            panic!(
                "compactor_memory_limit_bytes {} is too small to hold compactor_meta_cache_memory_bytes {}",
                compactor_memory_limit_bytes, meta_cache_capacity_bytes
            );
        });

    tracing::info!(
        "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
        non_reserved_memory_bytes,
        meta_cache_capacity_bytes,
        compactor_memory_limit_bytes,
        storage_opts.sstable_size_mb * (1 << 20),
        storage_opts.block_size_kb * (1 << 10),
    );

    // Check the memory configuration.
    {
        // This mirrors the memory detection in SstableBuilder, so that
        // configuration problems surface as early as possible.
        let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
            + storage_opts.block_size_kb * (1 << 10))
            as u64;

        assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
    }

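    // The state store URL is expected to be of the form "hummock+<object store URL>",
    // e.g. "hummock+s3://bucket" (the backend scheme here is illustrative);
    // stripping the "hummock+" prefix yields the object store URL proper.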
    let object_store = build_remote_object_store(
        state_store_url
            .strip_prefix("hummock+")
            .expect("object store must be hummock for compactor server"),
        object_metrics,
        "Hummock",
        Arc::new(config.storage.object_store.clone()),
    )
    .await;

    let object_store = Arc::new(object_store);
    let sstable_store = Arc::new(
        SstableStore::for_compactor(
            object_store,
            storage_opts.data_directory.clone(),
            0,
            meta_cache_capacity_bytes,
            system_params_reader.use_new_object_prefix_strategy(),
        )
        .await
        // FIXME(MrCroxx): Handle this error.
        .unwrap(),
    );

    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
    let storage_memory_config = extract_storage_memory_config(&config);
    let memory_collector = Arc::new(HummockMemoryCollector::new(
        sstable_store.clone(),
        memory_limiter.clone(),
        storage_memory_config,
    ));

    let heap_profiler = HeapProfiler::new(
        system_memory_available_bytes(),
        config.server.heap_profiling.clone(),
    );

    monitor_cache(memory_collector);

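    // Enable await-tree stack tracing unless async stack traces are turned off
    // in the streaming config; the registry is later exposed through the
    // monitor service.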
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };
    let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

    (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    )
}

/// Fetches and runs compaction tasks.
///
/// Returns when the `shutdown` token is triggered.
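///
/// A minimal invocation sketch (`listen_addr`, `advertise_addr`, `opts`, and
/// `shutdown` are assumed to come from the caller's CLI and runtime setup):
///
/// ```ignore
/// compactor_serve(listen_addr, advertise_addr, opts, shutdown, CompactorMode::Dedicated).await;
/// ```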
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);
    // Register to the cluster.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Default::default(),
        &config.meta,
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;

    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    // Run a background heap profiler.
    heap_profiler.start();

    // Use half of the limit, because any memory the meta cache would hold is
    // allocated through the limiter first.
    let _observer_join_handle = observer_manager.start().await;

    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

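    // Background sub-tasks: the meta heartbeat loop plus the compaction event
    // loop selected by `compactor_mode`. The shared modes are unreachable here;
    // they are served by `shared_compactor_serve` instead.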
    // TODO(shutdown): don't collect sub-tasks, as there's no need to gracefully
    // shut them down.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    // If the TOML config file or an environment variable disables telemetry, do
    // not watch for system-parameter changes: when any config disables
    // telemetry, it must never be started.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry is disabled by config; not starting it");
    }

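    // Serve the compactor and monitor gRPC services until the shutdown token
    // is triggered.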
    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set, let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Run shutdown logic.
    meta_client.try_unregister().await;
}

/// Fetches and runs compaction tasks under shared mode.
///
/// Returns when the `shutdown` token is triggered.
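///
/// Unlike [`compactor_serve`], a shared compactor does not register with the
/// meta node: it fetches system params from the compactor proxy and receives
/// compaction tasks dispatched through it.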
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("failed to get system params; the compactor pod cannot be started");
    let system_params = system_params_response.into_inner().params.unwrap();

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
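    // The gRPC service pushes dispatched compaction requests into this channel;
    // the shared compactor event loop consumes them from the receiver.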
    let (sender, receiver) = mpsc::unbounded_channel();
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());

    // Run a background heap profiler.
    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // TODO(shutdown): don't collect sub-tasks, as there's no need to gracefully
    // shut them down.
    // Hold the join handle and tx to keep the compactor running.
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // TODO(shutdown): shall we notify the proxy that we are shutting down?
}