risingwave_storage/hummock/compactor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
19use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
20use risingwave_pb::hummock::PbCompactTask;
21use risingwave_pb::hummock::report_compaction_task_request::{
22    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
23    ReportTask as ReportSharedTask,
24};
25use risingwave_rpc_client::GrpcCompactorProxyClient;
26use thiserror_ext::AsReport;
27use tokio::sync::mpsc;
28use tonic::Request;
29
30pub mod compactor_runner;
31mod context;
32pub mod fast_compactor_runner;
33mod iterator;
34mod shared_buffer_compact;
35pub(super) mod task_progress;
36
37use std::collections::{HashMap, VecDeque};
38use std::marker::PhantomData;
39use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::{Arc, Mutex};
41use std::time::{Duration, SystemTime};
42
43use await_tree::{InstrumentAwait, SpanExt};
44pub use compaction_executor::CompactionExecutor;
45pub use compaction_filter::{
46    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
47    TtlCompactionFilter,
48};
49pub use context::{
50    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
51};
52use futures::{StreamExt, pin_mut};
53pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
54use more_asserts::assert_ge;
55use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
56use risingwave_hummock_sdk::{
57    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
58};
59use risingwave_pb::hummock::compact_task::TaskStatus;
60use risingwave_pb::hummock::subscribe_compaction_event_request::{
61    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
62};
63use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
64use risingwave_pb::hummock::{
65    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
66    SubscribeCompactionEventResponse,
67};
68use risingwave_rpc_client::HummockMetaClient;
69pub use shared_buffer_compact::{compact, merge_imms_in_memory};
70use tokio::sync::oneshot::Sender;
71use tokio::task::JoinHandle;
72use tokio::time::Instant;
73
74pub use self::compaction_utils::{
75    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
76    check_flush_result,
77};
78pub use self::task_progress::TaskProgress;
79use super::multi_builder::CapacitySplitTableBuilder;
80use super::{
81    GetObjectId, HummockResult, SstableBuilderOptions, SstableObjectIdManager, Xor16FilterBuilder,
82};
83use crate::compaction_catalog_manager::{
84    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
85};
86use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
87use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
88use crate::hummock::iterator::{Forward, HummockIterator};
89use crate::hummock::{
90    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
91    UnifiedSstableWriterFactory, validate_ssts,
92};
93use crate::monitor::CompactorMetrics;
94
/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    /// Supplies object ids for the SSTs this compactor builds.
    object_id_getter: Box<dyn GetObjectId>,
    /// Per-task configuration (cache policy, filter choice, vnode partitioning, ...).
    task_config: TaskConfig,
    /// Options used when building SSTable files.
    options: SstableBuilderOptions,
    /// Accumulated cost of fetching object ids; observed divided by 1e6 in
    /// `compact_key_range` (presumably micros -> seconds — confirm unit at the write site).
    get_id_time: Arc<AtomicU64>,
}
104
/// Output of one compaction split: `(split_index, produced_ssts, statistics)`.
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
106
107impl Compactor {
108    /// Create a new compactor.
109    pub fn new(
110        context: CompactorContext,
111        options: SstableBuilderOptions,
112        task_config: TaskConfig,
113        object_id_getter: Box<dyn GetObjectId>,
114    ) -> Self {
115        Self {
116            context,
117            options,
118            task_config,
119            get_id_time: Arc::new(AtomicU64::new(0)),
120            object_id_getter,
121        }
122    }
123
124    /// Compact the given key range and merge iterator.
125    /// Upon a successful return, the built SSTs are already uploaded to object store.
126    ///
127    /// `task_progress` is only used for tasks on the compactor.
128    async fn compact_key_range(
129        &self,
130        iter: impl HummockIterator<Direction = Forward>,
131        compaction_filter: impl CompactionFilter,
132        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
133        task_progress: Option<Arc<TaskProgress>>,
134        task_id: Option<HummockCompactionTaskId>,
135        split_index: Option<usize>,
136    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
137        // Monitor time cost building shared buffer to SSTs.
138        let compact_timer = if self.context.is_share_buffer_compact {
139            self.context
140                .compactor_metrics
141                .write_build_l0_sst_duration
142                .start_timer()
143        } else {
144            self.context
145                .compactor_metrics
146                .compact_sst_duration
147                .start_timer()
148        };
149
150        let (split_table_outputs, table_stats_map) = {
151            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
152            if self.task_config.use_block_based_filter {
153                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
154                    factory,
155                    iter,
156                    compaction_filter,
157                    compaction_catalog_agent_ref,
158                    task_progress.clone(),
159                    self.object_id_getter.clone(),
160                )
161                .instrument_await("compact".verbose())
162                .await?
163            } else {
164                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
165                    factory,
166                    iter,
167                    compaction_filter,
168                    compaction_catalog_agent_ref,
169                    task_progress.clone(),
170                    self.object_id_getter.clone(),
171                )
172                .instrument_await("compact".verbose())
173                .await?
174            }
175        };
176
177        compact_timer.observe_duration();
178
179        Self::report_progress(
180            self.context.compactor_metrics.clone(),
181            task_progress,
182            &split_table_outputs,
183            self.context.is_share_buffer_compact,
184        );
185
186        self.context
187            .compactor_metrics
188            .get_table_id_total_time_duration
189            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
190
191        debug_assert!(
192            split_table_outputs
193                .iter()
194                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
195        );
196
197        if task_id.is_some() {
198            // skip shared buffer compaction
199            tracing::info!(
200                "Finish Task {:?} split_index {:?} sst count {}",
201                task_id,
202                split_index,
203                split_table_outputs.len()
204            );
205        }
206        Ok((split_table_outputs, table_stats_map))
207    }
208
209    pub fn report_progress(
210        metrics: Arc<CompactorMetrics>,
211        task_progress: Option<Arc<TaskProgress>>,
212        ssts: &Vec<LocalSstableInfo>,
213        is_share_buffer_compact: bool,
214    ) {
215        for sst_info in ssts {
216            let sst_size = sst_info.file_size();
217            if let Some(tracker) = &task_progress {
218                tracker.inc_ssts_uploaded();
219                tracker.dec_num_pending_write_io();
220            }
221            if is_share_buffer_compact {
222                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
223            } else {
224                metrics.compaction_upload_sst_counts.inc();
225            }
226        }
227    }
228
229    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
230        &self,
231        writer_factory: F,
232        iter: impl HummockIterator<Direction = Forward>,
233        compaction_filter: impl CompactionFilter,
234        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
235        task_progress: Option<Arc<TaskProgress>>,
236        object_id_getter: Box<dyn GetObjectId>,
237    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
238        let builder_factory = RemoteBuilderFactory::<F, B> {
239            object_id_getter,
240            limiter: self.context.memory_limiter.clone(),
241            options: self.options.clone(),
242            policy: self.task_config.cache_policy,
243            remote_rpc_cost: self.get_id_time.clone(),
244            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
245            sstable_writer_factory: writer_factory,
246            _phantom: PhantomData,
247        };
248
249        let mut sst_builder = CapacitySplitTableBuilder::new(
250            builder_factory,
251            self.context.compactor_metrics.clone(),
252            task_progress.clone(),
253            self.task_config.table_vnode_partition.clone(),
254            self.context
255                .storage_opts
256                .compactor_concurrent_uploading_sst_count,
257            compaction_catalog_agent_ref,
258        );
259        let compaction_statistics = compact_and_build_sst(
260            &mut sst_builder,
261            &self.task_config,
262            self.context.compactor_metrics.clone(),
263            iter,
264            compaction_filter,
265        )
266        .instrument_await("compact_and_build_sst".verbose())
267        .await?;
268
269        let ssts = sst_builder
270            .finish()
271            .instrument_await("builder_finish".verbose())
272            .await?;
273
274        Ok((ssts, compaction_statistics))
275    }
276}
277
278/// The background compaction thread that receives compaction tasks from hummock compaction
279/// manager and runs compaction tasks.
280#[cfg_attr(coverage, coverage(off))]
281#[must_use]
282pub fn start_compactor(
283    compactor_context: CompactorContext,
284    hummock_meta_client: Arc<dyn HummockMetaClient>,
285    sstable_object_id_manager: Arc<SstableObjectIdManager>,
286    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
287) -> (JoinHandle<()>, Sender<()>) {
288    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
289    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
290    let stream_retry_interval = Duration::from_secs(30);
291    let task_progress = compactor_context.task_progress_manager.clone();
292    let periodic_event_update_interval = Duration::from_millis(1000);
293
294    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
295        * compactor_context.storage_opts.compactor_max_task_multiplier)
296        .ceil() as u32;
297    let running_task_parallelism = Arc::new(AtomicU32::new(0));
298
299    const MAX_PULL_TASK_COUNT: u32 = 4;
300    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
301
302    assert_ge!(
303        compactor_context.storage_opts.compactor_max_task_multiplier,
304        0.0
305    );
306
307    let join_handle = tokio::spawn(async move {
308        let shutdown_map = CompactionShutdownMap::default();
309        let mut min_interval = tokio::time::interval(stream_retry_interval);
310        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
311
312        // This outer loop is to recreate stream.
313        'start_stream: loop {
314            // reset state
315            // pull_task_ack.store(true, Ordering::SeqCst);
316            let mut pull_task_ack = true;
317            tokio::select! {
318                // Wait for interval.
319                _ = min_interval.tick() => {},
320                // Shutdown compactor.
321                _ = &mut shutdown_rx => {
322                    tracing::info!("Compactor is shutting down");
323                    return;
324                }
325            }
326
327            let (request_sender, response_event_stream) =
328                match hummock_meta_client.subscribe_compaction_event().await {
329                    Ok((request_sender, response_event_stream)) => {
330                        tracing::debug!("Succeeded subscribe_compaction_event.");
331                        (request_sender, response_event_stream)
332                    }
333
334                    Err(e) => {
335                        tracing::warn!(
336                            error = %e.as_report(),
337                            "Subscribing to compaction tasks failed with error. Will retry.",
338                        );
339                        continue 'start_stream;
340                    }
341                };
342
343            pin_mut!(response_event_stream);
344
345            let executor = compactor_context.compaction_executor.clone();
346            let sstable_object_id_manager = sstable_object_id_manager.clone();
347
348            // This inner loop is to consume stream or report task progress.
349            let mut event_loop_iteration_now = Instant::now();
350            'consume_stream: loop {
351                {
352                    // report
353                    compactor_context
354                        .compactor_metrics
355                        .compaction_event_loop_iteration_latency
356                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
357                    event_loop_iteration_now = Instant::now();
358                }
359
360                let running_task_parallelism = running_task_parallelism.clone();
361                let request_sender = request_sender.clone();
362                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
363                    _ = periodic_event_interval.tick() => {
364                        let progress_list = get_task_progress(task_progress.clone());
365
366                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
367                            event: Some(RequestEvent::HeartBeat(
368                                HeartBeat {
369                                    progress: progress_list
370                                }
371                            )),
372                            create_at: SystemTime::now()
373                                .duration_since(std::time::UNIX_EPOCH)
374                                .expect("Clock may have gone backwards")
375                                .as_millis() as u64,
376                        }) {
377                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
378                            // re subscribe stream
379                            continue 'start_stream;
380                        }
381
382
383                        let mut pending_pull_task_count = 0;
384                        if pull_task_ack {
385                            // TODO: Compute parallelism on meta side
386                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
387
388                            if pending_pull_task_count > 0 {
389                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
390                                    event: Some(RequestEvent::PullTask(
391                                        PullTask {
392                                            pull_task_count: pending_pull_task_count,
393                                        }
394                                    )),
395                                    create_at: SystemTime::now()
396                                        .duration_since(std::time::UNIX_EPOCH)
397                                        .expect("Clock may have gone backwards")
398                                        .as_millis() as u64,
399                                }) {
400                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
401
402                                    // re subscribe stream
403                                    continue 'start_stream;
404                                } else {
405                                    pull_task_ack = false;
406                                }
407                            }
408                        }
409
410                        tracing::info!(
411                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
412                            pull_task_ack = %pull_task_ack,
413                            pending_pull_task_count = %pending_pull_task_count
414                        );
415
416                        continue;
417                    }
418                    event = response_event_stream.next() => {
419                        event
420                    }
421
422                    _ = &mut shutdown_rx => {
423                        tracing::info!("Compactor is shutting down");
424                        return
425                    }
426                };
427
428                fn send_report_task_event(
429                    compact_task: &CompactTask,
430                    table_stats: TableStatsMap,
431                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
432                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
433                ) {
434                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
435                        event: Some(RequestEvent::ReportTask(ReportTask {
436                            task_id: compact_task.task_id,
437                            task_status: compact_task.task_status.into(),
438                            sorted_output_ssts: compact_task
439                                .sorted_output_ssts
440                                .iter()
441                                .map(|sst| sst.into())
442                                .collect(),
443                            table_stats_change: to_prost_table_stats_map(table_stats),
444                            object_timestamps,
445                        })),
446                        create_at: SystemTime::now()
447                            .duration_since(std::time::UNIX_EPOCH)
448                            .expect("Clock may have gone backwards")
449                            .as_millis() as u64,
450                    }) {
451                        let task_id = compact_task.task_id;
452                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
453                    }
454                }
455
456                match event {
457                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
458                        let event = match event {
459                            Some(event) => event,
460                            None => continue 'consume_stream,
461                        };
462                        let shutdown = shutdown_map.clone();
463                        let context = compactor_context.clone();
464                        let consumed_latency_ms = SystemTime::now()
465                            .duration_since(std::time::UNIX_EPOCH)
466                            .expect("Clock may have gone backwards")
467                            .as_millis() as u64
468                            - create_at;
469                        context
470                            .compactor_metrics
471                            .compaction_event_consumed_latency
472                            .observe(consumed_latency_ms as _);
473
474                        let sstable_object_id_manager = sstable_object_id_manager.clone();
475                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
476
477                        match event {
478                            ResponseEvent::CompactTask(compact_task) => {
479                                let compact_task = CompactTask::from(compact_task);
480                                let parallelism =
481                                    calculate_task_parallelism(&compact_task, &context);
482
483                                assert_ne!(parallelism, 0, "splits cannot be empty");
484
485                                if (max_task_parallelism
486                                    - running_task_parallelism.load(Ordering::SeqCst))
487                                    < parallelism as u32
488                                {
489                                    tracing::warn!(
490                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
491                                        compact_task.task_id,
492                                        parallelism,
493                                        max_task_parallelism,
494                                        running_task_parallelism.load(Ordering::Relaxed),
495                                    );
496                                    let (compact_task, table_stats, object_timestamps) =
497                                        compact_done(
498                                            compact_task,
499                                            context.clone(),
500                                            vec![],
501                                            TaskStatus::NoAvailCpuResourceCanceled,
502                                        );
503
504                                    send_report_task_event(
505                                        &compact_task,
506                                        table_stats,
507                                        object_timestamps,
508                                        &request_sender,
509                                    );
510
511                                    continue 'consume_stream;
512                                }
513
514                                running_task_parallelism
515                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
516                                executor.spawn(async move {
517                                    let (tx, rx) = tokio::sync::oneshot::channel();
518                                    let task_id = compact_task.task_id;
519                                    shutdown.lock().unwrap().insert(task_id, tx);
520
521                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
522                                        context.clone(),
523                                        compact_task,
524                                        rx,
525                                        Box::new(sstable_object_id_manager.clone()),
526                                        compaction_catalog_manager_ref.clone(),
527                                    )
528                                    .await;
529
530                                    shutdown.lock().unwrap().remove(&task_id);
531                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
532
533                                    send_report_task_event(
534                                        &compact_task,
535                                        table_stats,
536                                        object_timestamps,
537                                        &request_sender,
538                                    );
539
540                                    let enable_check_compaction_result =
541                                    context.storage_opts.check_compaction_result;
542                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
543
544                                    if enable_check_compaction_result && need_check_task {
545                                        let compact_table_ids = compact_task.build_compact_table_ids();
546                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
547                                            Ok(compaction_catalog_agent_ref) =>  {
548                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
549                                                {
550                                                    Err(e) => {
551                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
552                                                    }
553                                                    Ok(true) => (),
554                                                    Ok(false) => {
555                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
556                                                    }
557                                                }
558                                            },
559                                            Err(e) => {
560                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
561                                            }
562                                        }
563                                    }
564                                });
565                            }
566                            ResponseEvent::VacuumTask(_) => {
567                                unreachable!("unexpected vacuum task");
568                            }
569                            ResponseEvent::FullScanTask(_) => {
570                                unreachable!("unexpected scan task");
571                            }
572                            ResponseEvent::ValidationTask(validation_task) => {
573                                let validation_task = ValidationTask::from(validation_task);
574                                executor.spawn(async move {
575                                    validate_ssts(validation_task, context.sstable_store.clone())
576                                        .await;
577                                });
578                            }
579                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
580                                .lock()
581                                .unwrap()
582                                .remove(&cancel_compact_task.task_id)
583                            {
584                                Some(tx) => {
585                                    if tx.send(()).is_err() {
586                                        tracing::warn!(
587                                            "Cancellation of compaction task failed. task_id: {}",
588                                            cancel_compact_task.task_id
589                                        );
590                                    }
591                                }
592                                _ => {
593                                    tracing::warn!(
594                                        "Attempting to cancel non-existent compaction task. task_id: {}",
595                                        cancel_compact_task.task_id
596                                    );
597                                }
598                            },
599
600                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
601                                // set flag
602                                pull_task_ack = true;
603                            }
604                        }
605                    }
606                    Some(Err(e)) => {
607                        tracing::warn!("Failed to consume stream. {}", e.message());
608                        continue 'start_stream;
609                    }
610                    _ => {
611                        // The stream is exhausted
612                        continue 'start_stream;
613                    }
614                }
615            }
616        }
617    });
618
619    (join_handle, shutdown_tx)
620}
621
622/// The background compaction thread that receives compaction tasks from hummock compaction
623/// manager and runs compaction tasks.
624#[cfg_attr(coverage, coverage(off))]
625#[must_use]
626pub fn start_shared_compactor(
627    grpc_proxy_client: GrpcCompactorProxyClient,
628    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
629    context: CompactorContext,
630) -> (JoinHandle<()>, Sender<()>) {
631    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
632    let task_progress = context.task_progress_manager.clone();
633    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
634    let periodic_event_update_interval = Duration::from_millis(1000);
635
636    let join_handle = tokio::spawn(async move {
637        let shutdown_map = CompactionShutdownMap::default();
638
639        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
640        let executor = context.compaction_executor.clone();
641        let report_heartbeat_client = grpc_proxy_client.clone();
642        'consume_stream: loop {
643            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
644                _ = periodic_event_interval.tick() => {
645                    let progress_list = get_task_progress(task_progress.clone());
646                    let report_compaction_task_request = ReportCompactionTaskRequest{
647                        event: Some(ReportCompactionTaskEvent::HeartBeat(
648                            SharedHeartBeat {
649                                progress: progress_list
650                            }
651                        )),
652                     };
653                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
654                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
655                    }
656                    continue
657                }
658
659
660                _ = &mut shutdown_rx => {
661                    tracing::info!("Compactor is shutting down");
662                    return
663                }
664
665                request = receiver.recv() => {
666                    request
667                }
668
669            };
670            match request {
671                Some(request) => {
672                    let context = context.clone();
673                    let shutdown = shutdown_map.clone();
674
675                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
676                    executor.spawn(async move {
677                        let DispatchCompactionTaskRequest {
678                            tables,
679                            output_object_ids,
680                            task: dispatch_task,
681                        } = request.into_inner();
682                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
683                            acc.insert(table.id, table);
684                            acc
685                        });
686
687                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
688                        output_object_ids_deque.extend(output_object_ids);
689                        let shared_compactor_object_id_manager =
690                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
691                            match dispatch_task.unwrap() {
692                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
693                                    let compact_task = CompactTask::from(&compact_task);
694                                    let (tx, rx) = tokio::sync::oneshot::channel();
695                                    let task_id = compact_task.task_id;
696                                    shutdown.lock().unwrap().insert(task_id, tx);
697
698                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
699                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
700                                        context.clone(),
701                                        compact_task,
702                                        rx,
703                                        Box::new(shared_compactor_object_id_manager),
704                                        compaction_catalog_agent_ref.clone(),
705                                    )
706                                    .await;
707                                    shutdown.lock().unwrap().remove(&task_id);
708                                    let report_compaction_task_request = ReportCompactionTaskRequest {
709                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
710                                            compact_task: Some(PbCompactTask::from(&compact_task)),
711                                            table_stats_change: to_prost_table_stats_map(table_stats),
712                                            object_timestamps,
713                                        })),
714                                    };
715
716                                    match cloned_grpc_proxy_client
717                                        .report_compaction_task(report_compaction_task_request)
718                                        .await
719                                    {
720                                        Ok(_) => {
721                                            // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
722                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
723                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
724                                            if enable_check_compaction_result && need_check_task {
725                                                match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
726                                                    Err(e) => {
727                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
728                                                    },
729                                                    Ok(true) => (),
730                                                    Ok(false) => {
731                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
732                                                    }
733                                                }
734                                            }
735                                        }
736                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
737                                    }
738
739                                }
740                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
741                                    unreachable!("unexpected vacuum task");
742                                }
743                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
744                                    unreachable!("unexpected scan task");
745                                }
746                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
747                                    let validation_task = ValidationTask::from(validation_task);
748                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
749                                }
750                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
751                                    match shutdown
752                                        .lock()
753                                        .unwrap()
754                                        .remove(&cancel_compact_task.task_id)
755                                    { Some(tx) => {
756                                        if tx.send(()).is_err() {
757                                            tracing::warn!(
758                                                "Cancellation of compaction task failed. task_id: {}",
759                                                cancel_compact_task.task_id
760                                            );
761                                        }
762                                    } _ => {
763                                        tracing::warn!(
764                                            "Attempting to cancel non-existent compaction task. task_id: {}",
765                                            cancel_compact_task.task_id
766                                        );
767                                    }}
768                                }
769                            }
770                    });
771                }
772                None => continue 'consume_stream,
773            }
774        }
775    });
776    (join_handle, shutdown_tx)
777}
778
779fn get_task_progress(
780    task_progress: Arc<
781        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
782    >,
783) -> Vec<CompactTaskProgress> {
784    let mut progress_list = Vec::new();
785    for (&task_id, progress) in &*task_progress.lock() {
786        progress_list.push(progress.snapshot(task_id));
787    }
788    progress_list
789}