risingwave_storage/hummock/compactor/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
// Import iceberg compactor runner types from the local `iceberg_compaction` module.
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

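/// Output of one compaction unit: its split index, the SSTs produced for that
/// split, and the statistics collected while building them.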
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact the given key range and merge iterator.
    /// Upon a successful return, the built SSTs are already uploaded to object store.
    ///
    /// `task_progress` is only used for tasks on the compactor.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Monitor time cost building shared buffer to SSTs.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // skip shared buffer compaction
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background thread of the iceberg compactor: it subscribes to iceberg compaction
/// events from the meta service, queues the received tasks, and runs them on the
/// compaction executor.
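///
/// # Example
///
/// A minimal sketch of how the returned handles are typically used; the setup of
/// `context` and `meta_client` is assumed to happen elsewhere.
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_iceberg_compactor(context, meta_client);
/// // ... on node shutdown, signal the event loop and wait for it to exit:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```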
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
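    // For example, with 8 executor worker threads and a multiplier of 2.5 this allows
    // ceil(8 * 2.5) = 20 units of task parallelism in flight (the concrete numbers
    // depend on configuration).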

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Initialize task queue with event-driven scheduling using Notify
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
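        // Tasks waiting in the queue may together reserve at most
        // `pending_parallelism_budget` units of parallelism; pushes beyond that are
        // rejected with `PushResult::RejectedCapacity` (see the handling below).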
        let (mut task_queue, _schedule_notify) =
            IcebergTaskQueue::new_with_notify(max_task_parallelism, pending_parallelism_budget);

        // Shutdown tracking for running tasks (task_id -> shutdown_sender)
        let shutdown_map = Arc::new(Mutex::new(
            HashMap::<u64, tokio::sync::oneshot::Sender<()>>::new(),
        ));

        // Channel for task completion notifications
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<u64>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // Handle task completion notifications
                    Some(completed_task_id) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_id, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_id);
                        continue 'consume_stream;
                    }

                    // Event-driven task scheduling - wait for tasks to become schedulable
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // Only handle meta task pulling in periodic tick
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY) // TODO: make it configurable
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .small_file_threshold(compactor_context.storage_opts.iceberg_compaction_small_file_threshold_mb as u64 * 1024 * 1024)
                                    .max_task_total_size(
                                        compactor_context.storage_opts.iceberg_compaction_max_task_total_size_mb as u64 * 1024 * 1024,
                                    )
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                // Push task to queue instead of directly spawning
                                let meta = iceberg_runner.to_meta();
                                let push_result = task_queue.push(meta.clone(), Some(iceberg_runner));

                                match push_result {
                                    PushResult::Added => {
                                        tracing::info!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            "Iceberg compaction task added to queue"
                                        );
                                    },
                                    PushResult::Replaced { old_task_id } => {
                                        tracing::info!(
                                            task_id = task_id,
                                            old_task_id = old_task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            "Iceberg compaction task replaced in queue"
                                        );
                                    },
                                    PushResult::RejectedRunningDuplicate => {
                                        tracing::warn!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            "Iceberg compaction task rejected - duplicate already running"
                                        );
                                    },
                                    PushResult::RejectedCapacity => {
                                        tracing::warn!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            pending_budget = pending_parallelism_budget,
                                            "Iceberg compaction task rejected - queue capacity exceeded"
                                        );
                                    },
                                    PushResult::RejectedTooLarge => {
                                        tracing::error!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            max_parallelism = max_task_parallelism,
                                            "Iceberg compaction task rejected - parallelism requirement exceeds max"
                                        );
                                    },
                                    PushResult::RejectedInvalidParallelism => {
                                        tracing::error!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            "Iceberg compaction task rejected - invalid parallelism (must be > 0)"
                                        );
                                    }
                                }
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from the hummock
/// compaction manager and runs them.
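///
/// # Example
///
/// A minimal sketch of wiring this up on a compactor node; constructing the
/// arguments is assumed to happen elsewhere.
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_compactor(
///     context,
///     meta_client,
///     object_id_manager,
///     compaction_catalog_manager,
/// );
/// // Signalling `shutdown_tx` stops the event loop.
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```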
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // re subscribe stream
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // re subscribe stream
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background thread of the shared compactor: it receives compaction tasks
/// dispatched through the gRPC compactor proxy and reports the results back through it.
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}

/// Schedule queued tasks if we have capacity.
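///
/// Each spawned task registers a shutdown sender in `shutdown_map` and, via a scope
/// guard, reports its `task_id` on `task_completion_tx` when it finishes so the main
/// event loop can release the reserved parallelism.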
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<u64, tokio::sync::oneshot::Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<u64>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let Some(runner) = popped_task.runner else {
            tracing::error!(
                task_id = task_id,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_id);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            unique_ident = %popped_task.meta.unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_id, tx);
            }

            let _cleanup_guard = scopeguard::guard(
                (task_id, shutdown_map_clone, completion_tx_clone),
                move |(task_id, shutdown_map, completion_tx)| {
                    {
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_id);
                    }
                    // Notify main loop that task is completed
                    // Multiple tasks can send completion notifications concurrently via mpsc
                    if completion_tx.send(task_id).is_err() {
                        tracing::warn!(task_id = task_id, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            if let Err(e) = runner.compact(rx).await {
                tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
            }
        });
    }
}

/// Handle pulling new tasks from the meta service.
/// Returns true if the stream should be restarted.
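///
/// The number of tasks to request is `min(max_task_parallelism - running, max_pull_task_count)`,
/// where `running` is the queue's currently running parallelism.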
fn handle_meta_task_pulling(
    pull_task_ack: &mut bool,
    task_queue: &IcebergTaskQueue,
    max_task_parallelism: u32,
    max_pull_task_count: u32,
    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
) -> bool {
    let mut pending_pull_task_count = 0;
    if *pull_task_ack {
        // Use queue's running parallelism for pull decision
        let current_running_parallelism = task_queue.running_parallelism_sum();
        pending_pull_task_count =
            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);

        if pending_pull_task_count > 0 {
            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                    subscribe_iceberg_compaction_event_request::PullTask {
                        pull_task_count: pending_pull_task_count,
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            }) {
                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
                return true; // Signal to restart stream
            } else {
                *pull_task_ack = false;
            }
        }
    }

    tracing::info!(
        running_parallelism_count = %task_queue.running_parallelism_sum(),
        waiting_parallelism_count = %task_queue.waiting_parallelism_sum(),
        available_parallelism = %(max_task_parallelism.saturating_sub(task_queue.running_parallelism_sum())),
        pull_task_ack = %*pull_task_ack,
        pending_pull_task_count = %pending_pull_task_count
    );

    false // No need to restart stream
}