Skip to main content

risingwave_storage/hummock/compactor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24    ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28    subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58// Import iceberg compactor runner types from the local `iceberg_compaction` module.
59use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61    IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62    ReportSendResult, build_iceberg_task_report, create_task_execution,
63    flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77    CompactTaskProgress, PbSstableFilterLayout, PbSstableFilterType, ReportCompactionTaskRequest,
78    SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88    check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93    GetObjectId, HummockError, HummockErrorInner, HummockResult, ObjectIdManager,
94    SstableBuilderOptions, Xor8FilterBuilder, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104    BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder, NoneFilterBuilder,
105    SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106    validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Heartbeat logging interval for compaction tasks.
///
/// Even when the logged state has not changed, a throttled log line is re-emitted once this
/// much time has passed since the previous one (one minute).
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_millis(60 * 1000);
112
/// Represents the compaction task state for logging purposes.
///
/// Values are compared by equality, so a log line can be suppressed while the snapshot is
/// unchanged (presumably via `LogThrottler`, as the iceberg variant is — confirm at call site).
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    /// Parallelism reported as running when the snapshot was taken.
    running_parallelism: u32,
    /// Whether the last pull-task request has been acknowledged (TODO confirm semantics).
    pull_task_ack: bool,
    /// Number of pull-task requests still outstanding (TODO confirm semantics).
    pending_pull_task_count: u32,
}
120
/// Represents the iceberg compaction task state for logging purposes.
///
/// Snapshots are compared by equality through `LogThrottler` so the iceberg compactor loop
/// only re-logs when something changed or the heartbeat interval elapsed.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    /// Parallelism consumed by plans currently running (TODO confirm against producer).
    running_parallelism: u32,
    /// Parallelism requested by plans still waiting in the queue (TODO confirm).
    waiting_parallelism: u32,
    /// Parallelism still free for new plans (TODO confirm).
    available_parallelism: u32,
    /// Whether the last pull-task request has been acknowledged.
    pull_task_ack: bool,
    /// Number of pull-task requests still outstanding (TODO confirm semantics).
    pending_pull_task_count: u32,
}
130
/// Rate-limits a repeated log line.
///
/// A line is due when the observed state differs from the last one recorded, or when
/// `heartbeat_interval` has passed since the last recording.
struct LogThrottler<T: PartialEq> {
    /// State passed to the most recent `update`; `None` until the first one.
    last_logged_state: Option<T>,
    /// Time of the most recent `update` (construction time before that).
    last_heartbeat: Instant,
    /// Longest quiet period before an unchanged state is logged again.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Builds a throttler that has not recorded any state yet.
    fn new(heartbeat_interval: Duration) -> Self {
        let last_heartbeat = Instant::now();
        Self {
            heartbeat_interval,
            last_heartbeat,
            last_logged_state: None,
        }
    }

    /// Returns `true` when `current_state` differs from the last recorded state (or nothing has
    /// been recorded yet), or when the heartbeat interval has elapsed since the last `update`.
    fn should_log(&self, current_state: &T) -> bool {
        let state_changed = match &self.last_logged_state {
            Some(previous) => previous != current_state,
            None => true,
        };
        state_changed || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records `current_state` as the last logged state and restarts the heartbeat clock.
    /// Call this right after emitting the log line.
    fn update(&mut self, current_state: T) {
        self.last_heartbeat = Instant::now();
        self.last_logged_state = Some(current_state);
    }
}
159
/// Implementation of Hummock compaction.
///
/// Bundles the compactor context with the SST builder settings used by `compact_key_range`
/// to turn a merged key stream into SSTs.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    /// Source of object ids for newly built SSTs; handed to the remote builder factory.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Per-task settings read during compaction (SST filter type/layout, cache policy,
    /// table vnode partition, ...).
    task_config: TaskConfig,
    /// Builder options cloned into every SST builder factory.
    options: SstableBuilderOptions,
    /// Shared with the builder factory as `remote_rpc_cost` and reported through the
    /// `get_table_id_total_time_duration` metric; presumably the accumulated time spent
    /// fetching object ids (unit TODO confirm — the reporter divides by 1e6).
    get_id_time: Arc<AtomicU64>,
}

