// risingwave_compactor/server.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20    AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::common::worker_node::Property;
37use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
38use risingwave_pb::configured_monitor_service_server;
39use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
40use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
41use risingwave_storage::compaction_catalog_manager::{
42    CompactionCatalogManager, RemoteTableAccessor,
43};
44use risingwave_storage::hummock::compactor::{
45    CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
46    new_compaction_await_tree_reg_ref,
47};
48use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
49use risingwave_storage::hummock::utils::HummockMemoryCollector;
50use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
51use risingwave_storage::monitor::{
52    CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
53};
54use risingwave_storage::opts::StorageOpts;
55use tokio::sync::mpsc;
56use tracing::info;
57
58use super::compactor_observer::observer_manager::CompactorObserverNode;
59use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
60use crate::telemetry::CompactorTelemetryCreator;
61use crate::{
62    CompactorMode, CompactorOpts, default_rpc_max_decoding_message_size_bytes,
63    default_rpc_max_encoding_message_size_bytes,
64};
65
66pub async fn prepare_start_parameters(
67    compactor_opts: &CompactorOpts,
68    config: RwConfig,
69    system_params_reader: SystemParamsReader,
70) -> (
71    Arc<SstableStore>,
72    Arc<MemoryLimiter>,
73    HeapProfiler,
74    Option<CompactionAwaitTreeRegRef>,
75    Arc<StorageOpts>,
76    Arc<CompactorMetrics>,
77) {
78    // Boot compactor
79    let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
80    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
81
82    let state_store_url = system_params_reader.state_store();
83
84    let storage_memory_config = extract_storage_memory_config(&config);
85    let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
86        &config,
87        &system_params_reader,
88        &storage_memory_config,
89    )));
90    let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
91        * config.storage.compactor_memory_available_proportion)
92        as usize;
93    let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
94    let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
95        Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
96        None => non_reserved_memory_bytes,
97    };
98
99    compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
100        panic!(
101            "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
102            compactor_memory_limit_bytes,
103            meta_cache_capacity_bytes
104        );
105    });
106
107    tracing::info!(
108        "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
109        non_reserved_memory_bytes,
110        meta_cache_capacity_bytes,
111        compactor_memory_limit_bytes,
112        storage_opts.sstable_size_mb * (1 << 20),
113        storage_opts.block_size_kb * (1 << 10),
114    );
115
116    // check memory config
117    {
118        // This is a similar logic to SstableBuilder memory detection, to ensure that we can find
119        // configuration problems as quickly as possible
120        let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
121            + storage_opts.block_size_kb * (1 << 10))
122            as u64;
123
124        assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
125    }
126
127    let object_store = build_remote_object_store(
128        state_store_url
129            .strip_prefix("hummock+")
130            .expect("object store must be hummock for compactor server"),
131        object_metrics,
132        "Hummock",
133        Arc::new(config.storage.object_store.clone()),
134    )
135    .await;
136
137    let object_store = Arc::new(object_store);
138    let sstable_store = Arc::new(
139        SstableStore::for_compactor(
140            object_store,
141            storage_opts.data_directory.clone(),
142            0,
143            meta_cache_capacity_bytes,
144            system_params_reader.use_new_object_prefix_strategy(),
145        )
146        .await
147        // FIXME(MrCroxx): Handle this error.
148        .unwrap(),
149    );
150
151    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
152    let storage_memory_config = extract_storage_memory_config(&config);
153    let memory_collector = Arc::new(HummockMemoryCollector::new(
154        sstable_store.clone(),
155        memory_limiter.clone(),
156        storage_memory_config,
157    ));
158
159    let heap_profiler = HeapProfiler::new(
160        system_memory_available_bytes(),
161        config.server.heap_profiling.clone(),
162    );
163
164    monitor_cache(memory_collector);
165
166    let await_tree_config = match &config.streaming.async_stack_trace {
167        AsyncStackTraceOption::Off => None,
168        c => await_tree::ConfigBuilder::default()
169            .verbose(c.is_verbose().unwrap())
170            .build()
171            .ok(),
172    };
173    let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
174
175    (
176        sstable_store,
177        memory_limiter,
178        heap_profiler,
179        await_tree_reg,
180        storage_opts,
181        compactor_metrics,
182    )
183}
184
/// Fetches and runs compaction tasks.
///
/// Boot sequence: load config, register with the meta service as a
/// `Compactor` worker, prepare storage components, start the observer and
/// heartbeat loops, spawn the compaction loop for the selected mode, serve
/// the compactor gRPC endpoints, and finally activate the worker so the meta
/// service starts assigning tasks.
///
/// Returns when the `shutdown` token is triggered.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Both iceberg modes advertise the iceberg flag so the meta service can
    // route iceberg compaction tasks to this worker.
    let is_iceberg_compactor = matches!(
        compactor_mode,
        CompactorMode::DedicatedIceberg | CompactorMode::SharedIceberg
    );

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    // Advertised task parallelism: executor worker threads scaled by the
    // configured multiplier, rounded up.
    let max_task_parallelism: u32 = (compaction_executor.worker_num() as f32
        * config.storage.compactor_max_task_multiplier)
        .ceil() as u32;

    // Register to the cluster.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Property {
            is_iceberg_compactor,
            parallelism: max_task_parallelism,
            ..Default::default()
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    // Boxed to keep this large future off compactor_serve's own stack frame.
    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = Box::pin(prepare_start_parameters(
        &opts,
        config.clone(),
        system_params_reader.clone(),
    ))
    .await;

    // Table catalogs are fetched lazily from meta and kept up to date by the
    // observer below.
    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    // Run a background heap profiler
    heap_profiler.start();

    // NOTE(review): this comment predates refactors and appears to describe
    // memory-limiter behavior rather than the observer below — verify.
    // use half of limit because any memory which would hold in meta-cache will be allocate by
    // limited at first.
    let _observer_join_handle = observer_manager.start().await;

    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // TODO(shutdown): don't collect sub-tasks as there's no need to gracefully shutdown them.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        // Shared modes are served by `shared_compactor_serve`, so they are
        // unreachable here.
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    // if the toml config file or env variable disables telemetry, do not watch system params change
    // because if any of configs disable telemetry, we should never start it
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg, config.server.clone());
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(configured_monitor_service_server(
            MonitorServiceServer::new(monitor_srv),
        ))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    // The server future stops on its own when `shutdown` fires; the handle is
    // intentionally detached.
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set, let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Run shutdown logic.
    meta_client.try_unregister().await;
}
356
/// Fetches and runs compaction tasks under shared mode.
///
/// Unlike `compactor_serve`, this node does not register with the meta
/// service: system parameters are fetched through a compactor proxy
/// (`GrpcCompactorProxyClient`), and compaction tasks arrive over the
/// channel fed by this node's `CompactorService` gRPC endpoint.
///
/// Returns when the `shutdown` token is triggered.
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // System params come from the proxy rather than the meta service; startup
    // cannot proceed without them.
    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("Fail to get system params, the compactor pod cannot be started.");
    let system_params = system_params_response.into_inner().params.unwrap();

    // Boxed to keep this large future off this function's own stack frame.
    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = Box::pin(prepare_start_parameters(
        &opts,
        config.clone(),
        system_params.into(),
    ))
    .await;
    // The gRPC service pushes dispatched tasks into `sender`; the shared
    // compactor loop below consumes them from `receiver`.
    let (sender, receiver) = mpsc::unbounded_channel();
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone(), config.server.clone());

    // Run a background heap profiler
    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // TODO(shutdown): don't collect there's no need to gracefully shutdown them.
    // Hold the join handle and tx to keep the compactor running.
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    // Message-size limits are configurable because shared-mode task payloads
    // travel over this RPC surface.
    let rpc_max_encoding_message_size_bytes = opts
        .rpc_max_encoding_message_size_bytes
        .unwrap_or(default_rpc_max_encoding_message_size_bytes());

    let rpc_max_decoding_message_size_bytes = opts
        .rpc_max_decoding_message_size_bytes
        .unwrap_or(default_rpc_max_decoding_message_size_bytes());

    let server = tonic::transport::Server::builder()
        .add_service(
            CompactorServiceServer::new(compactor_srv)
                .max_decoding_message_size(rpc_max_decoding_message_size_bytes)
                .max_encoding_message_size(rpc_max_encoding_message_size_bytes),
        )
        .add_service(configured_monitor_service_server(
            MonitorServiceServer::new(monitor_srv),
        ))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    // The server future stops on its own when `shutdown` fires; the handle is
    // intentionally detached.
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // TODO(shutdown): shall we notify the proxy that we are shutting down?
}