risingwave_storage/hummock/compactor/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder, RunnerContext,
};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

/// Implementation of Hummock compaction.
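///
/// A minimal construction sketch (illustrative only; `context`, `options`, `task_config` and
/// `object_id_getter` are assumed to be supplied by the caller):
///
/// ```ignore
/// let compactor = Compactor::new(context, options, task_config, object_id_getter);
/// ```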
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

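/// Output of a single compaction split: the split index, the SSTs built for that split, and the
/// statistics collected while building them.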
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compacts the data yielded by the merge iterator for the given key range.
    /// Upon a successful return, the built SSTs have already been uploaded to the object store.
    ///
    /// `task_progress` is only used for tasks running on the compactor.
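    /// Returns the built [`LocalSstableInfo`]s together with the collected [`CompactionStatistics`].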
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Monitor the time cost of building SSTs (shared buffer flush or compaction).
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // `task_id` is `None` for shared buffer compaction, so this log is skipped for it.
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

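    /// Updates compactor metrics and the optional per-task progress tracker for each produced SST.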
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

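    /// Builds SSTs from the iterator with the given writer factory `F` and filter builder `B`,
    /// returning the built SSTs and the compaction statistics.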
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background iceberg compaction thread that subscribes to iceberg compaction tasks from the
/// meta service and runs them.
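///
/// Returns the join handle of the event loop together with a shutdown sender. A hedged usage
/// sketch (the surrounding async runtime and both arguments are assumed to be set up elsewhere):
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_iceberg_compactor(compactor_context, hummock_meta_client);
/// // ... later, to stop the compactor:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```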
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // TODO: Using a HashSet to filter out duplicate tasks is a workaround.
        let iceberg_compaction_running_task_tracker =
            Arc::new(Mutex::new((HashMap::new(), HashSet::new())));
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                                    event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                                        subscribe_iceberg_compaction_event_request::PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // Re-subscribe to the stream.
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        let iceberg_running_task_tracker =
                            iceberg_compaction_running_task_tracker.clone();
                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                        .set_created_by(concat!(
                                            "risingwave version ",
                                            env!("CARGO_PKG_VERSION")
                                        )
                                        .to_owned())
                                        .set_max_row_group_size(
                                            compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                        )
                                        .set_compression(Compression::SNAPPY) // TODO: make it configurable
                                        .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .small_file_threshold(compactor_context.storage_opts.iceberg_compaction_small_file_threshold_mb as u64 * 1024 * 1024)
                                    .max_task_total_size(
                                        compactor_context.storage_opts.iceberg_compaction_max_task_total_size_mb as u64 * 1024 * 1024,
                                    )
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let task_unique_ident = format!(
                                    "{}-{:?}",
                                    iceberg_runner.iceberg_config.catalog_name(),
                                    iceberg_runner.table_ident
                                );

                                {
                                    let running_task_tracker_guard = iceberg_compaction_running_task_tracker
                                        .lock()
                                        .unwrap();

                                    if running_task_tracker_guard.1.contains(&task_unique_ident) {
                                        tracing::warn!(
                                            task_id = %task_id,
                                            task_unique_ident = %task_unique_ident,
                                            "Iceberg compaction task already running, skip",
                                        );
                                        continue 'consume_stream;
                                    }
                                }

                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    {
                                        let mut running_task_tracker_guard =
                                            iceberg_running_task_tracker.lock().unwrap();
                                        running_task_tracker_guard.0.insert(task_id, tx);
                                        running_task_tracker_guard.1.insert(task_unique_ident.clone());
                                    }

                                    let _release_guard = scopeguard::guard(
                                        iceberg_running_task_tracker.clone(),
                                        move |tracker| {
                                            let mut running_task_tracker_guard = tracker.lock().unwrap();
                                            running_task_tracker_guard.0.remove(&task_id);
                                            running_task_tracker_guard.1.remove(&task_unique_ident);
                                        },
                                    );

                                    if let Err(e) = iceberg_runner.compact(
                                        RunnerContext::new(
                                            max_task_parallelism,
                                            running_task_parallelism.clone(),
                                        ),
                                        rx,
                                    )
                                    .await {
                                        tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
                                    }
                                });
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from the hummock compaction
/// manager in the meta service and runs them.
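///
/// Returns the join handle of the event loop together with a shutdown sender. A hedged usage
/// sketch (the surrounding async runtime and the four arguments are assumed to be set up elsewhere):
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_compactor(
///     compactor_context,
///     hummock_meta_client,
///     object_id_manager,
///     compaction_catalog_manager_ref,
/// );
/// // ... later, to stop the compactor:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```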
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // Re-subscribe to the stream.
                            continue 'start_stream;
                        }


                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // Re-subscribe to the stream.
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                    context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background thread of the shared compactor that receives dispatched compaction tasks from
/// the given channel and reports results and heartbeats back through the gRPC proxy client.
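///
/// Returns the join handle of the event loop together with a shutdown sender. A hedged usage
/// sketch (the proxy client, dispatch channel, and context are assumed to be set up elsewhere):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let (join_handle, shutdown_tx) = start_shared_compactor(grpc_proxy_client, rx, compactor_context);
/// // Dispatch tasks by sending `Request<DispatchCompactionTaskRequest>` through `tx`.
/// ```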
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }


                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                            match dispatch_task.unwrap() {
                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                    let compact_task = CompactTask::from(&compact_task);
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        shared_compactor_object_id_manager,
                                        compaction_catalog_agent_ref.clone(),
                                    )
                                    .await;
                                    shutdown.lock().unwrap().remove(&task_id);
                                    let report_compaction_task_request = ReportCompactionTaskRequest {
                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                            compact_task: Some(PbCompactTask::from(&compact_task)),
                                            table_stats_change: to_prost_table_stats_map(table_stats),
                                            object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                    };

                                    match cloned_grpc_proxy_client
                                        .report_compaction_task(report_compaction_task_request)
                                        .await
                                    {
                                        Ok(_) => {
                                            // TODO: remove this check once the fast compaction algorithm has been running stably in production clusters for a long time.
                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                            if enable_check_compaction_result && need_check_task {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                    },
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            }
                                        }
                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                    }

                                }
                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                    unreachable!("unexpected vacuum task");
                                }
                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                    unreachable!("unexpected scan task");
                                }
                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                    let validation_task = ValidationTask::from(validation_task);
                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
                                }
                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                    match shutdown
                                        .lock()
                                        .unwrap()
                                        .remove(&cancel_compact_task.task_id)
                                    { Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    } _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }}
                                }
                            }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

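/// Takes a snapshot of the progress of every tracked compaction task; the result is attached to
/// the periodic heartbeat events sent above.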
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}