risingwave_storage/hummock/compactor/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder, RunnerContext,
};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;
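// This module exposes the core `Compactor` used to build and upload SSTs, plus three long-running
// event loops: `start_compactor` (subscribes to compaction tasks from the meta service),
// `start_compactor_iceberg` (iceberg compaction tasks), and `start_shared_compactor` (tasks
// dispatched through the gRPC compactor proxy).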
/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

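/// Output of a single compaction split: the split index, the SSTs built for that split, and the
/// statistics collected while building them.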
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact the given key range using the provided merge iterator.
    /// Upon a successful return, the built SSTs have already been uploaded to the object store.
    ///
    /// `task_progress` is only used for tasks running on the compactor node.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Monitor the time spent building SSTs; shared buffer flushes and compaction tasks use
        // different timers.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

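        // Both branches below run the same compaction pipeline; they differ only in the filter
        // builder type parameter selected by `task_config.use_block_based_filter`.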
        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // `task_id` is `None` for shared buffer compaction, so this log only covers
            // compaction tasks.
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

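    /// Runs the actual compaction pipeline: a `RemoteBuilderFactory` obtains object ids and builds
    /// SSTs through the given writer factory, `CapacitySplitTableBuilder` splits output SSTs by
    /// capacity and vnode partitioning, and `compact_and_build_sst` drives the iterator through the
    /// compaction filter into the builder before the finished SSTs are returned.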
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background iceberg compaction thread that subscribes to iceberg compaction tasks from the
/// meta service and runs them.
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_compactor_iceberg(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

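    // Parallelism budget: worker threads * `compactor_max_task_multiplier`, rounded up.
    // For example, 8 workers with a multiplier of 2.5 give ceil(8 * 2.5) = 20 units of parallelism;
    // task pulls below are additionally capped by `MAX_PULL_TASK_COUNT`.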
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
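            // `pull_task_ack` gates task pulling: it is cleared after a PullTask request is sent
            // and set again once the meta service responds with a PullTaskAck event.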
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Iceberg compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to subscribe to iceberg compaction tasks. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();

            // This inner loop consumes the stream and periodically reports task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                                    event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                                        subscribe_iceberg_compaction_event_request::PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // re-subscribe to the stream
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        // TODO: add metrics
                        let shutdown = shutdown_map.clone();
                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
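                                // Parquet writer settings for the rewritten data files: record the
                                // RisingWave version, bound the row-group size by config, and use
                                // Snappy compression (not yet configurable).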
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(
                                        concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned(),
                                    )
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows,
                                    )
                                    .set_compression(Compression::SNAPPY) // TODO: make it configurable
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism(worker_num as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config for task {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner for task {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    if let Err(e) = iceberg_runner.compact(
                                        RunnerContext::new(
                                            max_task_parallelism,
                                            running_task_parallelism.clone(),
                                        ),
                                        rx,
                                    )
                                    .await {
                                        tracing::warn!(error = %e.as_report(), "Failed to run iceberg compaction task {}", task_id);
                                    }

                                    shutdown.lock().unwrap().remove(&task_id);
                                });
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from the hummock compaction
/// manager and runs them.
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Failed to subscribe to compaction tasks. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop consumes the stream and periodically reports task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
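                        // Heartbeat: push per-task progress to the meta service on every tick; a
                        // failed send means the stream is broken, so re-subscribe.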
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // re-subscribe to the stream
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // re-subscribe to the stream
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

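                // Report a finished (or cancelled) task, its table stats delta, and the object
                // timestamps back to the meta service over the request stream.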
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

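                                // Admission control: if the remaining parallelism budget cannot
                                // cover this task, cancel it with `NoAvailCpuResourceCanceled` and
                                // report back so the meta service can reschedule it elsewhere.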
                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

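                                    // Optional consistency check: re-read the task's inputs and
                                    // outputs and verify that they agree. Controlled by the
                                    // `check_compaction_result` option and only run for successful
                                    // tasks that produced output.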
                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "Failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background shared-compactor thread that receives compaction tasks dispatched through the
/// gRPC compactor proxy and runs them.
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

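    // Unlike `start_compactor`, the shared compactor does not subscribe to an event stream: tasks
    // arrive pre-dispatched on `receiver`, and heartbeats/results are reported back through the
    // gRPC compactor proxy client.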
    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
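                        // Output SST object ids are pre-allocated by the dispatcher and consumed
                        // from this queue; `sstable_id_remote_fetch_number` bounds how many more
                        // ids are fetched through the proxy client if the queue runs out.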
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                            match dispatch_task.unwrap() {
                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                    let compact_task = CompactTask::from(&compact_task);
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        shared_compactor_object_id_manager,
                                        compaction_catalog_agent_ref.clone(),
                                    )
                                    .await;
                                    shutdown.lock().unwrap().remove(&task_id);
                                    let report_compaction_task_request = ReportCompactionTaskRequest {
                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                            compact_task: Some(PbCompactTask::from(&compact_task)),
                                            table_stats_change: to_prost_table_stats_map(table_stats),
                                            object_timestamps: object_timestamps
                                                .into_iter()
                                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                                .collect(),
                                    })),
                                    };

                                    match cloned_grpc_proxy_client
                                        .report_compaction_task(report_compaction_task_request)
                                        .await
                                    {
                                        Ok(_) => {
                                            // TODO: remove this check once clusters have been running the fast compact algorithm stably for a long time.
                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                            if enable_check_compaction_result && need_check_task {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                    },
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            }
                                        }
                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                    }

                                }
                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                    unreachable!("unexpected vacuum task");
                                }
                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                    unreachable!("unexpected scan task");
                                }
                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                    let validation_task = ValidationTask::from(validation_task);
                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
                                }
                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                    match shutdown
                                        .lock()
                                        .unwrap()
                                        .remove(&cancel_compact_task.task_id)
                                    {
                                        Some(tx) => {
                                            if tx.send(()).is_err() {
                                                tracing::warn!(
                                                    "Cancellation of compaction task failed. task_id: {}",
                                                    cancel_compact_task.task_id
                                                );
                                            }
                                        }
                                        _ => {
                                            tracing::warn!(
                                                "Attempting to cancel non-existent compaction task. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                }
                            }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}