risingwave_storage/hummock/compactor/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder, RunnerContext,
};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;
/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

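/// Output of one compaction split: `(split index, output SSTs, compaction statistics)`.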
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Creates a new compactor.
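    ///
    /// A minimal construction sketch (not compiled here; assumes `context`,
    /// `options`, `task_config`, and `object_id_getter` have already been
    /// built by the caller):
    ///
    /// ```ignore
    /// let compactor = Compactor::new(context, options, task_config, object_id_getter);
    /// ```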
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compacts the given key range using the provided merge iterator.
    /// Upon a successful return, the built SSTs have already been uploaded to the object store.
    ///
    /// `task_progress` is only used for tasks running on a dedicated compactor.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Time the SST build: shared-buffer flushes and compaction tasks are
        // recorded under separate histograms.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );
        if task_id.is_some() {
            // Shared-buffer compaction has no task id, so this only logs for
            // compactor tasks.
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

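    /// Updates per-task progress and compactor metrics for each uploaded SST:
    /// shared-buffer flushes record the SST size distribution, while compaction
    /// tasks bump the uploaded-SST counter.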
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

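    /// Builds SSTs from `iter`, with `F` selecting the SSTable writer backend
    /// and `B` the filter builder; both are chosen by the caller (see
    /// `compact_key_range`).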
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background iceberg compaction thread that subscribes to iceberg
/// compaction tasks from the meta service and runs them.
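///
/// A minimal usage sketch (not compiled here; assumes a configured
/// `compactor_context` and `hummock_meta_client` are available):
///
/// ```ignore
/// let (join_handle, shutdown_tx) =
///     start_iceberg_compactor(compactor_context, hummock_meta_client);
/// // ... on shutdown:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```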
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

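    // Cap concurrently running task parallelism at the number of worker
    // threads scaled by `compactor_max_task_multiplier`.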
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // TODO: Using a HashSet to filter out duplicate tasks is a workaround.
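        // Tuple of (task_id -> shutdown sender, unique idents of running
        // tasks); the set de-duplicates tasks targeting the same table.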
        let iceberg_compaction_running_task_tracker =
            Arc::new(Mutex::new((HashMap::new(), HashSet::new())));
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop recreates the subscription stream whenever it breaks.
        'start_stream: loop {
            // Reset state for the new stream.
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Iceberg compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();

            // This inner loop consumes the stream and reports task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // Report the latency of one event-loop iteration.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                                    event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                                        subscribe_iceberg_compaction_event_request::PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // Re-subscribe to the stream.
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        let iceberg_running_task_tracker =
                            iceberg_compaction_running_task_tracker.clone();
                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(
                                        concat!("risingwave version ", env!("CARGO_PKG_VERSION"))
                                            .to_owned(),
                                    )
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY) // TODO: make it configurable
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism(worker_num as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };
                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let task_unique_ident = format!(
                                    "{}-{:?}",
                                    iceberg_runner.iceberg_config.catalog_name(),
                                    iceberg_runner.table_ident
                                );

                                {
                                    let running_task_tracker_guard = iceberg_compaction_running_task_tracker
                                        .lock()
                                        .unwrap();

                                    if running_task_tracker_guard.1.contains(&task_unique_ident) {
                                        tracing::warn!(
                                            task_id = %task_id,
                                            task_unique_ident = %task_unique_ident,
                                            "Iceberg compaction task already running, skip",
                                        );
                                        continue 'consume_stream;
                                    }
                                }

                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    {
                                        let mut running_task_tracker_guard =
                                            iceberg_running_task_tracker.lock().unwrap();
                                        running_task_tracker_guard.0.insert(task_id, tx);
                                        running_task_tracker_guard.1.insert(task_unique_ident.clone());
                                    }

                                    let _release_guard = scopeguard::guard(
                                        iceberg_running_task_tracker.clone(),
                                        move |tracker| {
                                            let mut running_task_tracker_guard = tracker.lock().unwrap();
                                            running_task_tracker_guard.0.remove(&task_id);
                                            running_task_tracker_guard.1.remove(&task_unique_ident);
                                        },
                                    );

                                    if let Err(e) = iceberg_runner.compact(
                                        RunnerContext::new(
                                            max_task_parallelism,
                                            running_task_parallelism.clone(),
                                        ),
                                        rx,
                                    )
                                    .await {
                                        tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
                                    }
                                });
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted.
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from the
/// hummock compaction manager and runs them.
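///
/// A minimal usage sketch (not compiled here; assumes the arguments are
/// already constructed):
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_compactor(
///     compactor_context,
///     hummock_meta_client,
///     object_id_manager,
///     compaction_catalog_manager_ref,
/// );
/// // ... on shutdown:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```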
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

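    // Cap concurrently running task parallelism at the number of worker
    // threads scaled by `compactor_max_task_multiplier`.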
    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop recreates the subscription stream whenever it breaks.
        'start_stream: loop {
            // Reset state for the new stream.
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop consumes the stream and reports task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // Report the latency of one event-loop iteration.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // Re-subscribe to the stream.
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // Re-subscribe to the stream.
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

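                // Report the final task status and stats back to the meta
                // service over the event stream; a send failure is only logged.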
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted.
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives dispatched compaction tasks
/// from the proxy and runs them.
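///
/// A minimal usage sketch (not compiled here; assumes a connected
/// `grpc_proxy_client`, a dispatch `receiver`, and a `context`):
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_shared_compactor(grpc_proxy_client, receiver, context);
/// // ... on shutdown:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```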
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                }
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

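/// Snapshots the progress of all currently tracked compaction tasks, for
/// inclusion in heartbeat messages.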
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}