risingwave_compactor/
server.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_common::config::{
20    AsyncStackTraceOption, MetricLevel, RwConfig, extract_storage_memory_config, load_config,
21};
22use risingwave_common::monitor::{RouterExt, TcpConfig};
23use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
24use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader};
25use risingwave_common::telemetry::manager::TelemetryManager;
26use risingwave_common::telemetry::telemetry_env_enabled;
27use risingwave_common::util::addr::HostAddr;
28use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
29use risingwave_common::util::tokio_util::sync::CancellationToken;
30use risingwave_common::{GIT_SHA, RW_VERSION};
31use risingwave_common_heap_profiling::HeapProfiler;
32use risingwave_common_service::{MetricsManager, ObserverManager};
33use risingwave_object_store::object::build_remote_object_store;
34use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS;
35use risingwave_pb::common::WorkerType;
36use risingwave_pb::common::worker_node::Property;
37use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
38use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
39use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
40use risingwave_storage::compaction_catalog_manager::{
41    CompactionCatalogManager, RemoteTableAccessor,
42};
43use risingwave_storage::hummock::compactor::{
44    CompactionAwaitTreeRegRef, CompactionExecutor, CompactorContext,
45    new_compaction_await_tree_reg_ref,
46};
47use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
48use risingwave_storage::hummock::utils::HummockMemoryCollector;
49use risingwave_storage::hummock::{MemoryLimiter, ObjectIdManager, SstableStore};
50use risingwave_storage::monitor::{
51    CompactorMetrics, GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, monitor_cache,
52};
53use risingwave_storage::opts::StorageOpts;
54use tokio::sync::mpsc;
55use tracing::info;
56
57use super::compactor_observer::observer_manager::CompactorObserverNode;
58use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
59use crate::telemetry::CompactorTelemetryCreator;
60use crate::{
61    CompactorMode, CompactorOpts, default_rpc_max_decoding_message_size_bytes,
62    default_rpc_max_encoding_message_size_bytes,
63};
64
65pub async fn prepare_start_parameters(
66    compactor_opts: &CompactorOpts,
67    config: RwConfig,
68    system_params_reader: SystemParamsReader,
69) -> (
70    Arc<SstableStore>,
71    Arc<MemoryLimiter>,
72    HeapProfiler,
73    Option<CompactionAwaitTreeRegRef>,
74    Arc<StorageOpts>,
75    Arc<CompactorMetrics>,
76) {
77    // Boot compactor
78    let object_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
79    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
80
81    let state_store_url = system_params_reader.state_store();
82
83    let storage_memory_config = extract_storage_memory_config(&config);
84    let storage_opts: Arc<StorageOpts> = Arc::new(StorageOpts::from((
85        &config,
86        &system_params_reader,
87        &storage_memory_config,
88    )));
89    let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64
90        * config.storage.compactor_memory_available_proportion)
91        as usize;
92    let meta_cache_capacity_bytes = compactor_opts.compactor_meta_cache_memory_bytes;
93    let mut compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
94        Some(compactor_memory_limit_mb) => compactor_memory_limit_mb * (1 << 20),
95        None => non_reserved_memory_bytes,
96    };
97
98    compactor_memory_limit_bytes = compactor_memory_limit_bytes.checked_sub(compactor_opts.compactor_meta_cache_memory_bytes).unwrap_or_else(|| {
99        panic!(
100            "compactor_memory_limit_bytes{} is too small to hold compactor_meta_cache_memory_bytes {}",
101            compactor_memory_limit_bytes,
102            meta_cache_capacity_bytes
103        );
104    });
105
106    tracing::info!(
107        "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}",
108        non_reserved_memory_bytes,
109        meta_cache_capacity_bytes,
110        compactor_memory_limit_bytes,
111        storage_opts.sstable_size_mb * (1 << 20),
112        storage_opts.block_size_kb * (1 << 10),
113    );
114
115    // check memory config
116    {
117        // This is a similar logic to SstableBuilder memory detection, to ensure that we can find
118        // configuration problems as quickly as possible
119        let min_compactor_memory_limit_bytes = (storage_opts.sstable_size_mb * (1 << 20)
120            + storage_opts.block_size_kb * (1 << 10))
121            as u64;
122
123        assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes as usize * 2);
124    }
125
126    let object_store = build_remote_object_store(
127        state_store_url
128            .strip_prefix("hummock+")
129            .expect("object store must be hummock for compactor server"),
130        object_metrics,
131        "Hummock",
132        Arc::new(config.storage.object_store.clone()),
133    )
134    .await;
135
136    let object_store = Arc::new(object_store);
137    let sstable_store = Arc::new(
138        SstableStore::for_compactor(
139            object_store,
140            storage_opts.data_directory.clone(),
141            0,
142            meta_cache_capacity_bytes,
143            system_params_reader.use_new_object_prefix_strategy(),
144        )
145        .await
146        // FIXME(MrCroxx): Handle this error.
147        .unwrap(),
148    );
149
150    let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes as u64));
151    let storage_memory_config = extract_storage_memory_config(&config);
152    let memory_collector = Arc::new(HummockMemoryCollector::new(
153        sstable_store.clone(),
154        memory_limiter.clone(),
155        storage_memory_config,
156    ));
157
158    let heap_profiler = HeapProfiler::new(
159        system_memory_available_bytes(),
160        config.server.heap_profiling.clone(),
161    );
162
163    monitor_cache(memory_collector);
164
165    let await_tree_config = match &config.streaming.async_stack_trace {
166        AsyncStackTraceOption::Off => None,
167        c => await_tree::ConfigBuilder::default()
168            .verbose(c.is_verbose().unwrap())
169            .build()
170            .ok(),
171    };
172    let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
173
174    (
175        sstable_store,
176        memory_limiter,
177        heap_profiler,
178        await_tree_reg,
179        storage_opts,
180        compactor_metrics,
181    )
182}
183
184/// Fetches and runs compaction tasks.
185///
186/// Returns when the `shutdown` token is triggered.
/// Fetches and runs compaction tasks.
///
/// Registers this node with the meta service (as a dedicated Hummock or
/// Iceberg compactor depending on `compactor_mode`), boots the compaction
/// runtime, background sub-tasks and the gRPC services, then blocks until the
/// `shutdown` token is triggered, at which point it unregisters from meta.
///
/// Returns when the `shutdown` token is triggered.
pub async fn compactor_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
    compactor_mode: CompactorMode,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Advertise the iceberg capability to meta via the worker property below.
    let is_iceberg_compactor = matches!(
        compactor_mode,
        CompactorMode::DedicatedIceberg | CompactorMode::SharedIceberg
    );

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));

    // Task parallelism reported to meta: worker threads scaled by the
    // configured multiplier, rounded up.
    let max_task_parallelism: u32 = (compaction_executor.worker_num() as f32
        * config.storage.compactor_max_task_multiplier)
        .ceil() as u32;

    // Register to the cluster.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::Compactor,
        &advertise_addr,
        Property {
            is_iceberg_compactor,
            parallelism: max_task_parallelism,
            ..Default::default()
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    info!("Assigned compactor id {}", meta_client.worker_id());

    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());

    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    // Heap-allocate and pin the large preparation future.
    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = Box::pin(prepare_start_parameters(
        &opts,
        config.clone(),
        system_params_reader.clone(),
    ))
    .await;

    // Catalog lookups for compaction are served remotely through meta.
    let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
        RemoteTableAccessor::new(meta_client.clone()),
    )));

    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
    let compactor_observer_node = CompactorObserverNode::new(
        compaction_catalog_manager_ref.clone(),
        system_params_manager.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await;

    // Run a background heap profiler
    heap_profiler.start();

    // Start observing meta notifications (catalog / system params updates);
    // the handle is held so the observer stays alive for this function's scope.
    let _observer_join_handle = observer_manager.start().await;

    let object_id_manager = Arc::new(ObjectIdManager::new(
        hummock_meta_client.clone(),
        storage_opts.sstable_id_remote_fetch_number,
    ));

    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store: sstable_store.clone(),
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg: await_tree_reg.clone(),
    };

    // TODO(shutdown): don't collect sub-tasks as there's no need to gracefully shutdown them.
    let mut sub_tasks = vec![
        MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        ),
        match compactor_mode {
            CompactorMode::Dedicated => risingwave_storage::hummock::compactor::start_compactor(
                compactor_context.clone(),
                hummock_meta_client.clone(),
                object_id_manager.clone(),
                compaction_catalog_manager_ref,
            ),
            // Shared modes are served by `shared_compactor_serve`, never here.
            CompactorMode::Shared => unreachable!(),
            CompactorMode::DedicatedIceberg => {
                risingwave_storage::hummock::compactor::start_iceberg_compactor(
                    compactor_context.clone(),
                    hummock_meta_client.clone(),
                )
            }
            CompactorMode::SharedIceberg => unreachable!(),
        },
    ];

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(CompactorTelemetryCreator::new()),
    );
    // if the toml config file or env variable disables telemetry, do not watch system params change
    // because if any of configs disable telemetry, we should never start it
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    let compactor_srv = CompactorServiceImpl::default();
    let monitor_srv = MonitorServiceImpl::new(await_tree_reg, config.server.clone());
    // The gRPC server is tied to the same shutdown token as this function.
    let server = tonic::transport::Server::builder()
        .add_service(CompactorServiceServer::new(compactor_srv))
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set, let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Run shutdown logic.
    meta_client.try_unregister().await;
}
353
354/// Fetches and runs compaction tasks under shared mode.
355///
356/// Returns when the `shutdown` token is triggered.
/// Fetches and runs compaction tasks under shared mode.
///
/// Unlike `compactor_serve`, this node does not register with the meta
/// service: it obtains system params from the compactor proxy and receives
/// compaction tasks through its gRPC service, forwarding them to the shared
/// compactor loop over a channel.
///
/// Returns when the `shutdown` token is triggered.
pub async fn shared_compactor_serve(
    listen_addr: SocketAddr,
    opts: CompactorOpts,
    shutdown: CancellationToken,
) {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting shared compactor node",);
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    let grpc_proxy_client = GrpcCompactorProxyClient::new(opts.proxy_rpc_endpoint.clone()).await;
    let system_params_response = grpc_proxy_client
        .get_system_params()
        .await
        .expect("Fail to get system params, the compactor pod cannot be started.");
    // Panics if the proxy response carries no params; the node cannot run without them.
    let system_params = system_params_response.into_inner().params.unwrap();

    // Heap-allocate and pin the large preparation future.
    let (
        sstable_store,
        memory_limiter,
        heap_profiler,
        await_tree_reg,
        storage_opts,
        compactor_metrics,
    ) = Box::pin(prepare_start_parameters(
        &opts,
        config.clone(),
        system_params.into(),
    ))
    .await;
    // The gRPC service pushes incoming compaction requests into `sender`;
    // the shared compactor loop consumes them from `receiver`.
    let (sender, receiver) = mpsc::unbounded_channel();
    let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender);

    let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone(), config.server.clone());

    // Run a background heap profiler
    heap_profiler.start();

    let compaction_executor = Arc::new(CompactionExecutor::new(
        opts.compaction_worker_threads_number,
    ));
    let compactor_context = CompactorContext {
        storage_opts,
        sstable_store,
        compactor_metrics,
        is_share_buffer_compact: false,
        compaction_executor,
        memory_limiter,
        task_progress_manager: Default::default(),
        await_tree_reg,
    };

    // TODO(shutdown): don't collect there's no need to gracefully shutdown them.
    // Hold the join handle and tx to keep the compactor running.
    let _compactor_handle = risingwave_storage::hummock::compactor::start_shared_compactor(
        grpc_proxy_client,
        receiver,
        compactor_context,
    );

    // Message size limits fall back to crate defaults when not set in opts.
    let rpc_max_encoding_message_size_bytes = opts
        .rpc_max_encoding_message_size_bytes
        .unwrap_or(default_rpc_max_encoding_message_size_bytes());

    let rpc_max_decoding_message_size_bytes = opts
        .rpc_max_decoding_message_size_bytes
        .unwrap_or(default_rpc_max_decoding_message_size_bytes());

    let server = tonic::transport::Server::builder()
        .add_service(
            CompactorServiceServer::new(compactor_srv)
                .max_decoding_message_size(rpc_max_decoding_message_size_bytes)
                .max_encoding_message_size(rpc_max_encoding_message_size_bytes),
        )
        .add_service(MonitorServiceServer::new(monitor_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compactor-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );

    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // TODO(shutdown): shall we notify the proxy that we are shutting down?