/// Result of one compaction unit: (index — presumably the split index, confirm at call
/// sites — the built SSTs, and the compaction statistics).
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173    /// Create a new compactor.
174    pub fn new(
175        context: CompactorContext,
176        options: SstableBuilderOptions,
177        task_config: TaskConfig,
178        object_id_getter: Arc<dyn GetObjectId>,
179    ) -> Self {
180        Self {
181            context,
182            options,
183            task_config,
184            get_id_time: Arc::new(AtomicU64::new(0)),
185            object_id_getter,
186        }
187    }
188
189    /// Compact the given key range and merge iterator.
190    /// Upon a successful return, the built SSTs are already uploaded to object store.
191    ///
192    /// `task_progress` is only used for tasks on the compactor.
193    async fn compact_key_range(
194        &self,
195        iter: impl HummockIterator<Direction = Forward>,
196        compaction_filter: impl CompactionFilter,
197        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198        task_progress: Option<Arc<TaskProgress>>,
199        task_id: Option<HummockCompactionTaskId>,
200        split_index: Option<usize>,
201    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202        // Monitor time cost building shared buffer to SSTs.
203        let compact_timer = if self.context.is_share_buffer_compact {
204            self.context
205                .compactor_metrics
206                .write_build_l0_sst_duration
207                .start_timer()
208        } else {
209            self.context
210                .compactor_metrics
211                .compact_sst_duration
212                .start_timer()
213        };
214
215        let (split_table_outputs, table_stats_map) = {
216            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217            match (
218                self.task_config.sstable_filter_type,
219                self.task_config.sstable_filter_layout,
220            ) {
221                (PbSstableFilterType::SstableFilterNone, _) => {
222                    self.compact_key_range_impl::<_, NoneFilterBuilder>(
223                        factory,
224                        iter,
225                        compaction_filter,
226                        compaction_catalog_agent_ref,
227                        task_progress.clone(),
228                        self.object_id_getter.clone(),
229                    )
230                    .instrument_await("compact".verbose())
231                    .await?
232                }
233                (PbSstableFilterType::SstableFilterXor8, PbSstableFilterLayout::Blocked) => {
234                    self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
235                        factory,
236                        iter,
237                        compaction_filter,
238                        compaction_catalog_agent_ref,
239                        task_progress.clone(),
240                        self.object_id_getter.clone(),
241                    )
242                    .instrument_await("compact".verbose())
243                    .await?
244                }
245                (PbSstableFilterType::SstableFilterXor8, PbSstableFilterLayout::Plain) => {
246                    self.compact_key_range_impl::<_, Xor8FilterBuilder>(
247                        factory,
248                        iter,
249                        compaction_filter,
250                        compaction_catalog_agent_ref,
251                        task_progress.clone(),
252                        self.object_id_getter.clone(),
253                    )
254                    .instrument_await("compact".verbose())
255                    .await?
256                }
257                (PbSstableFilterType::SstableFilterXor16, PbSstableFilterLayout::Blocked) => {
258                    self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
259                        factory,
260                        iter,
261                        compaction_filter,
262                        compaction_catalog_agent_ref,
263                        task_progress.clone(),
264                        self.object_id_getter.clone(),
265                    )
266                    .instrument_await("compact".verbose())
267                    .await?
268                }
269                (PbSstableFilterType::SstableFilterXor16, PbSstableFilterLayout::Plain) => {
270                    self.compact_key_range_impl::<_, Xor16FilterBuilder>(
271                        factory,
272                        iter,
273                        compaction_filter,
274                        compaction_catalog_agent_ref,
275                        task_progress.clone(),
276                        self.object_id_getter.clone(),
277                    )
278                    .instrument_await("compact".verbose())
279                    .await?
280                }
281                (filter_type, layout) => {
282                    return Err(HummockError::compaction_executor(format!(
283                        "unresolved SST filter layout in task config: {:?}, {:?}",
284                        filter_type, layout
285                    )));
286                }
287            }
288        };
289
290        compact_timer.observe_duration();
291
292        Self::report_progress(
293            self.context.compactor_metrics.clone(),
294            task_progress,
295            &split_table_outputs,
296            self.context.is_share_buffer_compact,
297        );
298
299        self.context
300            .compactor_metrics
301            .get_table_id_total_time_duration
302            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
303
304        debug_assert!(
305            split_table_outputs
306                .iter()
307                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
308        );
309
310        if task_id.is_some() {
311            // skip shared buffer compaction
312            tracing::info!(
313                "Finish Task {:?} split_index {:?} sst count {}",
314                task_id,
315                split_index,
316                split_table_outputs.len()
317            );
318        }
319        Ok((split_table_outputs, table_stats_map))
320    }
321
322    pub fn report_progress(
323        metrics: Arc<CompactorMetrics>,
324        task_progress: Option<Arc<TaskProgress>>,
325        ssts: &Vec<LocalSstableInfo>,
326        is_share_buffer_compact: bool,
327    ) {
328        for sst_info in ssts {
329            let sst_size = sst_info.file_size();
330            if let Some(tracker) = &task_progress {
331                tracker.inc_ssts_uploaded();
332                tracker.dec_num_pending_write_io();
333            }
334            if is_share_buffer_compact {
335                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
336            } else {
337                metrics.compaction_upload_sst_counts.inc();
338            }
339        }
340    }
341
342    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
343        &self,
344        writer_factory: F,
345        iter: impl HummockIterator<Direction = Forward>,
346        compaction_filter: impl CompactionFilter,
347        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
348        task_progress: Option<Arc<TaskProgress>>,
349        object_id_getter: Arc<dyn GetObjectId>,
350    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
351        let builder_factory = RemoteBuilderFactory::<F, B> {
352            object_id_getter,
353            limiter: self.context.memory_limiter.clone(),
354            options: self.options.clone(),
355            policy: self.task_config.cache_policy,
356            remote_rpc_cost: self.get_id_time.clone(),
357            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
358            sstable_writer_factory: writer_factory,
359            _phantom: PhantomData,
360        };
361
362        let mut sst_builder = CapacitySplitTableBuilder::new(
363            builder_factory,
364            self.context.compactor_metrics.clone(),
365            task_progress.clone(),
366            self.task_config.table_vnode_partition.clone(),
367            self.context
368                .storage_opts
369                .compactor_concurrent_uploading_sst_count,
370            compaction_catalog_agent_ref,
371        );
372        let compaction_statistics = compact_and_build_sst(
373            &mut sst_builder,
374            &self.task_config,
375            self.context.compactor_metrics.clone(),
376            iter,
377            compaction_filter,
378        )
379        .instrument_await("compact_and_build_sst".verbose())
380        .await?;
381
382        let ssts = sst_builder
383            .finish()
384            .instrument_await("builder_finish".verbose())
385            .await?;
386
387        Ok((ssts, compaction_statistics))
388    }
389}
390
391/// The background compaction thread that receives compaction tasks from hummock compaction
392/// manager and runs compaction tasks.
393#[must_use]
394pub fn start_iceberg_compactor(
395    compactor_context: CompactorContext,
396    hummock_meta_client: Arc<dyn HummockMetaClient>,
397) -> (JoinHandle<()>, Sender<()>) {
398    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
399    let stream_retry_interval = Duration::from_secs(30);
400    let periodic_event_update_interval = Duration::from_millis(
401        compactor_context
402            .storage_opts
403            .iceberg_compaction_pull_interval_ms,
404    );
405    let worker_num = compactor_context.compaction_executor.worker_num();
406
407    let max_task_parallelism: u32 = (worker_num as f32
408        * compactor_context.storage_opts.compactor_max_task_multiplier)
409        .ceil() as u32;
410
411    const MAX_PULL_TASK_COUNT: u32 = 4;
412    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
413
414    assert_ge!(
415        compactor_context.storage_opts.compactor_max_task_multiplier,
416        0.0
417    );
418
419    let join_handle = tokio::spawn(async move {
420        // Initialize task queue with event-driven scheduling
421        let pending_parallelism_budget = (max_task_parallelism as f32
422            * compactor_context
423                .storage_opts
424                .iceberg_compaction_pending_parallelism_budget_multiplier)
425            .ceil() as u32;
426        let mut task_queue =
427            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
428
429        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
430        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
431
432        // Channel for task completion notifications
433        let (task_completion_tx, mut task_completion_rx) =
434            tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
435        let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
436        // Buffers task reports that failed to send on the current stream.
437        // The queue is flushed in FIFO order after the stream reconnects.
438        let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
439
440        let mut min_interval = tokio::time::interval(stream_retry_interval);
441        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
442
443        // Track last logged state to avoid duplicate logs
444        let mut log_throttler =
445            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
446
447        // This outer loop is to recreate stream.
448        'start_stream: loop {
449            // reset state
450            // pull_task_ack.store(true, Ordering::SeqCst);
451            let mut pull_task_ack = true;
452            tokio::select! {
453                // Wait for interval.
454                _ = min_interval.tick() => {},
455                // Shutdown compactor.
456                _ = &mut shutdown_rx => {
457                    tracing::info!("Compactor is shutting down");
458                    return;
459                }
460            }
461
462            let (request_sender, response_event_stream) = match hummock_meta_client
463                .subscribe_iceberg_compaction_event()
464                .await
465            {
466                Ok((request_sender, response_event_stream)) => {
467                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
468                    (request_sender, response_event_stream)
469                }
470
471                Err(e) => {
472                    tracing::warn!(
473                        error = %e.as_report(),
474                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
475                    );
476                    continue 'start_stream;
477                }
478            };
479
480            if matches!(
481                flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
482                ReportSendResult::RestartStream
483            ) {
484                continue 'start_stream;
485            }
486
487            pin_mut!(response_event_stream);
488
489            let _executor = compactor_context.compaction_executor.clone();
490
491            // This inner loop is to consume stream or report task progress.
492            let mut event_loop_iteration_now = Instant::now();
493            'consume_stream: loop {
494                {
495                    // report
496                    compactor_context
497                        .compactor_metrics
498                        .compaction_event_loop_iteration_latency
499                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
500                    event_loop_iteration_now = Instant::now();
501                }
502
503                let request_sender = request_sender.clone();
504                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
505                    // Handle task completion notifications
506                    Some(plan_completion) = task_completion_rx.recv() => {
507                        let task_key = plan_completion.task_key;
508                        let error_message = plan_completion.error_message;
509                        tracing::debug!(
510                            task_id = task_key.0,
511                            plan_index = task_key.1,
512                            success = error_message.is_none(),
513                            "Plan completed, updating queue state"
514                        );
515                        task_queue.finish_running(task_key);
516
517                        let completed_task_id = task_key.0;
518                        let Entry::Occupied(mut tracker_entry) =
519                            task_trackers.entry(completed_task_id)
520                        else {
521                            continue 'consume_stream;
522                        };
523                        tracker_entry.get_mut().record_completion(error_message);
524                        if !tracker_entry.get().is_finished() {
525                            continue 'consume_stream;
526                        }
527
528                        let report = tracker_entry.remove().into_report(completed_task_id);
529                        if matches!(
530                            send_or_buffer_iceberg_task_report(
531                                &request_sender,
532                                &mut pending_task_reports,
533                                report,
534                            ),
535                            ReportSendResult::RestartStream
536                        ) {
537                            continue 'start_stream;
538                        }
539                        continue 'consume_stream;
540                    }
541
542                    // Event-driven task scheduling - wait for tasks to become schedulable
543                    _ = task_queue.wait_schedulable() => {
544                        schedule_queued_tasks(
545                            &mut task_queue,
546                            &compactor_context,
547                            &shutdown_map,
548                            &task_completion_tx,
549                        );
550                        continue 'consume_stream;
551                    }
552
553                    _ = periodic_event_interval.tick() => {
554                        // Only handle meta task pulling in periodic tick
555                        let should_restart_stream = handle_meta_task_pulling(
556                            &mut pull_task_ack,
557                            &task_queue,
558                            max_task_parallelism,
559                            max_pull_task_count,
560                            &request_sender,
561                            &mut log_throttler,
562                        );
563
564                        if should_restart_stream {
565                            continue 'start_stream;
566                        }
567                        continue;
568                    }
569                    event = response_event_stream.next() => {
570                        event
571                    }
572
573                    _ = &mut shutdown_rx => {
574                        tracing::info!("Iceberg Compactor is shutting down");
575                        return
576                    }
577                };
578
579                match event {
580                    Some(Ok(SubscribeIcebergCompactionEventResponse {
581                        event,
582                        create_at: _create_at,
583                    })) => {
584                        let event = match event {
585                            Some(event) => event,
586                            None => continue 'consume_stream,
587                        };
588
589                        match event {
590                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
591                                let task_id = iceberg_compaction_task.task_id;
592                                let sink_id = iceberg_compaction_task.sink_id;
593                                // Note: write_parquet_properties is now built from sink config (IcebergConfig) in create_task_execution
594                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
595                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
596                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
597                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
598                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
599                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
600                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
601                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
602                                    .enable_prefetch(compactor_context.storage_opts.iceberg_compaction_enable_prefetch)
603                                    .target_binpack_group_size_mb(
604                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
605                                    )
606                                    .min_group_size_mb(
607                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
608                                    )
609                                    .min_group_file_count(
610                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
611                                    )
612                                    .build() {
613                                    Ok(config) => config,
614                                    Err(e) => {
615                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
616                                        let report = build_iceberg_task_report(
617                                            task_id,
618                                            sink_id,
619                                            Some(format!(
620                                                "Failed to build iceberg compactor runner config: {}",
621                                                e.as_report()
622                                            )),
623                                        );
624                                        if matches!(
625                                            send_or_buffer_iceberg_task_report(
626                                                &request_sender,
627                                                &mut pending_task_reports,
628                                                report,
629                                            ),
630                                            ReportSendResult::RestartStream
631                                        ) {
632                                            continue 'start_stream;
633                                        }
634                                        continue 'consume_stream;
635                                    }
636                                };
637
638                                // Create task execution context and plan runners from the task
639                                let task_execution = match create_task_execution(
640                                    iceberg_compaction_task,
641                                    compactor_runner_config,
642                                    compactor_context.compactor_metrics.clone(),
643                                ).await {
644                                    Ok(task_execution) => task_execution,
645                                    Err(e) => {
646                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
647                                        let report = build_iceberg_task_report(
648                                            task_id,
649                                            sink_id,
650                                            Some(format!(
651                                                "Failed to create iceberg compaction task execution: {}",
652                                                e.as_report()
653                                            )),
654                                        );
655                                        if matches!(
656                                            send_or_buffer_iceberg_task_report(
657                                                &request_sender,
658                                                &mut pending_task_reports,
659                                                report,
660                                            ),
661                                            ReportSendResult::RestartStream
662                                        ) {
663                                            continue 'start_stream;
664                                        }
665                                        continue 'consume_stream;
666                                    }
667                                };
668
669                                let sink_id = task_execution.sink_id;
670                                let plan_runners = task_execution.plan_runners;
671
672                                if plan_runners.is_empty() {
673                                    tracing::info!(task_id, sink_id, "No plans to execute");
674                                    let report = build_iceberg_task_report(task_id, sink_id, None);
675                                    if matches!(
676                                        send_or_buffer_iceberg_task_report(
677                                            &request_sender,
678                                            &mut pending_task_reports,
679                                            report,
680                                        ),
681                                        ReportSendResult::RestartStream
682                                    ) {
683                                        continue 'start_stream;
684                                    }
685                                    continue 'consume_stream;
686                                }
687
688                                // Enqueue each plan runner independently
689                                let total_plans = plan_runners.len();
690                                let mut enqueued_count = 0;
691
692                                for runner in plan_runners {
693                                    let meta = runner.to_meta();
694                                    let plan_index = meta.plan_index;
695                                    let required_parallelism = runner.required_parallelism();
696                                    let push_result = task_queue.push(meta, Some(runner));
697
698                                    match push_result {
699                                        PushResult::Added => {
700                                            enqueued_count += 1;
701                                            tracing::debug!(
702                                                task_id = task_id,
703                                                plan_index = plan_index,
704                                                required_parallelism = required_parallelism,
705                                                "Iceberg plan runner added to queue"
706                                            );
707                                        },
708                                        PushResult::RejectedCapacity => {
709                                            tracing::warn!(
710                                                task_id = task_id,
711                                                required_parallelism = required_parallelism,
712                                                pending_budget = pending_parallelism_budget,
713                                                enqueued_count = enqueued_count,
714                                                total_plans = total_plans,
715                                                "Iceberg plan runner rejected - queue capacity exceeded"
716                                            );
717                                            // Stop enqueuing remaining plans
718                                            break;
719                                        },
720                                        PushResult::RejectedTooLarge => {
721                                            tracing::error!(
722                                                task_id = task_id,
723                                                required_parallelism = required_parallelism,
724                                                max_parallelism = max_task_parallelism,
725                                                "Iceberg plan runner rejected - parallelism exceeds max"
726                                            );
727                                        },
728                                        PushResult::RejectedInvalidParallelism => {
729                                            tracing::error!(
730                                                task_id = task_id,
731                                                required_parallelism = required_parallelism,
732                                                "Iceberg plan runner rejected - invalid parallelism"
733                                            );
734                                        },
735                                        PushResult::RejectedDuplicate => {
736                                            tracing::error!(
737                                                task_id = task_id,
738                                                plan_index = plan_index,
739                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
740                                            );
741                                        }
742                                    }
743                                }
744
745                                if enqueued_count == 0 {
746                                    let report = build_iceberg_task_report(
747                                        task_id,
748                                        sink_id,
749                                        Some("Failed to enqueue all iceberg compaction plans".to_owned()),
750                                    );
751                                    if matches!(
752                                        send_or_buffer_iceberg_task_report(
753                                            &request_sender,
754                                            &mut pending_task_reports,
755                                            report,
756                                        ),
757                                        ReportSendResult::RestartStream
758                                    ) {
759                                        continue 'start_stream;
760                                    }
761                                } else {
762                                    task_trackers.insert(
763                                        task_id,
764                                        IcebergTaskTracker::new(sink_id, enqueued_count),
765                                    );
766                                }
767
768                                tracing::info!(
769                                    task_id = task_id,
770                                    sink_id = sink_id,
771                                    total_plans = total_plans,
772                                    enqueued_count = enqueued_count,
773                                    "Enqueued {} of {} Iceberg plan runners",
774                                    enqueued_count,
775                                    total_plans
776                                );
777                            },
778                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
779                                // set flag
780                                pull_task_ack = true;
781                            },
782                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CancelCompactTask(cancel_compact_task) => {
783                                cancel_iceberg_task(
784                                    cancel_compact_task.task_id,
785                                    &mut task_queue,
786                                    &shutdown_map,
787                                    &mut task_trackers,
788                                );
789                            },
790                        }
791                    }
792                    Some(Err(e)) => {
793                        tracing::warn!("Failed to consume stream. {}", e.message());
794                        continue 'start_stream;
795                    }
796                    _ => {
797                        // The stream is exhausted
798                        continue 'start_stream;
799                    }
800                }
801            }
802        }
803    });
804
805    (join_handle, shutdown_tx)
806}
807
808/// The background compaction thread that receives compaction tasks from hummock compaction
809/// manager and runs compaction tasks.
810#[must_use]
811pub fn start_compactor(
812    compactor_context: CompactorContext,
813    hummock_meta_client: Arc<dyn HummockMetaClient>,
814    object_id_manager: Arc<ObjectIdManager>,
815    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
816) -> (JoinHandle<()>, Sender<()>) {
817    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
818    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
819    let stream_retry_interval = Duration::from_secs(30);
820    let task_progress = compactor_context.task_progress_manager.clone();
821    let periodic_event_update_interval = Duration::from_millis(1000);
822
823    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
824        * compactor_context.storage_opts.compactor_max_task_multiplier)
825        .ceil() as u32;
826    let running_task_parallelism = Arc::new(AtomicU32::new(0));
827
828    const MAX_PULL_TASK_COUNT: u32 = 4;
829    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
830
831    assert_ge!(
832        compactor_context.storage_opts.compactor_max_task_multiplier,
833        0.0
834    );
835
836    let join_handle = tokio::spawn(async move {
837        let shutdown_map = CompactionShutdownMap::default();
838        let mut min_interval = tokio::time::interval(stream_retry_interval);
839        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
840
841        // Track last logged state to avoid duplicate logs
842        let mut log_throttler =
843            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
844
845        // This outer loop is to recreate stream.
846        'start_stream: loop {
847            // reset state
848            // pull_task_ack.store(true, Ordering::SeqCst);
849            let mut pull_task_ack = true;
850            tokio::select! {
851                // Wait for interval.
852                _ = min_interval.tick() => {},
853                // Shutdown compactor.
854                _ = &mut shutdown_rx => {
855                    tracing::info!("Compactor is shutting down");
856                    return;
857                }
858            }
859
860            let (request_sender, response_event_stream) =
861                match hummock_meta_client.subscribe_compaction_event().await {
862                    Ok((request_sender, response_event_stream)) => {
863                        tracing::debug!("Succeeded subscribe_compaction_event.");
864                        (request_sender, response_event_stream)
865                    }
866
867                    Err(e) => {
868                        tracing::warn!(
869                            error = %e.as_report(),
870                            "Subscribing to compaction tasks failed with error. Will retry.",
871                        );
872                        continue 'start_stream;
873                    }
874                };
875
876            pin_mut!(response_event_stream);
877
878            let executor = compactor_context.compaction_executor.clone();
879            let object_id_manager = object_id_manager.clone();
880
881            // This inner loop is to consume stream or report task progress.
882            let mut event_loop_iteration_now = Instant::now();
883            'consume_stream: loop {
884                {
885                    // report
886                    compactor_context
887                        .compactor_metrics
888                        .compaction_event_loop_iteration_latency
889                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
890                    event_loop_iteration_now = Instant::now();
891                }
892
893                let running_task_parallelism = running_task_parallelism.clone();
894                let request_sender = request_sender.clone();
895                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
896                    _ = periodic_event_interval.tick() => {
897                        let progress_list = get_task_progress(task_progress.clone());
898
899                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
900                            event: Some(RequestEvent::HeartBeat(
901                                HeartBeat {
902                                    progress: progress_list
903                                }
904                            )),
905                            create_at: SystemTime::now()
906                                .duration_since(std::time::UNIX_EPOCH)
907                                .expect("Clock may have gone backwards")
908                                .as_millis() as u64,
909                        }) {
910                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
911                            // re subscribe stream
912                            continue 'start_stream;
913                        }
914
915
916                        let mut pending_pull_task_count = 0;
917                        if pull_task_ack {
918                            // TODO: Compute parallelism on meta side
919                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
920
921                            if pending_pull_task_count > 0 {
922                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
923                                    event: Some(RequestEvent::PullTask(
924                                        PullTask {
925                                            pull_task_count: pending_pull_task_count,
926                                        }
927                                    )),
928                                    create_at: SystemTime::now()
929                                        .duration_since(std::time::UNIX_EPOCH)
930                                        .expect("Clock may have gone backwards")
931                                        .as_millis() as u64,
932                                }) {
933                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
934
935                                    // re subscribe stream
936                                    continue 'start_stream;
937                                } else {
938                                    pull_task_ack = false;
939                                }
940                            }
941                        }
942
943                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
944                        let current_state = CompactionLogState {
945                            running_parallelism: running_count,
946                            pull_task_ack,
947                            pending_pull_task_count,
948                        };
949
950                        // Log only when state changes or periodically as heartbeat
951                        if log_throttler.should_log(&current_state) {
952                            tracing::info!(
953                                running_parallelism_count = %current_state.running_parallelism,
954                                pull_task_ack = %current_state.pull_task_ack,
955                                pending_pull_task_count = %current_state.pending_pull_task_count
956                            );
957                            log_throttler.update(current_state);
958                        }
959
960                        continue;
961                    }
962                    event = response_event_stream.next() => {
963                        event
964                    }
965
966                    _ = &mut shutdown_rx => {
967                        tracing::info!("Compactor is shutting down");
968                        return
969                    }
970                };
971
972                fn send_report_task_event(
973                    compact_task: &CompactTask,
974                    table_stats: TableStatsMap,
975                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
976                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
977                ) {
978                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
979                        event: Some(RequestEvent::ReportTask(ReportTask {
980                            task_id: compact_task.task_id,
981                            task_status: compact_task.task_status.into(),
982                            sorted_output_ssts: compact_task
983                                .sorted_output_ssts
984                                .iter()
985                                .map(|sst| sst.into())
986                                .collect(),
987                            table_stats_change: to_prost_table_stats_map(table_stats),
988                            object_timestamps,
989                        })),
990                        create_at: SystemTime::now()
991                            .duration_since(std::time::UNIX_EPOCH)
992                            .expect("Clock may have gone backwards")
993                            .as_millis() as u64,
994                    }) {
995                        let task_id = compact_task.task_id;
996                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
997                    }
998                }
999
1000                match event {
1001                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
1002                        let event = match event {
1003                            Some(event) => event,
1004                            None => continue 'consume_stream,
1005                        };
1006                        let shutdown = shutdown_map.clone();
1007                        let context = compactor_context.clone();
1008                        let consumed_latency_ms = SystemTime::now()
1009                            .duration_since(std::time::UNIX_EPOCH)
1010                            .expect("Clock may have gone backwards")
1011                            .as_millis() as u64
1012                            - create_at;
1013                        context
1014                            .compactor_metrics
1015                            .compaction_event_consumed_latency
1016                            .observe(consumed_latency_ms as _);
1017
1018                        let object_id_manager = object_id_manager.clone();
1019                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
1020
1021                        match event {
1022                            ResponseEvent::CompactTask(compact_task) => {
1023                                let compact_task = CompactTask::from(compact_task);
1024                                let parallelism =
1025                                    calculate_task_parallelism(&compact_task, &context);
1026
1027                                assert_ne!(parallelism, 0, "splits cannot be empty");
1028
1029                                if (max_task_parallelism
1030                                    - running_task_parallelism.load(Ordering::SeqCst))
1031                                    < parallelism as u32
1032                                {
1033                                    tracing::warn!(
1034                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1035                                        compact_task.task_id,
1036                                        parallelism,
1037                                        max_task_parallelism,
1038                                        running_task_parallelism.load(Ordering::Relaxed),
1039                                    );
1040                                    let (compact_task, table_stats, object_timestamps) =
1041                                        compact_done(
1042                                            compact_task,
1043                                            context.clone(),
1044                                            vec![],
1045                                            TaskStatus::NoAvailCpuResourceCanceled,
1046                                        );
1047
1048                                    send_report_task_event(
1049                                        &compact_task,
1050                                        table_stats,
1051                                        object_timestamps,
1052                                        &request_sender,
1053                                    );
1054
1055                                    continue 'consume_stream;
1056                                }
1057
1058                                running_task_parallelism
1059                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
1060                                executor.spawn(async move {
1061                                    let (tx, rx) = tokio::sync::oneshot::channel();
1062                                    let task_id = compact_task.task_id;
1063                                    shutdown.lock().unwrap().insert(task_id, tx);
1064
1065                                    let (
1066                                        (compact_task, table_stats, object_timestamps),
1067                                        _memory_tracker,
1068                                    ) = compactor_runner::compact(
1069                                        context.clone(),
1070                                        compact_task,
1071                                        rx,
1072                                        object_id_manager.clone(),
1073                                        compaction_catalog_manager_ref.clone(),
1074                                    )
1075                                    .await;
1076
1077                                    shutdown.lock().unwrap().remove(&task_id);
1078                                    running_task_parallelism
1079                                        .fetch_sub(parallelism as u32, Ordering::SeqCst);
1080
1081                                    send_report_task_event(
1082                                        &compact_task,
1083                                        table_stats,
1084                                        object_timestamps,
1085                                        &request_sender,
1086                                    );
1087
1088                                    let enable_check_compaction_result =
1089                                        context.storage_opts.check_compaction_result;
1090                                    let need_check_task =
1091                                        !compact_task.sorted_output_ssts.is_empty()
1092                                            && compact_task.task_status == TaskStatus::Success;
1093
1094                                    if enable_check_compaction_result && need_check_task {
1095                                        let read_table_ids = compact_task
1096                                            .get_table_ids_from_input_ssts()
1097                                            .collect::<Vec<_>>();
1098                                        match compaction_catalog_manager_ref.acquire(read_table_ids).await {
1099                                            Ok(compaction_catalog_agent_ref) =>  {
1100                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1101                                                {
1102                                                    Err(e) => {
1103                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1104                                                    }
1105                                                    Ok(true) => (),
1106                                                    Ok(false) => {
1107                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1108                                                    }
1109                                                }
1110                                            },
1111                                            Err(e) => {
1112                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1113                                            }
1114                                        }
1115                                    }
1116                                });
1117                            }
1118                            #[expect(deprecated)]
1119                            ResponseEvent::VacuumTask(_) => {
1120                                unreachable!("unexpected vacuum task");
1121                            }
1122                            #[expect(deprecated)]
1123                            ResponseEvent::FullScanTask(_) => {
1124                                unreachable!("unexpected scan task");
1125                            }
1126                            #[expect(deprecated)]
1127                            ResponseEvent::ValidationTask(validation_task) => {
1128                                let validation_task = ValidationTask::from(validation_task);
1129                                executor.spawn(async move {
1130                                    validate_ssts(validation_task, context.sstable_store.clone())
1131                                        .await;
1132                                });
1133                            }
1134                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1135                                .lock()
1136                                .unwrap()
1137                                .remove(&cancel_compact_task.task_id)
1138                            {
1139                                Some(tx) => {
1140                                    if tx.send(()).is_err() {
1141                                        tracing::warn!(
1142                                            "Cancellation of compaction task failed. task_id: {}",
1143                                            cancel_compact_task.task_id
1144                                        );
1145                                    }
1146                                }
1147                                _ => {
1148                                    tracing::warn!(
1149                                        "Attempting to cancel non-existent compaction task. task_id: {}",
1150                                        cancel_compact_task.task_id
1151                                    );
1152                                }
1153                            },
1154
1155                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
1156                                // set flag
1157                                pull_task_ack = true;
1158                            }
1159                        }
1160                    }
1161                    Some(Err(e)) => {
1162                        tracing::warn!("Failed to consume stream. {}", e.message());
1163                        continue 'start_stream;
1164                    }
1165                    _ => {
1166                        // The stream is exhausted
1167                        continue 'start_stream;
1168                    }
1169                }
1170            }
1171        }
1172    });
1173
1174    (join_handle, shutdown_tx)
1175}
1176
1177/// The background compaction thread that receives compaction tasks from hummock compaction
1178/// manager and runs compaction tasks.
1179#[must_use]
1180pub fn start_shared_compactor(
1181    grpc_proxy_client: GrpcCompactorProxyClient,
1182    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1183    context: CompactorContext,
1184) -> (JoinHandle<()>, Sender<()>) {
1185    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1186    let task_progress = context.task_progress_manager.clone();
1187    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1188    let periodic_event_update_interval = Duration::from_millis(1000);
1189
1190    let join_handle = tokio::spawn(async move {
1191        let shutdown_map = CompactionShutdownMap::default();
1192
1193        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1194        let executor = context.compaction_executor.clone();
1195        let report_heartbeat_client = grpc_proxy_client.clone();
1196        'consume_stream: loop {
1197            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1198                _ = periodic_event_interval.tick() => {
1199                    let progress_list = get_task_progress(task_progress.clone());
1200                    let report_compaction_task_request = ReportCompactionTaskRequest{
1201                        event: Some(ReportCompactionTaskEvent::HeartBeat(
1202                            SharedHeartBeat {
1203                                progress: progress_list
1204                            }
1205                        )),
1206                     };
1207                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1208                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1209                    }
1210                    continue
1211                }
1212
1213
1214                _ = &mut shutdown_rx => {
1215                    tracing::info!("Compactor is shutting down");
1216                    return
1217                }
1218
1219                request = receiver.recv() => {
1220                    request
1221                }
1222
1223            };
1224            match request {
1225                Some(request) => {
1226                    let context = context.clone();
1227                    let shutdown = shutdown_map.clone();
1228
1229                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1230                    executor.spawn(async move {
1231                        let DispatchCompactionTaskRequest {
1232                            tables,
1233                            output_object_ids,
1234                            task: dispatch_task,
1235                        } = request.into_inner();
1236                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1237                            acc.insert(table.id, table);
1238                            acc
1239                        });
1240
1241                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1242                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1243                        let shared_compactor_object_id_manager =
1244                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1245                            match dispatch_task.unwrap() {
1246                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1247                                    let compact_task = CompactTask::from(&compact_task);
1248                                    let (tx, rx) = tokio::sync::oneshot::channel();
1249                                    let task_id = compact_task.task_id;
1250                                    shutdown.lock().unwrap().insert(task_id, tx);
1251
1252                                    let compaction_catalog_manager_ref =
1253                                        Arc::new(CompactionCatalogManager::new_preloaded(
1254                                            table_id_to_catalog,
1255                                        ));
1256                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1257                                        context.clone(),
1258                                        compact_task,
1259                                        rx,
1260                                        shared_compactor_object_id_manager,
1261                                        compaction_catalog_manager_ref.clone(),
1262                                    )
1263                                    .await;
1264                                    shutdown.lock().unwrap().remove(&task_id);
1265                                    let report_compaction_task_request = ReportCompactionTaskRequest {
1266                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1267                                            compact_task: Some(PbCompactTask::from(&compact_task)),
1268                                            table_stats_change: to_prost_table_stats_map(table_stats),
1269                                            object_timestamps,
1270                                    })),
1271                                    };
1272
1273                                    match cloned_grpc_proxy_client
1274                                        .report_compaction_task(report_compaction_task_request)
1275                                        .await
1276                                    {
1277                                        Ok(_) => {
1278                                            // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
1279                                            let enable_check_compaction_result =
1280                                                context.storage_opts.check_compaction_result;
1281                                            let need_check_task =
1282                                                !compact_task.sorted_output_ssts.is_empty()
1283                                                    && compact_task.task_status
1284                                                        == TaskStatus::Success;
1285                                            if enable_check_compaction_result && need_check_task {
1286                                                let read_table_ids = compact_task
1287                                                    .get_table_ids_from_input_ssts()
1288                                                    .collect::<Vec<_>>();
1289                                                match compaction_catalog_manager_ref.acquire(read_table_ids).await {
1290                                                    Ok(compaction_catalog_agent_ref) => {
1291                                                        match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1292                                                        {
1293                                                            Err(e) => {
1294                                                                tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1295                                                            }
1296                                                            Ok(true) => (),
1297                                                            Ok(false) => {
1298                                                                panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1299                                                            }
1300                                                        }
1301                                                    }
1302                                                    Err(e) => {
1303                                                        tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1304                                                    }
1305                                                }
1306                                            }
1307                                        }
1308                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1309                                    }
1310
1311                                }
1312                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1313                                    unreachable!("unexpected vacuum task");
1314                                }
1315                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1316                                    unreachable!("unexpected scan task");
1317                                }
1318                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1319                                    let validation_task = ValidationTask::from(validation_task);
1320                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1321                                }
1322                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1323                                    match shutdown
1324                                        .lock()
1325                                        .unwrap()
1326                                        .remove(&cancel_compact_task.task_id)
1327                                    { Some(tx) => {
1328                                        if tx.send(()).is_err() {
1329                                            tracing::warn!(
1330                                                "Cancellation of compaction task failed. task_id: {}",
1331                                                cancel_compact_task.task_id
1332                                            );
1333                                        }
1334                                    } _ => {
1335                                        tracing::warn!(
1336                                            "Attempting to cancel non-existent compaction task. task_id: {}",
1337                                            cancel_compact_task.task_id
1338                                        );
1339                                    }}
1340                                }
1341                            }
1342                    });
1343                }
1344                None => continue 'consume_stream,
1345            }
1346        }
1347    });
1348    (join_handle, shutdown_tx)
1349}
1350
1351fn get_task_progress(
1352    task_progress: Arc<
1353        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1354    >,
1355) -> Vec<CompactTaskProgress> {
1356    let mut progress_list = Vec::new();
1357    for (&task_id, progress) in &*task_progress.lock() {
1358        progress_list.push(progress.snapshot(task_id));
1359    }
1360    progress_list
1361}
1362
1363/// Schedule queued tasks if we have capacity
1364fn schedule_queued_tasks(
1365    task_queue: &mut IcebergTaskQueue,
1366    compactor_context: &CompactorContext,
1367    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1368    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1369) {
1370    while let Some(popped_task) = task_queue.pop() {
1371        let task_id = popped_task.meta.task_id;
1372        let plan_index = popped_task.meta.plan_index;
1373        let task_key = (task_id, plan_index);
1374
1375        // Get unique_ident before moving runner
1376        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1377
1378        let Some(runner) = popped_task.runner else {
1379            tracing::error!(
1380                task_id = task_id,
1381                plan_index = plan_index,
1382                "Popped task missing runner - this should not happen"
1383            );
1384            task_queue.finish_running(task_key);
1385            continue;
1386        };
1387
1388        let executor = compactor_context.compaction_executor.clone();
1389        let shutdown_map_clone = shutdown_map.clone();
1390        let completion_tx_clone = task_completion_tx.clone();
1391        let (tx, rx) = tokio::sync::oneshot::channel();
1392
1393        {
1394            let mut shutdown_guard = shutdown_map.lock().unwrap();
1395            shutdown_guard.insert(task_key, tx);
1396        }
1397
1398        tracing::info!(
1399            task_id = task_id,
1400            plan_index = plan_index,
1401            unique_ident = ?unique_ident,
1402            required_parallelism = popped_task.meta.required_parallelism,
1403            "Starting iceberg compaction task from queue"
1404        );
1405
1406        executor.spawn(async move {
1407            let _cleanup_guard = scopeguard::guard(shutdown_map_clone, move |shutdown_map| {
1408                let mut shutdown_guard = shutdown_map.lock().unwrap();
1409                shutdown_guard.remove(&task_key);
1410            });
1411
1412            let result = Box::pin(runner.compact(rx)).await;
1413
1414            let completion = match result {
1415                Ok(_) => IcebergPlanCompletion {
1416                    task_key,
1417                    error_message: None,
1418                },
1419                Err(e) => {
1420                    if is_cancelled_iceberg_compaction_error(&e) {
1421                        tracing::info!(
1422                            task_id = task_key.0,
1423                            plan_index = task_key.1,
1424                            "Iceberg compaction plan cancelled"
1425                        );
1426                    } else {
1427                        tracing::warn!(
1428                            error = %e.as_report(),
1429                            task_id = task_key.0,
1430                            plan_index = task_key.1,
1431                            "Failed to compact iceberg runner"
1432                        );
1433                    }
1434                    IcebergPlanCompletion {
1435                        task_key,
1436                        error_message: Some(e.to_report_string()),
1437                    }
1438                }
1439            };
1440
1441            if completion_tx_clone.send(completion).is_err() {
1442                tracing::warn!(
1443                    task_id = task_key.0,
1444                    plan_index = task_key.1,
1445                    "Failed to notify task completion - main loop may have shut down"
1446                );
1447            }
1448        });
1449    }
1450}
1451
1452fn is_cancelled_iceberg_compaction_error(error: &crate::hummock::HummockError) -> bool {
1453    matches!(
1454        error.inner(),
1455        HummockErrorInner::CompactionExecutor(message) if message == "Plan cancelled"
1456    )
1457}
1458
1459fn cancel_iceberg_task(
1460    task_id: u64,
1461    task_queue: &mut IcebergTaskQueue,
1462    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1463    task_trackers: &mut HashMap<u64, IcebergTaskTracker>,
1464) {
1465    // Meta assigns one task id to an Iceberg compact task, but the compactor
1466    // splits it into multiple plan runners tracked by `(task_id, plan_index)`.
1467    // A cancel event only carries `task_id`, so it must cancel all waiting and
1468    // running plan runners that belong to that task.
1469    let cancelled_waiting = task_queue.cancel_waiting_task(task_id);
1470    let removed_tracker = task_trackers.remove(&task_id).is_some();
1471
1472    let cancelled_running = {
1473        let mut shutdown_guard = shutdown_map.lock().unwrap();
1474        let task_keys: Vec<_> = shutdown_guard
1475            .keys()
1476            .filter(|(running_task_id, _)| *running_task_id == task_id)
1477            .copied()
1478            .collect();
1479
1480        for task_key in &task_keys {
1481            if let Some(tx) = shutdown_guard.remove(task_key)
1482                && tx.send(()).is_err()
1483            {
1484                tracing::debug!(
1485                    task_id = task_key.0,
1486                    plan_index = task_key.1,
1487                    "Iceberg compaction plan shutdown receiver already closed during cancellation"
1488                );
1489            }
1490        }
1491
1492        task_keys.len()
1493    };
1494
1495    if cancelled_waiting == 0 && cancelled_running == 0 && !removed_tracker {
1496        tracing::warn!(
1497            task_id = task_id,
1498            "Attempting to cancel non-existent iceberg compaction task"
1499        );
1500    } else {
1501        tracing::info!(
1502            task_id = task_id,
1503            cancelled_waiting = cancelled_waiting,
1504            cancelled_running = cancelled_running,
1505            removed_tracker = removed_tracker,
1506            "Cancelled iceberg compaction task"
1507        );
1508    }
1509}
1510
1511/// Handle pulling new tasks from meta service
1512/// Returns true if the stream should be restarted
1513fn handle_meta_task_pulling(
1514    pull_task_ack: &mut bool,
1515    task_queue: &IcebergTaskQueue,
1516    max_task_parallelism: u32,
1517    max_pull_task_count: u32,
1518    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1519    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1520) -> bool {
1521    let mut pending_pull_task_count = 0;
1522    if *pull_task_ack {
1523        // Use queue's running parallelism for pull decision
1524        let current_running_parallelism = task_queue.running_parallelism_sum();
1525        pending_pull_task_count =
1526            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1527
1528        if pending_pull_task_count > 0 {
1529            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1530                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1531                    subscribe_iceberg_compaction_event_request::PullTask {
1532                        pull_task_count: pending_pull_task_count,
1533                    },
1534                )),
1535                create_at: SystemTime::now()
1536                    .duration_since(std::time::UNIX_EPOCH)
1537                    .expect("Clock may have gone backwards")
1538                    .as_millis() as u64,
1539            }) {
1540                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1541                return true; // Signal to restart stream
1542            } else {
1543                *pull_task_ack = false;
1544            }
1545        }
1546    }
1547
1548    let running_count = task_queue.running_parallelism_sum();
1549    let waiting_count = task_queue.waiting_parallelism_sum();
1550    let available_count = max_task_parallelism.saturating_sub(running_count);
1551    let current_state = IcebergCompactionLogState {
1552        running_parallelism: running_count,
1553        waiting_parallelism: waiting_count,
1554        available_parallelism: available_count,
1555        pull_task_ack: *pull_task_ack,
1556        pending_pull_task_count,
1557    };
1558
1559    // Log only when state changes or periodically as heartbeat
1560    if log_throttler.should_log(&current_state) {
1561        tracing::info!(
1562            running_parallelism_count = %current_state.running_parallelism,
1563            waiting_parallelism_count = %current_state.waiting_parallelism,
1564            available_parallelism = %current_state.available_parallelism,
1565            pull_task_ack = %current_state.pull_task_ack,
1566            pending_pull_task_count = %current_state.pending_pull_task_count
1567        );
1568        log_throttler.update(current_state);
1569    }
1570
1571    false // No need to restart stream
1572}
1573
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cancel_iceberg_task_removes_waiting_plans_and_tracker() {
        let task_id = 42;
        let mut task_queue = IcebergTaskQueue::new(10, 30);

        // Queue two runner-less plans of the same task (parallelism 3 + 4).
        for (plan_index, required_parallelism) in [(0, 3), (1, 4)] {
            let meta = iceberg_compaction::IcebergTaskMeta {
                task_id,
                plan_index,
                required_parallelism,
            };
            assert_eq!(task_queue.push(meta, None), PushResult::Added);
        }
        assert_eq!(task_queue.waiting_parallelism_sum(), 7);

        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 2))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        // Every waiting plan of the task and its tracker must be gone.
        assert_eq!(task_queue.waiting_parallelism_sum(), 0);
        assert!(!task_trackers.contains_key(&task_id));
    }

    #[test]
    fn test_cancel_iceberg_task_shuts_down_running_plans_and_tracker() {
        let task_id = 43;
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 1))]);

        // Simulate a running plan `(task_id, 0)` by registering its shutdown sender directly.
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let shutdown_map = Arc::new(Mutex::new(HashMap::from([((task_id, 0), shutdown_tx)])));

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        // The running plan was signalled and deregistered, and the tracker dropped.
        assert!(shutdown_rx.try_recv().is_ok());
        assert!(shutdown_map.lock().unwrap().is_empty());
        assert!(!task_trackers.contains_key(&task_id));
    }

    #[test]
    fn test_log_state_equality() {
        // CompactionLogState: identical field values compare equal; one differing field does not.
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state3 = CompactionLogState {
            running_parallelism: 11,
            ..state1
        };
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        // IcebergCompactionLogState: a change in waiting parallelism alone makes states differ.
        let ice_state1 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state2 = IcebergCompactionLogState {
            waiting_parallelism: 6,
            ..ice_state1
        };
        assert_ne!(ice_state1, ice_state2);
    }

    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let first = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let second = CompactionLogState {
            running_parallelism: 11,
            ..first
        };

        // A new (or the first) state logs once; repeating it right away is suppressed.
        for state in [&first, &second] {
            assert!(throttler.should_log(state));
            throttler.update(state.clone());
            assert!(!throttler.should_log(state));
        }
    }

    #[test]
    fn test_log_throttler_heartbeat() {
        let heartbeat = Duration::from_millis(10);
        let mut throttler = LogThrottler::<CompactionLogState>::new(heartbeat);
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // The first call logs; the same state immediately afterwards does not.
        assert!(throttler.should_log(&state));
        throttler.update(state.clone());
        assert!(!throttler.should_log(&state));

        // Once the heartbeat interval has elapsed, the unchanged state is logged again.
        std::thread::sleep(heartbeat + Duration::from_millis(5));
        assert!(throttler.should_log(&state));
    }
}