// risingwave_compactor/server.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20    AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
37use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
38use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
39use risingwave_storage::compaction_catalog_manager::{
40    CompactionCatalogManager, RemoteTableAccessor,
41};
42use risingwave_storage::hummock::compactor::{
43    CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
44    new_compaction_await_tree_reg_ref,
45};
46use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
47use risingwave_storage::hummock::utils::HummockMemoryCollector;
48use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
49use risingwave_storage::monitor::{
50    CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
51};
52use risingwave_storage::opts::StorageOpts;
53use tokio::sync::mpsc;
54use tracing::info;
55
56use super::compactor_observer::observer_manager::CompactorObserverNode;
57use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
58use crate::telemetry::CompactorTelemetryCreator;
59use crate::{
60    CompactorMode, CompactorOpts, default_rpc_max_decoding_message_size_bytes,
61    default_rpc_max_encoding_message_size_bytes,
62};
63
64pub async fn prepare_start_parameters(
65    compactor_opts: &CompactorOpts,
66    config: RwConfig,
67    system_params_reader: SystemParamsReader,
68) -> (
69    Arc<SstableStore>,
70    Arc<MemoryLimiter>,
71    HeapProfiler,
72    Option<CompactionAwaitTreeRegRef>,
73    Arc<StorageOpts>,
74    Arc<CompactorMetrics>,
75) {
76    // Boot compactor
77    let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
78    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
79
80    let state_store_url = system_params_reader.state_store();
81
82    let storage_memory_config = extract_storage_memory_config(&config);
83    let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
84        &config,
85        &system_params_reader,
86        &storage_memory_config,
87    )));
88    let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
89        * config.storage.compactor_memory_available_proportion)
90        as usize;
91    let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
92    let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
93        Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
94        None => non_reserved_memory_bytes,
95    };
96
97    compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
98        panic!(
99            "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
100            compactor_memory_limit_bytes,
101            meta_cache_capacity_bytes
102        );
103    });
104
105    tracing::info!(
106        "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
107        non_reserved_memory_bytes,
108        meta_cache_capacity_bytes,
109        compactor_memory_limit_bytes,
110        storage_opts.sstable_size_mb * (1 << 20),
111        storage_opts.block_size_kb * (1 << 10),
112    );
113
114    // check memory config
115    {
116        // This is a similar logic to SstableBuilder memory detection, to ensure that we can find
117        // configuration problems as quickly as possible
118        let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
119            + storage_opts.block_size_kb * (1 << 10))
120            as u64;
121
122        assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
123    }
124
125    let object_store = build_remote_object_store(
126        state_store_url
127            .strip_prefix("hummock+")
128            .expect("object store must be hummock for compactor server"),
129        object_metrics,
130        "Hummock",
131        Arc::new(config.storage.object_store.clone()),
132    )
133    .await;
134
135    let object_store = Arc::new(object_store);
136    let sstable_store = Arc::new(
137        SstableStore::for_compactor(
138            object_store,
139            storage_opts.data_directory.clone(),
140            0,
141            meta_cache_capacity_bytes,
142            system_params_reader.use_new_object_prefix_strategy(),
143        )
144        .await
145        // FIXME(MrCroxx): Handle this error.
146        .unwrap(),
147    );
148
149    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
150    let storage_memory_config = extract_storage_memory_config(&config);
151    let memory_collector = Arc::new(HummockMemoryCollector::new(
152        sstable_store.clone(),
153        memory_limiter.clone(),
154        storage_memory_config,
155    ));
156
157    let heap_profiler = HeapProfiler::new(
158        system_memory_available_bytes(),
159        config.server.heap_profiling.clone(),
160    );
161
162    monitor_cache(memory_collector);
163
164    let await_tree_config = match &config.streaming.async_stack_trace {
165        AsyncStackTraceOption::Off => None,
166        c => await_tree::ConfigBuilder::default()
167            .verbose(c.is_verbose().unwrap())
168            .build()
169            .ok(),
170    };
171    let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
172
173    (
174        sstable_store,
175        memory_limiter,
176        heap_profiler,
177        await_tree_reg,
178        storage_opts,
179        compactor_metrics,
180    )
181}
182
/// Fetches and runs compaction tasks (dedicated / dedicated-iceberg modes).
///
/// Boot sequence: load config → register with meta as a `Compactor` worker →
/// build the storage stack via [`prepare_start_parameters`] → start the
/// observer and background sub-tasks (heartbeat, the compaction event loop,
/// optional telemetry) → spawn the gRPC server → activate with meta.
///
/// Returns when the `shutdown` token is triggered.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);
    // Register to the cluster; meta assigns a worker id and hands back the
    // current system params.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Default::default(),
        Arc::new(config.meta.clone()),
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    // Wrap the meta client so hummock RPCs are recorded in metrics.
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params_reader.clone()).await;

    // Table catalogs are fetched on demand from meta during compaction.
    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    // The observer node applies meta notifications to the catalog manager and
    // local system params.
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    // Run a background heap profiler
    heap_profiler.start();

    // Start receiving meta notifications; the handle is kept only to tie its
    // lifetime to this function.
    let _observer_join_handle = observer_manager.start().await;

    // Fetches SST object ids from meta in batches of
    // `sstable_id_remote_fetch_number`.
    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false, // dedicated compactor, not shared-buffer compaction
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // TODO(shutdown): don't collect sub-tasks as there's no need to gracefully shutdown them.
    let mut sub_tasks = vec![
        // Periodic heartbeat keeps this worker alive in meta's view.
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        // The main compaction event loop, selected by mode.
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            // Shared modes are handled by `shared_compactor_serve`, never here.
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    // if the toml config file or env variable disables telemetry, do not watch system params change
    // because if any of configs disable telemetry, we should never start it
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // gRPC services: compactor RPCs plus monitoring (e.g. await-tree dumps).
    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set, let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Run shutdown logic.
    meta_client.try_unregister().await;
}
333
/// Fetches and runs compaction tasks under shared mode.
///
/// Unlike `compactor_serve`, a shared compactor does not register with the
/// meta service: it obtains system params through a gRPC proxy and receives
/// compaction tasks via a channel fed by its own gRPC service.
///
/// Returns when the `shutdown` token is triggered.
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // System params come from the proxy instead of direct meta registration.
    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("Fail to get system params, the compactor pod cannot be started.");
    let system_params = system_params_response.into_inner().params.unwrap();

    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = prepare_start_parameters(&opts, config.clone(), system_params.into()).await;
    // Tasks received by the gRPC service are forwarded to the shared
    // compactor loop through this unbounded channel.
    let (sender, receiver) = mpsc::unbounded_channel();
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone());

    // Run a background heap profiler
    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false, // compactor node, not shared-buffer compaction
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // TODO(shutdown): don't collect the handle; there's no need to gracefully shut it down.
    // Hold the join handle and tx to keep the compactor running.
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    // Apply configured gRPC message-size limits, falling back to crate defaults.
    let rpc_max_encoding_message_size_bytes = opts
        .rpc_max_encoding_message_size_bytes
        .unwrap_or(default_rpc_max_encoding_message_size_bytes());

    let rpc_max_decoding_message_size_bytes = opts
        .rpc_max_decoding_message_size_bytes
        .unwrap_or(default_rpc_max_decoding_message_size_bytes());

    let server = tonic::transport::Server::builder()
        .add_service(
            CompactorServiceServer::new(compactor_srv)
                .max_decoding_message_size(rpc_max_decoding_message_size_bytes)
                .max_encoding_message_size(rpc_max_encoding_message_size_bytes),
        )
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // TODO(shutdown): shall we notify the proxy that we are shutting down?